Skip to content

Commit d121b39

Browse files
committed
core: thread EntityLfuCache instead of EntityCache across blocks
The AMP runner previously created an EntityCache with a fake SeqGenerator(0) and shared it across block boundaries, even though an EntityCache should always be tied to a single block. Now only the LfuCache is threaded through between blocks, and the EntityCache is created locally in process_record_batch_group with a SeqGenerator for the correct block number.
1 parent 69ddb1b commit d121b39

File tree

2 files changed

+22
-28
lines changed

2 files changed

+22
-28
lines changed

core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,22 @@ use graph::{
1111
},
1212
blockchain::block_stream::FirehoseCursor,
1313
cheap_clone::CheapClone,
14-
components::store::{EntityCache, ModificationsAndCache, SeqGenerator},
14+
components::store::{EntityCache, EntityLfuCache, ModificationsAndCache, SeqGenerator},
1515
};
1616
use slog::{debug, trace};
1717

1818
use super::{data_stream::TablePtr, Compat, Context, Error};
1919

2020
pub(super) async fn process_record_batch_groups<AC>(
2121
cx: &mut Context<AC>,
22-
mut entity_cache: EntityCache,
22+
mut entity_lfu_cache: EntityLfuCache,
2323
record_batch_groups: RecordBatchGroups,
2424
stream_table_ptr: Arc<[TablePtr]>,
2525
latest_block: BlockNumber,
26-
) -> Result<EntityCache, Error> {
26+
) -> Result<EntityLfuCache, Error> {
2727
if record_batch_groups.is_empty() {
2828
debug!(cx.logger, "Received no record batch groups");
29-
return Ok(entity_cache);
29+
return Ok(entity_lfu_cache);
3030
}
3131

3232
let from_block = record_batch_groups
@@ -50,9 +50,9 @@ pub(super) async fn process_record_batch_groups<AC>(
5050
"record_batches_count" => record_batch_group.record_batches.len()
5151
);
5252

53-
entity_cache = process_record_batch_group(
53+
entity_lfu_cache = process_record_batch_group(
5454
cx,
55-
entity_cache,
55+
entity_lfu_cache,
5656
block_number,
5757
block_hash,
5858
record_batch_group,
@@ -79,32 +79,36 @@ pub(super) async fn process_record_batch_groups<AC>(
7979
"to_block" => to_block
8080
);
8181

82-
Ok(entity_cache)
82+
Ok(entity_lfu_cache)
8383
}
8484

8585
async fn process_record_batch_group<AC>(
8686
cx: &mut Context<AC>,
87-
mut entity_cache: EntityCache,
87+
entity_lfu_cache: EntityLfuCache,
8888
block_number: BlockNumber,
8989
block_hash: BlockHash,
9090
record_batch_group: RecordBatchGroup,
9191
stream_table_ptr: &[TablePtr],
9292
latest_block: BlockNumber,
93-
) -> Result<EntityCache, Error> {
93+
) -> Result<EntityLfuCache, Error> {
9494
let _section = cx
9595
.metrics
9696
.stopwatch
9797
.start_section("process_record_batch_group");
9898

99-
entity_cache.seq_gen = SeqGenerator::new(block_number.compat());
100-
10199
let RecordBatchGroup { record_batches } = record_batch_group;
102100

103101
if record_batches.is_empty() {
104102
debug!(cx.logger, "Record batch group is empty");
105-
return Ok(entity_cache);
103+
return Ok(entity_lfu_cache);
106104
}
107105

106+
let mut entity_cache = EntityCache::with_current(
107+
cx.store.cheap_clone(),
108+
entity_lfu_cache,
109+
SeqGenerator::new(block_number.compat()),
110+
);
111+
108112
let block_timestamp = if cx.manifest.schema.has_aggregations() {
109113
decode_block_timestamp(&record_batches)
110114
.map_err(|e| e.context("failed to decode block timestamp"))?
@@ -135,7 +139,6 @@ async fn process_record_batch_group<AC>(
135139
}
136140

137141
let section = cx.metrics.stopwatch.start_section("as_modifications");
138-
let vid_gen = entity_cache.vid_gen();
139142
let ModificationsAndCache {
140143
modifications,
141144
entity_lfu_cache,
@@ -171,11 +174,7 @@ async fn process_record_batch_group<AC>(
171174
cx.metrics.deployment_synced.record(true);
172175
}
173176

174-
Ok(EntityCache::with_current(
175-
cx.store.cheap_clone(),
176-
entity_lfu_cache,
177-
vid_gen,
178-
))
177+
Ok(entity_lfu_cache)
179178
}
180179

181180
async fn process_record_batch<AC>(

core/src/amp_subgraph/runner/mod.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ use std::time::{Duration, Instant};
1111
use anyhow::Result;
1212
use futures::StreamExt;
1313
use graph::{
14-
amp::Client,
15-
cheap_clone::CheapClone,
16-
components::store::{EntityCache, SeqGenerator},
17-
data::subgraph::schema::SubgraphError,
14+
amp::Client, cheap_clone::CheapClone, components::store::EntityLfuCache,
15+
data::subgraph::schema::SubgraphError, util::lfu_cache::LfuCache,
1816
};
1917
use slog::{debug, error, warn};
2018
use tokio_util::sync::CancellationToken;
@@ -106,18 +104,15 @@ where
106104
.update(latest_block.min(cx.end_block()));
107105

108106
let mut deployment_is_failed = cx.store.health().await?.is_failed();
109-
// The SeqGenerator gets replaced with one for the correct block
110-
// number in `process_record_batch_groups`, so the initial value
111-
// doesn't matter much.
112-
let mut entity_cache = EntityCache::new(cx.store.cheap_clone(), SeqGenerator::new(0));
107+
let mut entity_lfu_cache: EntityLfuCache = LfuCache::new();
113108
let mut stream = new_data_stream(cx, latest_block);
114109

115110
while let Some(result) = stream.next().await {
116111
let (record_batch_groups, stream_table_ptr) = result?;
117112

118-
entity_cache = process_record_batch_groups(
113+
entity_lfu_cache = process_record_batch_groups(
119114
cx,
120-
entity_cache,
115+
entity_lfu_cache,
121116
record_batch_groups,
122117
stream_table_ptr,
123118
latest_block,

0 commit comments

Comments
 (0)