Skip to content

Instantly share code, notes, and snippets.

@JorgeMarinoDev
Created March 30, 2023 13:14
Show Gist options
  • Select an option

  • Save JorgeMarinoDev/a08334b04f9a75f89a26e5df527cba1c to your computer and use it in GitHub Desktop.

Select an option

Save JorgeMarinoDev/a08334b04f9a75f89a26e5df527cba1c to your computer and use it in GitHub Desktop.
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include "opentelemetry/sdk/common/global_log_handler.h"
#include <onDemandMetricReader.hpp>
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
#include <chrono>
#if defined(_MSC_VER)
# pragma warning(suppress : 5204)
# include <future>
#else
# include <future>
#endif
OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
OnDemandMetricReader::OnDemandMetricReader(
std::unique_ptr<PushMetricExporter> exporter,
const OnDemandMetricReaderOptions &option) : exporter_{std::move(exporter)}
{
}
AggregationTemporality OnDemandMetricReader::GetAggregationTemporality(
InstrumentType instrument_type) const noexcept
{
return exporter_->GetAggregationTemporality(instrument_type);
}
void OnDemandMetricReader::OnInitialized() noexcept
{
}
bool OnDemandMetricReader::CollectAndExportOnce()
{
std::atomic<bool> cancel_export_for_timeout{false};
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
return true;
}
bool OnDemandMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
}
bool OnDemandMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
return exporter_->Shutdown(timeout);
}
} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
#pragma once
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include "opentelemetry/version.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <thread>
OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
class PushMetricExporter;
constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(3000);
struct OnDemandMetricReaderOptions
{
/* how long the export can run before it is cancelled. */
std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis);
};
class OnDemandMetricReader : public MetricReader
{
public:
OnDemandMetricReader(std::unique_ptr<PushMetricExporter> exporter,
const OnDemandMetricReaderOptions &option);
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override;
bool CollectAndExportOnce();
private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;
bool OnShutDown(std::chrono::microseconds timeout) noexcept override;
void OnInitialized() noexcept override;
std::unique_ptr<PushMetricExporter> exporter_;
std::chrono::milliseconds export_interval_millis_;
std::chrono::milliseconds export_timeout_millis_;
/* Synchronization primitives */
std::condition_variable cv_;
std::mutex cv_m_;
};
} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment