Skip to content

Commit d42d1c7

Browse files
committed
graph, store: Use ctid-based join delete for call cache eviction
Replace the two-loop structure (fetch stale contracts, then delete their call_cache rows in tiny batches) with a single adaptive loop that joins call_cache to call_meta directly and deletes by ctid. The old approach repeatedly ran `WHERE contract_address = ANY(N addresses)` against the 6.9B-row call_cache table, which required expensive index scans. The new approach lets Postgres join the small filtered call_meta result against call_cache using the contract_address index and then delete by physical row address (ctid), which is the fastest possible delete path. Also remove the GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE env var, since the batch size is now fully controlled by AdaptiveBatchSize.
1 parent a56cc78 commit d42d1c7

2 files changed

Lines changed: 83 additions & 140 deletions

File tree

graph/src/env/store.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,6 @@ pub struct EnvVarsStore {
171171
/// Disables storing or reading `eth_call` results from the store call cache.
172172
/// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false.
173173
pub disable_call_cache: bool,
174-
/// The number of contracts to delete from the call cache in one batch
175-
/// when clearing stale entries, set by
176-
/// `GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE`. The default
177-
/// value is 100 contracts.
178-
pub stale_call_cache_contracts_batch_size: usize,
179174
/// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false.
180175
/// Set to true to disable chain_head_ptr caching (safety escape hatch).
181176
pub disable_chain_head_ptr_cache: bool,
@@ -253,7 +248,6 @@ impl TryFrom<InnerStore> for EnvVarsStore {
253248
account_like_min_versions_count: x.account_like_min_versions_count,
254249
account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
255250
disable_call_cache: x.disable_call_cache,
256-
stale_call_cache_contracts_batch_size: x.stale_call_cache_contracts_batch_size,
257251
disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache,
258252
connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs),
259253
connection_unavailable_retry: Duration::from_secs(
@@ -370,11 +364,6 @@ pub struct InnerStore {
370364
account_like_max_unique_ratio: Option<ZeroToOneF64>,
371365
#[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")]
372366
disable_call_cache: bool,
373-
#[envconfig(
374-
from = "GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE",
375-
default = "100"
376-
)]
377-
stale_call_cache_contracts_batch_size: usize,
378367
#[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")]
379368
disable_chain_head_ptr_cache: bool,
380369
#[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")]

store/postgres/src/chain_store.rs

Lines changed: 83 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use graph::parking_lot::RwLock;
1212
use graph::prelude::alloy::primitives::B256;
1313
use graph::prelude::MetricsRegistry;
1414
use graph::prometheus::{CounterVec, GaugeVec};
15-
use graph::slog::{info, Logger};
15+
use graph::slog::{info, o, Logger};
1616
use graph::stable_hash::crypto_stable_hash;
1717
use graph::util::herd_cache::HerdCache;
1818

@@ -305,11 +305,6 @@ mod data {
305305
fn contract_address(&self) -> DynColumn<Bytea> {
306306
self.table.column::<Bytea, _>("contract_address")
307307
}
308-
309-
fn accessed_at(&self) -> DynColumn<diesel::sql_types::Date> {
310-
self.table
311-
.column::<diesel::sql_types::Date, _>(Self::ACCESSED_AT)
312-
}
313308
}
314309

315310
#[derive(Clone, Debug)]
@@ -1707,117 +1702,72 @@ mod data {
17071702
Ok(())
17081703
}
17091704

1710-
/// Find up to `batch_limit` contract addresses that have not
1711-
/// been accessed in the last `ttl_days` days
1712-
pub(super) async fn stale_contracts(
1705+
/// Delete up to `batch_size` call_cache rows whose contract_address
1706+
/// appears in call_meta with accessed_at older than `ttl_days`.
1707+
/// Returns the number of deleted rows.
1708+
pub(super) async fn delete_stale_calls_batch(
17131709
&self,
17141710
conn: &mut AsyncPgConnection,
1715-
batch_limit: usize,
1716-
ttl_days: usize,
1717-
) -> Result<Vec<Vec<u8>>, StoreError> {
1718-
let ttl_days = ttl_days as i64;
1719-
let batch_limit = batch_limit as i64;
1720-
match self {
1721-
Storage::Shared => {
1722-
use public::eth_call_meta as meta;
1723-
1724-
meta::table
1725-
.select(meta::contract_address)
1726-
.filter(
1727-
meta::accessed_at
1728-
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1729-
)
1730-
.limit(batch_limit)
1731-
.get_results::<Vec<u8>>(conn)
1732-
.await
1733-
.map_err(StoreError::from)
1734-
}
1735-
Storage::Private(Schema { call_meta, .. }) => call_meta
1736-
.table()
1737-
.select(call_meta.contract_address())
1738-
.filter(
1739-
call_meta
1740-
.accessed_at()
1741-
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1742-
)
1743-
.limit(batch_limit)
1744-
.get_results::<Vec<u8>>(conn)
1745-
.await
1746-
.map_err(StoreError::from),
1747-
}
1748-
}
1749-
1750-
/// Delete up to `batch_size` calls for the given
1751-
/// `stale_contracts`. Returns the number of deleted calls.
1752-
pub(super) async fn delete_calls(
1753-
&self,
1754-
conn: &mut AsyncPgConnection,
1755-
stale_contracts: &[Vec<u8>],
1711+
ttl_days: i64,
17561712
batch_size: i64,
17571713
) -> Result<usize, StoreError> {
1758-
match self {
1759-
Storage::Shared => {
1760-
use public::eth_call_cache as cache;
1761-
1762-
let next_batch = cache::table
1763-
.select(cache::id)
1764-
.filter(cache::contract_address.eq_any(stale_contracts))
1765-
.limit(batch_size)
1766-
.get_results::<Vec<u8>>(conn)
1767-
.await?;
1768-
1769-
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
1770-
.execute(conn)
1771-
.await
1772-
.map_err(StoreError::from)
1773-
}
1774-
Storage::Private(Schema { call_cache, .. }) => {
1775-
let delete_cache_query = format!(
1776-
"WITH targets AS (
1777-
SELECT id
1778-
FROM {qname}
1779-
WHERE contract_address = ANY($1)
1780-
LIMIT $2
1781-
)
1782-
DELETE FROM {qname} USING targets
1783-
WHERE {qname}.id = targets.id",
1784-
qname = call_cache.qname
1785-
);
1786-
1787-
sql_query(&delete_cache_query)
1788-
.bind::<Array<Bytea>, _>(stale_contracts)
1789-
.bind::<BigInt, _>(batch_size)
1790-
.execute(conn)
1791-
.await
1792-
.map_err(StoreError::from)
1793-
}
1794-
}
1714+
let (cache, meta) = match self {
1715+
Storage::Shared => (
1716+
ETHEREUM_CALL_CACHE_TABLE_NAME,
1717+
ETHEREUM_CALL_META_TABLE_NAME,
1718+
),
1719+
Storage::Private(Schema {
1720+
call_cache,
1721+
call_meta,
1722+
..
1723+
}) => (call_cache.qname.as_str(), call_meta.qname.as_str()),
1724+
};
1725+
let query = format!(
1726+
"/* controller='call_cache_cleanup',days={ttl_days},batch_size={batch_size} */ \
1727+
delete from {cache}
1728+
where ctid in (
1729+
select cc.ctid
1730+
from {cache} cc
1731+
join {meta} cm
1732+
on cc.contract_address = cm.contract_address
1733+
where cm.accessed_at < current_date - $1 * interval '1 day'
1734+
limit $2
1735+
)"
1736+
);
1737+
sql_query(&query)
1738+
.bind::<BigInt, _>(ttl_days)
1739+
.bind::<BigInt, _>(batch_size)
1740+
.execute(conn)
1741+
.await
1742+
.map_err(StoreError::from)
17951743
}
17961744

1797-
pub(super) async fn delete_contracts(
1745+
/// Delete all call_meta entries with accessed_at older than
1746+
/// `ttl_days`. Should be called after all corresponding
1747+
/// call_cache rows have been deleted.
1748+
pub(super) async fn delete_stale_meta(
17981749
&self,
17991750
conn: &mut AsyncPgConnection,
1800-
stale_contracts: &[Vec<u8>],
1751+
ttl_days: i64,
18011752
) -> Result<usize, StoreError> {
18021753
match self {
18031754
Storage::Shared => {
18041755
use public::eth_call_meta as meta;
18051756

1806-
diesel::delete(
1807-
meta::table.filter(meta::contract_address.eq_any(stale_contracts)),
1808-
)
1757+
diesel::delete(meta::table.filter(
1758+
meta::accessed_at.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1759+
))
18091760
.execute(conn)
18101761
.await
18111762
.map_err(StoreError::from)
18121763
}
18131764
Storage::Private(Schema { call_meta, .. }) => {
1814-
let delete_meta_query = format!(
1815-
"DELETE FROM {} WHERE contract_address = ANY($1)",
1765+
let query = format!(
1766+
"delete from {} where accessed_at < current_date - $1 * interval '1 day'",
18161767
call_meta.qname
18171768
);
1818-
1819-
sql_query(&delete_meta_query)
1820-
.bind::<Array<Bytea>, _>(stale_contracts)
1769+
sql_query(&query)
1770+
.bind::<BigInt, _>(ttl_days)
18211771
.execute(conn)
18221772
.await
18231773
.map_err(StoreError::from)
@@ -3277,57 +3227,61 @@ impl ChainStoreTrait for ChainStore {
32773227
}
32783228

32793229
async fn clear_stale_call_cache(&self, ttl_days: usize) -> Result<(), Error> {
3230+
const LOG_INTERVAL: Duration = Duration::from_mins(5);
3231+
32803232
let conn = &mut self.pool.get_permitted().await?;
32813233

3234+
let logger = self.logger.new(o!("component" => "CallCacheCleanup"));
3235+
32823236
self.storage
32833237
.ensure_contract_address_index(conn, &self.logger)
32843238
.await?;
32853239

3286-
let mut total_calls: usize = 0;
3287-
let mut total_contracts: usize = 0;
3288-
let contracts_batch_size: usize = ENV_VARS.store.stale_call_cache_contracts_batch_size;
3240+
let ttl_days = ttl_days as i64;
3241+
let mut total_deleted: usize = 0;
3242+
let mut batch_count: usize = 0;
32893243
let mut batch_size = AdaptiveBatchSize::with_size(100);
3244+
let mut last_log = Instant::now();
32903245

32913246
loop {
3292-
let stale_contracts = self
3247+
let current_size = batch_size.size;
3248+
let start = Instant::now();
3249+
let deleted = self
32933250
.storage
3294-
.stale_contracts(conn, contracts_batch_size, ttl_days)
3251+
.delete_stale_calls_batch(conn, ttl_days, current_size)
32953252
.await?;
32963253

3297-
if stale_contracts.is_empty() {
3254+
batch_size.adapt(start.elapsed());
3255+
total_deleted += deleted;
3256+
batch_count += 1;
3257+
3258+
if last_log.elapsed() >= LOG_INTERVAL {
32983259
info!(
3299-
self.logger,
3300-
"Finished cleaning call cache: deleted {} entries for {} contracts",
3301-
total_calls,
3302-
total_contracts
3260+
logger,
3261+
"deleted {} entries in this batch \
3262+
({} total in {} batches, batch_size now {})",
3263+
deleted,
3264+
total_deleted,
3265+
batch_count,
3266+
batch_size.size
33033267
);
3304-
break;
3268+
last_log = Instant::now();
33053269
}
33063270

3307-
loop {
3308-
let current_size = batch_size.size;
3309-
let start = Instant::now();
3310-
let deleted_count = self
3311-
.storage
3312-
.delete_calls(conn, &stale_contracts, current_size)
3313-
.await?;
3314-
3315-
batch_size.adapt(start.elapsed());
3316-
3317-
total_calls += deleted_count;
3318-
3319-
if (deleted_count as i64) < current_size {
3320-
break;
3321-
}
3271+
if (deleted as i64) < current_size {
3272+
break;
33223273
}
3274+
}
33233275

3324-
let deleted_contracts = self
3325-
.storage
3326-
.delete_contracts(conn, &stale_contracts)
3327-
.await?;
3276+
let contracts_deleted = self.storage.delete_stale_meta(conn, ttl_days).await?;
33283277

3329-
total_contracts += deleted_contracts;
3330-
}
3278+
info!(
3279+
logger,
3280+
"deleted {} cache entries and {} contract entries in {} batches",
3281+
total_deleted,
3282+
contracts_deleted,
3283+
batch_count
3284+
);
33313285

33323286
Ok(())
33333287
}

0 commit comments

Comments
 (0)