Skip to content

Commit 6ed2660

Browse files
committed
graph: Fix lost waker bug in StreamAggregator polling
When a stream returns `Poll::Ready` but the aggregator cannot produce output yet (an empty batch, or data buffered without any completed groups), the waker has been consumed and is not re-registered until that stream is polled again. The aggregator therefore stops being polled, deadlocking the subgraph. The fix calls `cx.waker().wake_by_ref()` before returning `Poll::Pending` whenever any stream returned `Poll::Ready` during the polling cycle.
1 parent ccc1835 commit 6ed2660

1 file changed

Lines changed: 13 additions & 0 deletions

File tree

  • graph/src/amp/stream_aggregator

graph/src/amp/stream_aggregator/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl StreamAggregator {
115115
cx: &mut task::Context<'_>,
116116
) -> Poll<Option<Result<RecordBatchGroups, Error>>> {
117117
let mut made_progress = false;
118+
let mut needs_repoll = false;
118119

119120
for (stream_index, (stream_name, stream)) in self.named_streams.iter_mut().enumerate() {
120121
let logger = self.logger.new(slog::o!(
@@ -152,6 +153,7 @@ impl StreamAggregator {
152153
match buffer_result {
153154
Ok(()) => {
154155
made_progress = true;
156+
needs_repoll = true;
155157

156158
debug!(logger, "Buffered record batch";
157159
"buffer_size" => self.buffer.size(stream_index),
@@ -167,6 +169,7 @@ impl StreamAggregator {
167169
}
168170
Poll::Ready(Some(Ok(_empty_record_batch))) => {
169171
debug!(logger, "Received an empty record batch");
172+
needs_repoll = true;
170173
}
171174
Poll::Ready(Some(Err(e))) => {
172175
self.is_failed = true;
@@ -209,6 +212,16 @@ impl StreamAggregator {
209212
return Poll::Ready(None);
210213
}
211214

215+
// When any stream returned `Poll::Ready` but we couldn't produce
216+
// output (e.g. empty batch, or data buffered but no completed
217+
// groups yet), the waker was consumed by that stream's poll call
218+
// and won't be re-registered until we poll it again. Schedule an
219+
// immediate re-poll so those streams get polled again and their
220+
// wakers are properly re-registered.
221+
if needs_repoll {
222+
cx.waker().wake_by_ref();
223+
}
224+
212225
Poll::Pending
213226
}
214227
}

0 commit comments

Comments
 (0)