fix(topology): Fix for issue causing stalling on shutdown for sinks configured w/ disk buffers#24949
Conversation
|
@codex review |
This comment was marked as outdated.
This comment was marked as outdated.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: eb5630a0fd
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
src/topology/running.rs
Outdated
| for (_, trigger) in self.detach_triggers { | ||
| trigger.into_inner().cancel(); |
There was a problem hiding this comment.
Preserve graceful draining during global shutdown
Canceling every sink detach_trigger at the top of RunningTopology::stop forces all sink input streams (take_until_if(tripwire)) to end before shutdown_coordinator.shutdown_all(...) starts upstream shutdown, so sinks can terminate while events are still buffered or in flight. That changes graceful shutdown semantics into a lossy path for any pipeline with pending events, because those events are never drained by the sink.
Useful? React with 👍 / 👎.
…e to missed writer notification
eb5630a to
28a64b0
Compare
| let notified = ledger.writer_notified(); | ||
| let mut wait_for_writer = spawn(notified); |
There was a problem hiding this comment.
This is effectively the following, which is closer to the original code, right?
| let notified = ledger.writer_notified(); | |
| let mut wait_for_writer = spawn(notified); | |
| let notified = spawn(ledger.writer_notified()); |
| pub fn writer_notified(&self) -> Notified<'_> { | ||
| self.writer_notify.notified() |
There was a problem hiding this comment.
I don't see how this functionally changes anything, other than changing the return type from an effective impl Future into a concrete type. Am I missing something?
There was a problem hiding this comment.
This allows the caller to register to be notified before checking some pre-condition to await for, avoiding any races between the await and the condition. This is more of a defensive change, the core of the bugfix is checking is_writer_done in reader.rs
There was a problem hiding this comment.
What I'm saying is that async fn wait_for_writer(&self) desugars to fn wait_for_writer(&self) -> impl Future<Output = ()> and contains a state machine that just waits on writer_notify.notified when it's polled (which the optimizer will simplify to just calling the original future), and the new code returns the inner future directly, which will behave the same way. I guess what I'm keying on is that the comment change seems to indicate that the behavior of this function changed, but it hasn't.
Summary
This PR includes two fixes when sinks configured with buffers are shutdown. First issue is observed when reloading a config that contains a sink with a disk buffer. Vector will wait until
batch.timeout_secshas completed which causes buffers to flush and unblocks the reload process. The fix for this is to send thecancel()signal to the sink, so it doesn't block on its buffer not being flushed downstream.The second fix is for an issue with the same root cause. I noticed the same hang on issue of control-c with the
aws_s3sink. I employed the same solution there in thestop()method - to call thecancel()signal early in thestop()method.Vector configuration
How did you test this PR?
By using the config above and a simple HTTP traffic generator. Modifying the sink buffer and re-saving the file, looking for errors from the source.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changeloglabel to this PR.References
DatadogandPrometheusand S3 sink hangs forever #17666Notes
@vectordotdev/vectorto reach out to us regarding this PR.pre-pushhook, please see this template.make fmtmake check-clippy(if there are failures it's possible some of them can be fixed withmake clippy-fix)make testgit merge origin masterandgit push.Cargo.lock), pleaserun
make build-licensesto regenerate the license inventory and commit the changes (if any). More details here.