1818#include "google/cloud/internal/clock.h"
1919#include "google/cloud/status_or.h"
2020#include "google/cloud/version.h"
21- #include <chrono>
22- #include <deque>
2321#include <memory>
2422#include <mutex>
2523#include <utility>
@@ -35,67 +33,15 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3533template <typename T>
3634class ChannelUsage : public std::enable_shared_from_this<ChannelUsage<T>> {
3735 public:
38- using Clock = ::google::cloud::internal::SteadyClock;
3936 ChannelUsage() = default;
40- explicit ChannelUsage(std::shared_ptr<T> stub, std::shared_ptr<Clock> clock =
41- std::make_shared<Clock>())
42- : stub_(std::move(stub)), clock_(std::move(clock)) {}
37+ explicit ChannelUsage(std::shared_ptr<T> stub) : stub_(std::move(stub)) {}
4338
4439 // This constructor is only used in testing.
45- ChannelUsage(std::shared_ptr<T> stub, std::shared_ptr<Clock> clock ,
46- int initial_outstanding_rpcs )
40+ ChannelUsage(std::shared_ptr<T> stub, int initial_outstanding_rpcs ,
41+ Status last_refresh_status = {} )
4742 : stub_(std::move(stub)),
48- clock_(std::move(clock)),
49- outstanding_rpcs_(initial_outstanding_rpcs) {}
50-
51- // Computes the weighted average of outstanding RPCs on the channel over the
52- // past 60 seconds.
53- StatusOr<int> average_outstanding_rpcs() {
54- auto constexpr kWindowSeconds = 60;
55- auto constexpr kWindowDuration = std::chrono::seconds(kWindowSeconds);
56- std::scoped_lock lk(mu_);
57- if (!last_refresh_status_.ok()) return last_refresh_status_;
58- // If there are no measurements then the stub has never been used. In real
59- // use this will be 0. In testing we sometimes set an initial value.
60- if (measurements_.empty()) return outstanding_rpcs_;
61- auto now = clock_->Now();
62- auto last_time = now;
63- auto window_start = now - kWindowDuration;
64-
65- double sum = 0.0;
66- double total_weight = 0.0;
67- auto iter = measurements_.rbegin();
68- while (iter != measurements_.rend() && iter->timestamp >= window_start) {
69- double weight =
70- std::chrono::duration<double>(last_time - iter->timestamp).count();
71- last_time = iter->timestamp;
72- sum += iter->outstanding_rpcs * weight;
73- total_weight += weight;
74- ++iter;
75- }
76-
77- // It's unlikely we will have a measurement at precisely the beginning of
78- // the window. So, we need to use the first measurement outside the window
79- // to compute a measurement for the missing part of the window using a
80- // weight equal to the missing time.
81- if (iter != measurements_.rend()) {
82- double weight = std::max(0.0, kWindowSeconds - total_weight);
83- sum += iter->outstanding_rpcs * weight;
84- total_weight += weight;
85- // We want to keep one measurement that's at least 60s old to provide a
86- // starting value for the next window.
87- ++iter;
88- }
89-
90- if (measurements_.size() > 1) {
91- measurements_.erase(measurements_.begin(), iter.base());
92- }
93- // After iterating through the measurements if the total_weight is zero,
94- // then all of the measurements occurred at time == now, and returning the
95- // current number of outstanding RPCs is most correct.
96- return total_weight == 0.0 ? outstanding_rpcs_
97- : static_cast<int>(sum / total_weight);
98- }
43+ outstanding_rpcs_(initial_outstanding_rpcs),
44+ last_refresh_status_(std::move(last_refresh_status)) {}
9945
10046 StatusOr<int> instant_outstanding_rpcs() {
10147 std::scoped_lock lk(mu_);
@@ -123,32 +69,19 @@ class ChannelUsage : public std::enable_shared_from_this<ChannelUsage<T>> {
12369 std::shared_ptr<T> AcquireStub() {
12470 std::scoped_lock lk(mu_);
12571 ++outstanding_rpcs_;
126- auto time = clock_->Now();
127- measurements_.emplace_back(outstanding_rpcs_, time);
12872 return stub_;
12973 }
13074
13175 void ReleaseStub() {
13276 std::scoped_lock lk(mu_);
13377 --outstanding_rpcs_;
134- measurements_.emplace_back(outstanding_rpcs_, clock_->Now());
13578 }
13679
13780 private:
13881 mutable std::mutex mu_;
13982 std::shared_ptr<T> stub_;
140- std::shared_ptr<Clock> clock_ = std::make_shared<Clock>();
14183 int outstanding_rpcs_ = 0;
14284 Status last_refresh_status_;
143- struct Measurement {
144- Measurement(int outstanding_rpcs, std::chrono::steady_clock::time_point p)
145- : outstanding_rpcs(outstanding_rpcs), timestamp(p) {}
146- int outstanding_rpcs;
147- std::chrono::steady_clock::time_point timestamp;
148- };
149- // Older measurements are removed as part of the average_outstanding_rpcs
150- // method.
151- std::deque<Measurement> measurements_;
15285};
15386
15487GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
0 commit comments