Skip to content

Commit 8619036

Browse files
committed
core, graph: Rename GRAPH_AMP_MAX_BUFFER_SIZE to GRAPH_AMP_BUFFER_SIZE
1 parent 6ed2660 commit 8619036

7 files changed

Lines changed: 21 additions & 21 deletions

File tree

core/src/amp_subgraph/runner/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub(in super::super) struct Context<AC> {
1818
pub(super) logger: Logger,
1919
pub(super) client: Arc<AC>,
2020
pub(super) store: Arc<dyn WritableStore>,
21-
pub(super) max_buffer_size: usize,
21+
pub(super) buffer_size: usize,
2222
pub(super) max_block_range: usize,
2323
pub(super) backoff: ExponentialBackoff,
2424
pub(super) deployment: DeploymentHash,
@@ -45,7 +45,7 @@ impl<AC> Context<AC> {
4545
logger,
4646
client,
4747
store,
48-
max_buffer_size: env.max_buffer_size,
48+
buffer_size: env.buffer_size,
4949
max_block_range: env.max_block_range,
5050
backoff,
5151
deployment,

core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ where
3333
let logger = cx.logger.new(slog::o!("process" => "new_data_stream"));
3434
let client = cx.client.cheap_clone();
3535
let manifest = cx.manifest.clone();
36-
let max_buffer_size = cx.max_buffer_size;
36+
let buffer_size = cx.buffer_size;
3737
let max_block_range = cx.max_block_range;
3838
let stopwatch = cx.metrics.stopwatch.cheap_clone();
3939

@@ -74,7 +74,7 @@ where
7474
&logger,
7575
query_streams,
7676
table_ptrs,
77-
max_buffer_size,
77+
buffer_size,
7878
&stopwatch,
7979
start_block,
8080
);
@@ -130,7 +130,7 @@ fn build_data_stream<E>(
130130
logger: &slog::Logger,
131131
query_streams: Vec<(String, BoxStream<'static, Result<ResponseBatch, E>>)>,
132132
table_ptrs: Arc<[TablePtr]>,
133-
max_buffer_size: usize,
133+
buffer_size: usize,
134134
stopwatch: &StopwatchMetrics,
135135
min_start_block: BlockNumber,
136136
) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>>
@@ -141,7 +141,7 @@ where
141141
let mut load_first_record_batch_group_section =
142142
Some(stopwatch.start_section("load_first_record_batch_group"));
143143

144-
StreamAggregator::new(logger, query_streams, max_buffer_size)
144+
StreamAggregator::new(logger, query_streams, buffer_size)
145145
.map_ok(move |response| (response, table_ptrs.cheap_clone()))
146146
.map_err(Error::from)
147147
.map(move |result| {

docs/amp-powered-subgraphs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ Amp-powered subgraphs feature introduces the following new ENV variables:
563563

564564
- `GRAPH_AMP_FLIGHT_SERVICE_ADDRESS` – The address of the Amp Flight gRPC service. _Defaults to `None`, which disables support for Amp-powered subgraphs._
565565
- `GRAPH_AMP_FLIGHT_SERVICE_TOKEN` – Token used to authenticate Amp Flight gRPC service requests. _Defaults to `None`, which disables authentication._
566-
- `GRAPH_AMP_MAX_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._
566+
- `GRAPH_AMP_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._
567567
- `GRAPH_AMP_MAX_BLOCK_RANGE` – Maximum number of blocks to request per stream for each SQL query. _Defaults to `2,000,000`._
568568
- `GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS` – Minimum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `1` second._
569569
- `GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS` – Maximum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `600` seconds._

graph/src/amp/stream_aggregator/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl StreamAggregator {
6060
pub fn new<E>(
6161
logger: &Logger,
6262
named_streams: impl IntoIterator<Item = (String, BoxStream<'static, Result<ResponseBatch, E>>)>,
63-
max_buffer_size: usize,
63+
buffer_size: usize,
6464
) -> Self
6565
where
6666
E: std::error::Error + IsDeterministic + Send + Sync + 'static,
@@ -103,7 +103,7 @@ impl StreamAggregator {
103103

104104
Self {
105105
named_streams,
106-
buffer: Buffer::new(num_streams, max_buffer_size),
106+
buffer: Buffer::new(num_streams, buffer_size),
107107
logger,
108108
is_finalized: false,
109109
is_failed: false,

graph/src/amp/stream_aggregator/record_batch/buffer.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,21 @@ use super::{Aggregator, RecordBatchGroup, RecordBatchGroups, StreamRecordBatch};
1111
pub(in super::super) struct Buffer {
1212
aggregators: Vec<Aggregator>,
1313
num_streams: usize,
14-
max_buffer_size: usize,
14+
buffer_size: usize,
1515
}
1616

1717
impl Buffer {
1818
/// Creates a new buffer that can handle exactly `num_streams` number of streams.
1919
///
2020
/// Creates a new associated `Aggregator` for each stream.
21-
/// The `max_buffer_size` specifies how many record batches for each stream can be buffered at most.
22-
pub(in super::super) fn new(num_streams: usize, max_buffer_size: usize) -> Self {
21+
/// The `buffer_size` specifies how many record batches for each stream can be buffered at most.
22+
pub(in super::super) fn new(num_streams: usize, buffer_size: usize) -> Self {
2323
let aggregators = (0..num_streams).map(|_| Aggregator::new()).collect();
2424

2525
Self {
2626
aggregators,
2727
num_streams,
28-
max_buffer_size,
28+
buffer_size,
2929
}
3030
}
3131

@@ -130,7 +130,7 @@ impl Buffer {
130130
/// Panics if the `stream_index` is greater than the initialized number of streams.
131131
pub(in super::super) fn has_capacity(&self, stream_index: usize) -> bool {
132132
assert!(stream_index < self.num_streams);
133-
self.aggregators[stream_index].len() < self.max_buffer_size
133+
self.aggregators[stream_index].len() < self.buffer_size
134134
}
135135

136136
/// Returns `true` if the stream `stream_index` is not allowed to make progress and

graph/src/env/amp.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct AmpEnv {
77
/// This is the maximum number of record batches that can be output by a single block.
88
///
99
/// Defaults to `1,000`.
10-
pub max_buffer_size: usize,
10+
pub buffer_size: usize,
1111

1212
/// Maximum number of blocks to request per stream for each SQL query.
1313
/// Limiting this value reduces load on the Amp server when processing heavy queries.
@@ -32,22 +32,22 @@ pub struct AmpEnv {
3232
}
3333

3434
impl AmpEnv {
35-
const DEFAULT_MAX_BUFFER_SIZE: usize = 1_000;
35+
const DEFAULT_BUFFER_SIZE: usize = 1_000;
3636
const DEFAULT_MAX_BLOCK_RANGE: usize = 2_000_000;
3737
const DEFAULT_QUERY_RETRY_MIN_DELAY: Duration = Duration::from_secs(1);
3838
const DEFAULT_QUERY_RETRY_MAX_DELAY: Duration = Duration::from_secs(600);
3939

4040
pub(super) fn new(raw_env: &super::Inner) -> Self {
4141
Self {
42-
max_buffer_size: raw_env
43-
.amp_max_buffer_size
42+
buffer_size: raw_env
43+
.amp_buffer_size
4444
.and_then(|value| {
4545
if value == 0 {
4646
return None;
4747
}
4848
Some(value)
4949
})
50-
.unwrap_or(Self::DEFAULT_MAX_BUFFER_SIZE),
50+
.unwrap_or(Self::DEFAULT_BUFFER_SIZE),
5151
max_block_range: raw_env
5252
.amp_max_block_range
5353
.map(|mut value| {

graph/src/env/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -608,8 +608,8 @@ struct Inner {
608608
)]
609609
disable_deployment_hash_validation: EnvVarBoolean,
610610

611-
#[envconfig(from = "GRAPH_AMP_MAX_BUFFER_SIZE")]
612-
amp_max_buffer_size: Option<usize>,
611+
#[envconfig(from = "GRAPH_AMP_BUFFER_SIZE")]
612+
amp_buffer_size: Option<usize>,
613613
#[envconfig(from = "GRAPH_AMP_MAX_BLOCK_RANGE")]
614614
amp_max_block_range: Option<usize>,
615615
#[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS")]

0 commit comments

Comments
 (0)