Created
March 30, 2023 13:14
-
-
Save JorgeMarinoDev/a08334b04f9a75f89a26e5df527cba1c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Copyright The OpenTelemetry Authors | |
| // SPDX-License-Identifier: Apache-2.0 | |
| #include "opentelemetry/sdk/metrics/metric_reader.h" | |
| #include "opentelemetry/sdk/common/global_log_handler.h" | |
| #include <onDemandMetricReader.hpp> | |
| #include "opentelemetry/sdk/metrics/push_metric_exporter.h" | |
| #include <chrono> | |
| #if defined(_MSC_VER) | |
| # pragma warning(suppress : 5204) | |
| # include <future> | |
| #else | |
| # include <future> | |
| #endif | |
| OPENTELEMETRY_BEGIN_NAMESPACE | |
| namespace sdk | |
| { | |
| namespace metrics | |
| { | |
| OnDemandMetricReader::OnDemandMetricReader( | |
| std::unique_ptr<PushMetricExporter> exporter, | |
| const OnDemandMetricReaderOptions &option) : exporter_{std::move(exporter)} | |
| { | |
| } | |
| AggregationTemporality OnDemandMetricReader::GetAggregationTemporality( | |
| InstrumentType instrument_type) const noexcept | |
| { | |
| return exporter_->GetAggregationTemporality(instrument_type); | |
| } | |
| void OnDemandMetricReader::OnInitialized() noexcept | |
| { | |
| } | |
| bool OnDemandMetricReader::CollectAndExportOnce() | |
| { | |
| std::atomic<bool> cancel_export_for_timeout{false}; | |
| auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { | |
| Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { | |
| if (cancel_export_for_timeout) | |
| { | |
| OTEL_INTERNAL_LOG_ERROR( | |
| "[Periodic Exporting Metric Reader] Collect took longer configured time: " | |
| << export_timeout_millis_.count() << " ms, and timed out"); | |
| return false; | |
| } | |
| this->exporter_->Export(metric_data); | |
| return true; | |
| }); | |
| }); | |
| std::future_status status; | |
| do | |
| { | |
| status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); | |
| if (status == std::future_status::timeout) | |
| { | |
| cancel_export_for_timeout = true; | |
| break; | |
| } | |
| } while (status != std::future_status::ready); | |
| return true; | |
| } | |
| bool OnDemandMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept | |
| { | |
| return exporter_->ForceFlush(timeout); | |
| } | |
| bool OnDemandMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept | |
| { | |
| return exporter_->Shutdown(timeout); | |
| } | |
| } // namespace metrics | |
| } // namespace sdk | |
| OPENTELEMETRY_END_NAMESPACE |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Copyright The OpenTelemetry Authors | |
| // SPDX-License-Identifier: Apache-2.0 | |
| #pragma once | |
| #include "opentelemetry/sdk/metrics/metric_reader.h" | |
| #include "opentelemetry/version.h" | |
| #include <atomic> | |
| #include <chrono> | |
| #include <condition_variable> | |
| #include <thread> | |
| OPENTELEMETRY_BEGIN_NAMESPACE | |
| namespace sdk | |
| { | |
| namespace metrics | |
| { | |
| class PushMetricExporter; | |
| constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(3000); | |
| struct OnDemandMetricReaderOptions | |
| { | |
| /* how long the export can run before it is cancelled. */ | |
| std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis); | |
| }; | |
| class OnDemandMetricReader : public MetricReader | |
| { | |
| public: | |
| OnDemandMetricReader(std::unique_ptr<PushMetricExporter> exporter, | |
| const OnDemandMetricReaderOptions &option); | |
| AggregationTemporality GetAggregationTemporality( | |
| InstrumentType instrument_type) const noexcept override; | |
| bool CollectAndExportOnce(); | |
| private: | |
| bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; | |
| bool OnShutDown(std::chrono::microseconds timeout) noexcept override; | |
| void OnInitialized() noexcept override; | |
| std::unique_ptr<PushMetricExporter> exporter_; | |
| std::chrono::milliseconds export_interval_millis_; | |
| std::chrono::milliseconds export_timeout_millis_; | |
| /* Synchronization primitives */ | |
| std::condition_variable cv_; | |
| std::mutex cv_m_; | |
| }; | |
| } // namespace metrics | |
| } // namespace sdk | |
| OPENTELEMETRY_END_NAMESPACE |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment