Skip to content

Commit 4a67e58

Browse files
committed
remove prefered order feature
1 parent f10a34b commit 4a67e58

4 files changed

Lines changed: 16 additions & 81 deletions

File tree

aggregation_mode/db/src/orchestrator.rs

Lines changed: 6 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,4 @@
1-
use std::{
2-
future::Future,
3-
sync::{
4-
atomic::{AtomicBool, Ordering},
5-
Arc,
6-
},
7-
time::Duration,
8-
};
1+
use std::{future::Future, sync::Arc, time::Duration};
92

103
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
114

@@ -21,8 +14,6 @@ enum Operation {
2114
#[derive(Debug)]
2215
struct DbNode {
2316
pool: Pool<Postgres>,
24-
last_read_failed: AtomicBool,
25-
last_write_failed: AtomicBool,
2617
}
2718

2819
/// Database orchestrator for running reads/writes across multiple PostgreSQL nodes with retry/backoff.
@@ -77,11 +68,7 @@ impl DbOrchestrator {
7768
.map(|url| {
7869
let pool = PgPoolOptions::new().max_connections(5).connect_lazy(url)?;
7970

80-
Ok(Arc::new(DbNode {
81-
pool,
82-
last_read_failed: AtomicBool::new(false),
83-
last_write_failed: AtomicBool::new(false),
84-
}))
71+
Ok(Arc::new(DbNode { pool }))
8572
})
8673
.collect::<Result<Vec<_>, sqlx::Error>>()
8774
.map_err(DbOrchestratorError::Sqlx)?;
@@ -92,23 +79,7 @@ impl DbOrchestrator {
9279
})
9380
}
9481

95-
pub async fn write<T, Q, Fut>(&self, query: Q) -> Result<T, sqlx::Error>
96-
where
97-
Q: Fn(Pool<Postgres>) -> Fut,
98-
Fut: Future<Output = Result<T, sqlx::Error>>,
99-
{
100-
self.query::<T, Q, Fut>(query, Operation::Write).await
101-
}
102-
103-
pub async fn read<T, Q, Fut>(&self, query: Q) -> Result<T, sqlx::Error>
104-
where
105-
Q: Fn(Pool<Postgres>) -> Fut,
106-
Fut: Future<Output = Result<T, sqlx::Error>>,
107-
{
108-
self.query::<T, Q, Fut>(query, Operation::Read).await
109-
}
110-
111-
async fn query<T, Q, Fut>(&self, query_fn: Q, operation: Operation) -> Result<T, sqlx::Error>
82+
pub async fn query<T, Q, Fut>(&self, query_fn: Q) -> Result<T, sqlx::Error>
11283
where
11384
Q: Fn(Pool<Postgres>) -> Fut,
11485
Fut: Future<Output = Result<T, sqlx::Error>>,
@@ -117,7 +88,7 @@ impl DbOrchestrator {
11788
let mut delay = Duration::from_millis(self.retry_config.min_delay_millis);
11889

11990
loop {
120-
match self.execute_once(&query_fn, operation).await {
91+
match self.execute_once(&query_fn).await {
12192
Ok(value) => return Ok(value),
12293
Err(RetryError::Permanent(err)) => return Err(err),
12394
Err(RetryError::Transient(err)) => {
@@ -134,38 +105,23 @@ impl DbOrchestrator {
134105
}
135106
}
136107

137-
async fn execute_once<T, Q, Fut>(
138-
&self,
139-
query_fn: &Q,
140-
operation: Operation,
141-
) -> Result<T, RetryError<sqlx::Error>>
108+
async fn execute_once<T, Q, Fut>(&self, query_fn: &Q) -> Result<T, RetryError<sqlx::Error>>
142109
where
143110
Q: Fn(Pool<Postgres>) -> Fut,
144111
Fut: Future<Output = Result<T, sqlx::Error>>,
145112
{
146113
let mut last_error = None;
147114

148-
for idx in self.preferred_order(operation) {
149-
let node = &self.nodes[idx];
115+
for (idx, node) in self.nodes.iter().enumerate() {
150116
let pool = node.pool.clone();
151117

152118
match query_fn(pool).await {
153119
Ok(res) => {
154-
match operation {
155-
Operation::Read => node.last_read_failed.store(false, Ordering::Relaxed),
156-
Operation::Write => node.last_write_failed.store(false, Ordering::Relaxed),
157-
};
158120
return Ok(res);
159121
}
160122
Err(err) => {
161123
if Self::is_connection_error(&err) {
162124
tracing::warn!(node_index = idx, error = ?err, "database query failed");
163-
match operation {
164-
Operation::Read => node.last_read_failed.store(true, Ordering::Relaxed),
165-
Operation::Write => {
166-
node.last_write_failed.store(true, Ordering::Relaxed)
167-
}
168-
};
169125
last_error = Some(err);
170126
} else {
171127
return Err(RetryError::Permanent(err));
@@ -179,27 +135,6 @@ impl DbOrchestrator {
179135
))
180136
}
181137

182-
fn preferred_order(&self, operation: Operation) -> Vec<usize> {
183-
let mut preferred = Vec::with_capacity(self.nodes.len());
184-
let mut fallback = Vec::new();
185-
186-
for (idx, node) in self.nodes.iter().enumerate() {
187-
let failed = match operation {
188-
Operation::Read => node.last_read_failed.load(Ordering::Relaxed),
189-
Operation::Write => node.last_write_failed.load(Ordering::Relaxed),
190-
};
191-
192-
if failed {
193-
fallback.push(idx);
194-
} else {
195-
preferred.push(idx);
196-
}
197-
}
198-
199-
preferred.extend(fallback);
200-
preferred
201-
}
202-
203138
fn is_connection_error(error: &sqlx::Error) -> bool {
204139
matches!(
205140
error,

aggregation_mode/gateway/src/db.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Db {
6060

6161
pub async fn count_tasks_by_address(&self, address: &str) -> Result<i64, sqlx::Error> {
6262
self.orchestrator
63-
.read(async |pool| {
63+
.query(async |pool| {
6464
let (count,) =
6565
sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tasks WHERE address = $1")
6666
.bind(address.to_lowercase())
@@ -77,7 +77,7 @@ impl Db {
7777
task_id: Uuid,
7878
) -> Result<Option<Vec<u8>>, sqlx::Error> {
7979
self.orchestrator
80-
.read(async |pool| {
80+
.query(async |pool| {
8181
sqlx::query_scalar::<_, Option<Vec<u8>>>(
8282
"SELECT merkle_path FROM tasks WHERE task_id = $1",
8383
)
@@ -95,7 +95,7 @@ impl Db {
9595
nonce: i64,
9696
) -> Result<Vec<Receipt>, sqlx::Error> {
9797
self.orchestrator
98-
.read(async |pool| {
98+
.query(async |pool| {
9999
sqlx::query_as::<_, Receipt>(
100100
"SELECT status,merkle_path,nonce,address FROM tasks
101101
WHERE address = $1
@@ -116,7 +116,7 @@ impl Db {
116116
limit: i64,
117117
) -> Result<Vec<Receipt>, sqlx::Error> {
118118
self.orchestrator
119-
.read(async |pool| {
119+
.query(async |pool| {
120120
sqlx::query_as::<_, Receipt>(
121121
"SELECT status,merkle_path,nonce,address FROM tasks
122122
WHERE address = $1
@@ -133,7 +133,7 @@ impl Db {
133133

134134
pub async fn get_daily_tasks_by_address(&self, address: &str) -> Result<i64, sqlx::Error> {
135135
self.orchestrator
136-
.read(async |pool| {
136+
.query(async |pool| {
137137
sqlx::query_scalar::<_, i64>(
138138
"SELECT COUNT(*)
139139
FROM tasks
@@ -157,7 +157,7 @@ impl Db {
157157
nonce: i64,
158158
) -> Result<Uuid, sqlx::Error> {
159159
self.orchestrator
160-
.write(async |pool| {
160+
.query(async |pool| {
161161
sqlx::query_scalar::<_, Uuid>(
162162
"INSERT INTO tasks (
163163
address,
@@ -187,7 +187,7 @@ impl Db {
187187
epoch: BigDecimal,
188188
) -> Result<bool, sqlx::Error> {
189189
self.orchestrator
190-
.read(async |pool| {
190+
.query(async |pool| {
191191
sqlx::query_scalar::<_, bool>(
192192
"SELECT EXISTS (
193193
SELECT 1 FROM payment_events

aggregation_mode/payments_poller/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl Db {
6262
tx_hash: &str,
6363
) -> Result<(), sqlx::Error> {
6464
self.orchestrator
65-
.write(async |pool| {
65+
.query(async |pool| {
6666
sqlx::query(
6767
"INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
6868
VALUES ($1, $2, $3, $4, $5)

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Db {
6060
limit: i64,
6161
) -> Result<Vec<Task>, DbError> {
6262
self.orchestrator
63-
.write(async |pool| {
63+
.query(async |pool| {
6464
sqlx::query_as::<_, Task>(
6565
"WITH selected AS (
6666
SELECT task_id
@@ -91,7 +91,7 @@ impl Db {
9191
let updates_ref = &updates;
9292

9393
self.orchestrator
94-
.write(|pool| {
94+
.query(|pool| {
9595
let updates = updates_ref;
9696
async move {
9797
let mut tx = pool.begin().await?;

0 commit comments

Comments
 (0)