Skip to content

Commit 69ddb1b

Browse files
committed
graph, runtime, core: introduce VidGenerator for shared VID sequences
Replace the per-EntityCache vid_seq and seq fields with a shared VidGenerator backed by Arc<AtomicU32>. Created once per block and cheaply cloned across all EntityCache instances, this eliminates VID collisions in ipfs.map() callbacks and offchain triggers without manual sequence threading.
1 parent 4db386b commit 69ddb1b

File tree

15 files changed

+180
-170
lines changed

15 files changed

+180
-170
lines changed

core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use graph::{
1111
},
1212
blockchain::block_stream::FirehoseCursor,
1313
cheap_clone::CheapClone,
14-
components::store::{EntityCache, ModificationsAndCache},
14+
components::store::{EntityCache, ModificationsAndCache, SeqGenerator},
1515
};
1616
use slog::{debug, trace};
1717

@@ -96,6 +96,8 @@ async fn process_record_batch_group<AC>(
9696
.stopwatch
9797
.start_section("process_record_batch_group");
9898

99+
entity_cache.seq_gen = SeqGenerator::new(block_number.compat());
100+
99101
let RecordBatchGroup { record_batches } = record_batch_group;
100102

101103
if record_batches.is_empty() {
@@ -121,7 +123,6 @@ async fn process_record_batch_group<AC>(
121123
process_record_batch(
122124
cx,
123125
&mut entity_cache,
124-
block_number,
125126
record_batch,
126127
stream_table_ptr[stream_index],
127128
)
@@ -134,6 +135,7 @@ async fn process_record_batch_group<AC>(
134135
}
135136

136137
let section = cx.metrics.stopwatch.start_section("as_modifications");
138+
let vid_gen = entity_cache.vid_gen();
137139
let ModificationsAndCache {
138140
modifications,
139141
entity_lfu_cache,
@@ -172,13 +174,13 @@ async fn process_record_batch_group<AC>(
172174
Ok(EntityCache::with_current(
173175
cx.store.cheap_clone(),
174176
entity_lfu_cache,
177+
vid_gen,
175178
))
176179
}
177180

178181
async fn process_record_batch<AC>(
179182
cx: &mut Context<AC>,
180183
entity_cache: &mut EntityCache,
181-
block_number: BlockNumber,
182184
record_batch: RecordBatch,
183185
(i, j): TablePtr,
184186
) -> Result<(), Error> {
@@ -209,13 +211,11 @@ async fn process_record_batch<AC>(
209211
let key = match key {
210212
Some(key) => key,
211213
None => {
212-
let entity_id = entity_cache
213-
.generate_id(id_type, block_number.compat())
214-
.map_err(|e| {
215-
Error::Deterministic(e.context(format!(
216-
"failed to generate a new id for an entity of type '{entity_name}'"
217-
)))
218-
})?;
214+
let entity_id = entity_cache.seq_gen.id(id_type).map_err(|e| {
215+
Error::Deterministic(e.context(format!(
216+
"failed to generate a new id for an entity of type '{entity_name}'"
217+
)))
218+
})?;
219219

220220
entity_data.push(("id".into(), entity_id.clone().into()));
221221
entity_type.key(entity_id)
@@ -229,14 +229,11 @@ async fn process_record_batch<AC>(
229229
)))
230230
})?;
231231

232-
entity_cache
233-
.set(key, entity, block_number.compat(), None)
234-
.await
235-
.map_err(|e| {
236-
Error::Deterministic(e.context(format!(
237-
"failed to store a new entity of type '{entity_name}' with id '{entity_id}'"
238-
)))
239-
})?;
232+
entity_cache.set(key, entity, None).await.map_err(|e| {
233+
Error::Deterministic(e.context(format!(
234+
"failed to store a new entity of type '{entity_name}' with id '{entity_id}'"
235+
)))
236+
})?;
240237
}
241238

242239
Ok(())

core/src/amp_subgraph/runner/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use std::time::{Duration, Instant};
1111
use anyhow::Result;
1212
use futures::StreamExt;
1313
use graph::{
14-
amp::Client, cheap_clone::CheapClone, components::store::EntityCache,
14+
amp::Client,
15+
cheap_clone::CheapClone,
16+
components::store::{EntityCache, SeqGenerator},
1517
data::subgraph::schema::SubgraphError,
1618
};
1719
use slog::{debug, error, warn};
@@ -104,7 +106,10 @@ where
104106
.update(latest_block.min(cx.end_block()));
105107

106108
let mut deployment_is_failed = cx.store.health().await?.is_failed();
107-
let mut entity_cache = EntityCache::new(cx.store.cheap_clone());
109+
// The SeqGenerator gets replaced with one for the correct block
110+
// number in `process_record_batch_groups`, so the initial value
111+
// doesn't matter much.
112+
let mut entity_cache = EntityCache::new(cx.store.cheap_clone(), SeqGenerator::new(0));
108113
let mut stream = new_data_stream(cx, latest_block);
109114

110115
while let Some(result) = stream.next().await {

core/src/subgraph/runner/mod.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use graph::blockchain::{
1717
Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _,
1818
TriggerFilterWrapper,
1919
};
20-
use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource};
20+
use graph::components::store::{
21+
EmptyStore, GetScope, ReadStore, SeqGenerator, StoredDynamicDataSource,
22+
};
2123
use graph::components::subgraph::InstanceDSTemplate;
2224
use graph::components::trigger_processor::RunnableTriggers;
2325
use graph::components::{
@@ -1019,9 +1021,9 @@ where
10191021
.ready_offchain_events()
10201022
.non_deterministic()?;
10211023

1022-
let onchain_vid_seq = block_state.entity_cache.vid_seq;
1024+
let vid_gen = block_state.seq_gen();
10231025
let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) =
1024-
self.handle_offchain_triggers(offchain_events, block, onchain_vid_seq)
1026+
self.handle_offchain_triggers(offchain_events, block, vid_gen)
10251027
.await
10261028
.non_deterministic()?;
10271029

@@ -1066,9 +1068,11 @@ where
10661068
// Causality region for onchain triggers.
10671069
let causality_region = PoICausalityRegion::from_network(&self.inputs.network);
10681070

1071+
let vid_gen = SeqGenerator::new(block_ptr.number);
10691072
let mut block_state = BlockState::new(
10701073
self.inputs.store.clone(),
10711074
std::mem::take(&mut self.state.entity_lfu_cache),
1075+
vid_gen,
10721076
);
10731077

10741078
let _section = self
@@ -1471,7 +1475,7 @@ where
14711475
&mut self,
14721476
triggers: Vec<offchain::TriggerData>,
14731477
block: &Arc<C::Block>,
1474-
mut next_vid_seq: u32,
1478+
vid_gen: SeqGenerator,
14751479
) -> Result<
14761480
(
14771481
Vec<EntityModification>,
@@ -1488,12 +1492,11 @@ where
14881492
// Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to
14891493
// get causality region isolation.
14901494
let schema = ReadStore::input_schema(&self.inputs.store);
1491-
let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new());
1492-
1493-
// Continue the vid sequence from the previous trigger (or from
1494-
// onchain processing) so that each offchain trigger does not
1495-
// reset to RESERVED_VIDS and produce duplicate VIDs.
1496-
block_state.entity_cache.vid_seq = next_vid_seq;
1495+
let mut block_state = BlockState::new(
1496+
EmptyStore::new(schema),
1497+
LfuCache::new(),
1498+
vid_gen.cheap_clone(),
1499+
);
14971500

14981501
// PoI ignores offchain events.
14991502
// See also: poi-ignores-offchain
@@ -1560,10 +1563,6 @@ where
15601563
return Err(anyhow!("{}", err));
15611564
}
15621565

1563-
// Carry forward the vid sequence so the next iteration doesn't
1564-
// reset to RESERVED_VIDS and produce duplicate VIDs.
1565-
next_vid_seq = block_state.entity_cache.vid_seq;
1566-
15671566
mods.extend(
15681567
block_state
15691568
.entity_cache
@@ -1685,7 +1684,6 @@ async fn update_proof_of_indexing(
16851684
key: EntityKey,
16861685
digest: Bytes,
16871686
block_time: BlockTime,
1688-
block: BlockNumber,
16891687
) -> Result<(), Error> {
16901688
let digest_name = entity_cache.schema.poi_digest();
16911689
let mut data = vec![
@@ -1700,12 +1698,11 @@ async fn update_proof_of_indexing(
17001698
data.push((entity_cache.schema.poi_block_time(), block_time));
17011699
}
17021700
let poi = entity_cache.make_entity(data)?;
1703-
entity_cache.set(key, poi, block, None).await
1701+
entity_cache.set(key, poi, None).await
17041702
}
17051703

17061704
let _section_guard = stopwatch.start_section("update_proof_of_indexing");
17071705

1708-
let block_number = proof_of_indexing.get_block();
17091706
let mut proof_of_indexing = proof_of_indexing.take();
17101707

17111708
for (causality_region, stream) in proof_of_indexing.drain() {
@@ -1742,7 +1739,6 @@ async fn update_proof_of_indexing(
17421739
entity_key,
17431740
updated_proof_of_indexing,
17441741
block_time,
1745-
block_number,
17461742
)
17471743
.await?;
17481744
}

graph/src/components/store/entity_cache.rs

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use anyhow::{anyhow, bail};
22
use std::borrow::Borrow;
33
use std::collections::HashMap;
44
use std::fmt::{self, Debug};
5+
use std::sync::atomic::{AtomicU32, Ordering};
56
use std::sync::Arc;
67

78
use crate::cheap_clone::CheapClone;
@@ -21,6 +22,52 @@ pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;
2122
// Currently none is used, but lets reserve a few more.
2223
const RESERVED_VIDS: u32 = 100;
2324

25+
/// Shared generator for VID and entity ID sequences within a block.
26+
///
27+
/// Created once per block and shared (via `Arc`) across all `EntityCache`
28+
/// instances that operate on the same block. This prevents VID collisions
29+
/// when multiple isolated caches (e.g. ipfs.map callbacks, offchain
30+
/// triggers) write entities in the same block.
31+
#[derive(Clone, Debug)]
32+
pub struct SeqGenerator {
33+
block: BlockNumber,
34+
vid_seq: Arc<AtomicU32>,
35+
id_seq: Arc<AtomicU32>,
36+
}
37+
38+
impl CheapClone for SeqGenerator {
39+
fn cheap_clone(&self) -> Self {
40+
self.clone()
41+
}
42+
}
43+
44+
impl SeqGenerator {
45+
pub fn new(block: BlockNumber) -> Self {
46+
SeqGenerator {
47+
block,
48+
vid_seq: Arc::new(AtomicU32::new(RESERVED_VIDS)),
49+
id_seq: Arc::new(AtomicU32::new(0)),
50+
}
51+
}
52+
53+
/// Return the next VID. The VID encodes the block number in the upper
54+
/// 32 bits and a monotonically increasing sequence in the lower 32
55+
/// bits.
56+
pub fn vid(&self) -> i64 {
57+
let seq = self.vid_seq.fetch_add(1, Ordering::Relaxed);
58+
((self.block as i64) << 32) + seq as i64
59+
}
60+
61+
/// Generate the next entity ID for the given ID type. The ID encodes
62+
/// the block number in the upper 32 bits and a monotonically
63+
/// increasing sequence in the lower 32 bits.
64+
pub fn id(&self, id_type: IdType) -> anyhow::Result<Id> {
65+
let seq = self.id_seq.fetch_add(1, Ordering::Relaxed);
66+
67+
id_type.generate_id(self.block, seq)
68+
}
69+
}
70+
2471
/// The scope in which the `EntityCache` should perform a `get` operation
2572
pub enum GetScope {
2673
/// Get from all previously stored entities in the store
@@ -103,16 +150,8 @@ pub struct EntityCache {
103150

104151
pub schema: InputSchema,
105152

106-
/// A sequence number for generating entity IDs. We use one number for
107-
/// all id's as the id's are scoped by block and a u32 has plenty of
108-
/// room for all changes in one block. To ensure reproducability of
109-
/// generated IDs, the `EntityCache` needs to be newly instantiated for
110-
/// each block
111-
seq: u32,
112-
113-
// Sequence number of the next VID value for this block. The value written
114-
// in the database consist of a block number and this SEQ number.
115-
pub vid_seq: u32,
153+
/// Shared sequence generator for VIDs and entity IDs within a block.
154+
pub seq_gen: SeqGenerator,
116155
}
117156

118157
impl Debug for EntityCache {
@@ -131,16 +170,15 @@ pub struct ModificationsAndCache {
131170
}
132171

133172
impl EntityCache {
134-
pub fn new(store: Arc<dyn s::ReadStore>) -> Self {
173+
pub fn new(store: Arc<dyn s::ReadStore>, seq_gen: SeqGenerator) -> Self {
135174
Self {
136175
current: LfuCache::new(),
137176
updates: HashMap::new(),
138177
handler_updates: HashMap::new(),
139178
in_handler: false,
140179
schema: store.input_schema(),
141180
store,
142-
seq: 0,
143-
vid_seq: RESERVED_VIDS,
181+
seq_gen,
144182
}
145183
}
146184

@@ -152,19 +190,26 @@ impl EntityCache {
152190
self.schema.make_entity(iter)
153191
}
154192

155-
pub fn with_current(store: Arc<dyn s::ReadStore>, current: EntityLfuCache) -> EntityCache {
193+
pub fn with_current(
194+
store: Arc<dyn s::ReadStore>,
195+
current: EntityLfuCache,
196+
seq_gen: SeqGenerator,
197+
) -> EntityCache {
156198
EntityCache {
157199
current,
158200
updates: HashMap::new(),
159201
handler_updates: HashMap::new(),
160202
in_handler: false,
161203
schema: store.input_schema(),
162204
store,
163-
seq: 0,
164-
vid_seq: RESERVED_VIDS,
205+
seq_gen,
165206
}
166207
}
167208

209+
pub fn seq_gen(&self) -> SeqGenerator {
210+
self.seq_gen.cheap_clone()
211+
}
212+
168213
pub(crate) fn enter_handler(&mut self) {
169214
assert!(!self.in_handler);
170215
self.in_handler = true;
@@ -368,7 +413,6 @@ impl EntityCache {
368413
&mut self,
369414
key: EntityKey,
370415
entity: Entity,
371-
block: BlockNumber,
372416
write_capacity_remaining: Option<&mut usize>,
373417
) -> Result<(), anyhow::Error> {
374418
// check the validate for derived fields
@@ -386,9 +430,7 @@ impl EntityCache {
386430
*write_capacity_remaining -= weight;
387431
}
388432

389-
// The next VID is based on a block number and a sequence within the block
390-
let vid = ((block as i64) << 32) + self.vid_seq as i64;
391-
self.vid_seq += 1;
433+
let vid = self.seq_gen.vid();
392434
let mut entity = entity;
393435
let old_vid = entity.set_vid(vid).expect("the vid should be set");
394436
// Make sure that there was no VID previously set for this entity.
@@ -457,16 +499,6 @@ impl EntityCache {
457499
for (key, op) in other.updates {
458500
self.entity_op(key, op);
459501
}
460-
// Carry forward vid_seq to prevent VID collisions when the caller
461-
// continues writing entities after merging.
462-
self.vid_seq = self.vid_seq.max(other.vid_seq);
463-
}
464-
465-
/// Generate an id.
466-
pub fn generate_id(&mut self, id_type: IdType, block: BlockNumber) -> anyhow::Result<Id> {
467-
let id = id_type.generate_id(block, self.seq)?;
468-
self.seq += 1;
469-
Ok(id)
470502
}
471503

472504
/// Return the changes that have been made via `set` and `remove` as

graph/src/components/store/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use diesel::pg::Pg;
99
use diesel::serialize::{Output, ToSql};
1010
use diesel::sql_types::Integer;
1111
use diesel_derives::{AsExpression, FromSqlRow};
12-
pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache};
12+
pub use entity_cache::{
13+
EntityCache, EntityLfuCache, GetScope, ModificationsAndCache, SeqGenerator,
14+
};
1315
use slog::Logger;
1416
use tokio_stream::wrappers::ReceiverStream;
1517

0 commit comments

Comments
 (0)