-
Notifications
You must be signed in to change notification settings - Fork 2.1k
fix(topology): Fix for issue causing stalling on shutdown for sinks configured w/ disk buffers #24949
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
fix(topology): Fix for issue causing stalling on shutdown for sinks configured w/ disk buffers #24949
Changes from all commits
a004f6a
b059d9c
8eb4778
d52b8ac
9b96386
9f6ea32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| Fixed issue during shutdown or restart of a sink with disk buffer configured where | ||
| component would stall for batch.timeout_sec before fully gracefully shutting down. | ||
|
|
||
| authors: graphcareful | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -588,10 +588,24 @@ impl RunningTopology { | |
| .collect::<HashSet<_>>(); | ||
|
|
||
| // For any existing sink that has a conflicting resource dependency with a changed/added | ||
| // sink, or for any sink that we want to reuse their buffer, we need to explicit wait for | ||
| // them to finish processing so we can reclaim ownership of those resources/buffers. | ||
| // sink, for any sink that we want to reuse their buffer, or for any sink with a disk | ||
| // buffer being rebuilt, we need to explicitly wait for them to finish processing so we | ||
| // can reclaim ownership of those resources/buffers. | ||
| // | ||
| // Changed sinks with disk buffers that are NOT being reused must also be | ||
| // waited on so we can ensure the disk buffer file lock is released before | ||
| // the replacement buffer is built. | ||
| let changed_disk_buffer_sinks = diff.sinks.to_change.iter().filter(|key| { | ||
| !reuse_buffers.contains(*key) | ||
| && self | ||
| .config | ||
| .sink(key) | ||
| .is_some_and(|s| s.buffer.has_disk_stage()) | ||
| }); | ||
|
|
||
| let wait_for_sinks = conflicting_sinks | ||
| .chain(reuse_buffers.iter().cloned()) | ||
| .chain(changed_disk_buffer_sinks.cloned()) | ||
| .collect::<HashSet<_>>(); | ||
|
|
||
| // First, we remove any inputs to removed sinks so they can naturally shut down. | ||
|
|
@@ -631,15 +645,19 @@ impl RunningTopology { | |
| })) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| let mut buffer_release_barriers: HashMap<ComponentKey, Vec<_>> = HashMap::new(); | ||
| for key in &sinks_to_change { | ||
| debug!(component_id = %key, "Changing sink."); | ||
| if reuse_buffers.contains(key) { | ||
| self.detach_triggers | ||
| .remove(key) | ||
| .unwrap() | ||
| .into_inner() | ||
| .cancel(); | ||
| // Cancel the detach trigger for all changed sinks so the sink's input | ||
| // stream terminates and the sink task can complete. Without this, the | ||
| // old sink task never exits and the reload stalls at `previous.await`. | ||
| self.detach_triggers | ||
| .remove(key) | ||
| .unwrap() | ||
| .into_inner() | ||
| .cancel(); | ||
|
Comment on lines
+654
to
+658
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This unconditional cancellation of the detach trigger now applies to all changed sinks, not only those reusing buffers — [review comment truncated during extraction; see original PR thread]. Useful? React with 👍 / 👎. |
||
|
|
||
| if reuse_buffers.contains(key) { | ||
| // We explicitly clone the input side of the buffer and store it so we don't lose | ||
| // it when we remove the inputs below. | ||
| // | ||
|
|
@@ -651,6 +669,18 @@ impl RunningTopology { | |
| // info about which sinks are having their buffers reused and treat them differently | ||
| // at other stages. | ||
| buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone()); | ||
| } else { | ||
| // For changed sinks with disk buffers that are NOT being reused, take | ||
| // the writer-drop notification receivers before we remove the input. | ||
| // We await these after the old sink tasks complete to guarantee all | ||
| // disk buffer locks are released before we build the replacement. | ||
| let rxs = match self.inputs.get(key) { | ||
| Some(s) => s.take_buffer_release_barriers().await, | ||
| None => Vec::new(), | ||
graphcareful marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }; | ||
| if !rxs.is_empty() { | ||
| buffer_release_barriers.insert((*key).clone(), rxs); | ||
| } | ||
| } | ||
| self.remove_inputs(key, diff, new_config).await; | ||
| } | ||
|
|
@@ -694,6 +724,14 @@ impl RunningTopology { | |
|
|
||
| buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx))))); | ||
| } | ||
|
|
||
| // Wait for all old disk buffer writers to be fully released (by | ||
| // the fanout) so file locks are dropped before we build a replacement. | ||
| if let Some(rxs) = buffer_release_barriers.remove(key) { | ||
| for rx in rxs { | ||
| let _ = rx.await; | ||
| } | ||
|
Comment on lines
+730
to
+733
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Waiting on the barrier here can hang reload indefinitely when a changed sink is paused (not removed) and upstream stops emitting. Useful? React with 👍 / 👎. |
||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
Update this — the fix no longer involves the shutdown path.