Skip to content

Commit 924f140

Browse files
Add a counter for active subscriptions to the payments poller metrics
1 parent 37d8c7b commit 924f140

3 files changed

Lines changed: 55 additions & 1 deletion

File tree

aggregation_mode/payments_poller/src/db.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,21 @@ impl Db {
4343
.await
4444
.map(|_| ())
4545
}
46+
47+
pub async fn count_total_active_subscriptions(
48+
&self,
49+
epoch: BigDecimal,
50+
) -> Result<i64, sqlx::Error> {
51+
let (count,) = sqlx::query_as::<_, (i64,)>(
52+
"
53+
SELECT COUNT(*)
54+
FROM payment_events
55+
WHERE started_at < $1 AND $1 < valid_until",
56+
)
57+
.bind(epoch)
58+
.fetch_one(&self.pool)
59+
.await?;
60+
61+
Ok(count)
62+
}
4663
}

aggregation_mode/payments_poller/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
pub struct PaymentsPollerMetrics {
77
pub registry: Registry,
88
pub last_processed_block: Gauge,
9+
pub active_subscriptions: Gauge,
910
}
1011

1112
impl PaymentsPollerMetrics {
@@ -17,14 +18,21 @@ impl PaymentsPollerMetrics {
1718
"Last processed block by poller"
1819
))?;
1920

21+
let active_subscriptions = Gauge::with_opts(opts!(
22+
"active_subscriptions",
23+
"Active payment subscriptions by poller"
24+
))?;
25+
2026
registry.register(Box::new(last_processed_block.clone()))?;
27+
registry.register(Box::new(active_subscriptions.clone()))?;
2128

2229
// Arc is used because metrics are a shared resource accessed by both the background and metrics HTTP
2330
// server and the application code, across multiple Actix worker threads. The server outlives start(),
2431
// so the data must be static and safely shared between threads.
2532
let metrics = Arc::new(Self {
2633
registry,
2734
last_processed_block,
35+
active_subscriptions,
2836
});
2937

3038
let server_metrics = metrics.clone();
@@ -63,4 +71,8 @@ impl PaymentsPollerMetrics {
6371
pub fn register_last_processed_block(&self, value: u64) {
6472
self.last_processed_block.set(value as f64);
6573
}
74+
75+
pub fn register_active_subscriptions(&self, value: i64) {
76+
self.active_subscriptions.set(value as f64);
77+
}
6678
}

aggregation_mode/payments_poller/src/payments.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{str::FromStr, sync::Arc};
1+
use std::{
2+
str::FromStr,
3+
sync::Arc,
4+
time::{SystemTime, UNIX_EPOCH},
5+
};
26

37
use crate::{
48
config::Config,
@@ -131,7 +135,28 @@ impl PaymentsPoller {
131135
continue;
132136
};
133137

138+
let now_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
139+
Ok(duration) => duration.as_secs(),
140+
Err(_) => {
141+
continue;
142+
}
143+
};
144+
145+
// Note: This implies a call to the database, and may be optimized to reduce the amount of calls
146+
let Ok(active_subscriptions_amount) = self
147+
.db
148+
.count_total_active_subscriptions(
149+
BigDecimal::from_str(&now_epoch.to_string()).unwrap(),
150+
)
151+
.await
152+
else {
153+
tracing::error!("Failed to get the active subscriptions amount");
154+
continue;
155+
};
156+
134157
self.metrics.register_last_processed_block(current_block);
158+
self.metrics
159+
.register_active_subscriptions(active_subscriptions_amount);
135160

136161
tokio::time::sleep(std::time::Duration::from_secs(
137162
seconds_to_wait_between_polls,

0 commit comments

Comments
 (0)