Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/24929_fix_stall_on_disk_shutdown.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed an issue during shutdown or restart of a sink with a disk buffer configured where
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update this, fix no longer involves shutdown path

the component would stall for `batch.timeout_secs` before fully shutting down gracefully.

authors: graphcareful
7 changes: 7 additions & 0 deletions lib/vector-buffers/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ impl Default for BufferConfig {
}

impl BufferConfig {
/// Returns true if any stage in this buffer configuration uses disk-based storage.
pub fn has_disk_stage(&self) -> bool {
    // A buffer may be layered (e.g. memory with disk overflow); a single
    // disk-backed stage anywhere in the chain is enough to answer `true`.
    for stage in self.stages() {
        if let BufferType::DiskV2 { .. } = stage {
            return true;
        }
    }
    false
}

/// Gets all of the configured stages for this buffer.
pub fn stages(&self) -> &[BufferType] {
match self {
Expand Down
68 changes: 59 additions & 9 deletions lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Instant};

use async_recursion::async_recursion;
use derivative::Derivative;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, oneshot};
use tracing::Span;
use vector_common::internal_event::{InternalEventHandle, Registered, register};

Expand All @@ -14,14 +14,32 @@ use crate::{
variants::disk_v2::{self, ProductionFilesystem},
};

/// Wraps a disk buffer writer together with a drop-notification channel.
///
/// When every `Arc<DiskWriterHandle>` clone is dropped (i.e. no sender holds
/// the writer any more), the `oneshot::Sender` inside is dropped too, which
/// completes the paired `oneshot::Receiver`. This lets the topology wait for
/// the disk buffer lock to be fully released before creating a replacement.
///
/// The writer and the drop-notification receiver live behind separate mutexes
/// so that taking the barrier never contends with an in-flight `send`.
#[derive(Debug)]
pub struct DiskWriterHandle<T: Bufferable> {
// The underlying disk v2 buffer writer; locked per-operation by
// `send`/`try_send`/`flush` on `SenderAdapter`.
writer: Mutex<disk_v2::BufferWriter<T, ProductionFilesystem>>,
// Never used directly — exists solely so its implicit drop (when the last
// `Arc<DiskWriterHandle>` goes away) completes `drop_rx`.
_drop_tx: oneshot::Sender<()>,
/// The receiving end of the drop notification. Only the first caller to
/// take it gets `Some`; subsequent calls return `None`.
drop_rx: Mutex<Option<oneshot::Receiver<()>>>,
}

/// Adapter for papering over various sender backends.
#[derive(Clone, Debug)]
pub enum SenderAdapter<T: Bufferable> {
/// The in-memory channel buffer.
InMemory(LimitedSender<T>),

/// The disk v2 buffer.
///
/// The writer is shared through a [`DiskWriterHandle`], so cloning this
/// adapter bumps a refcount rather than duplicating the writer, and the
/// handle's drop notification fires only after every clone is gone.
DiskV2(Arc<DiskWriterHandle<T>>),
}

impl<T: Bufferable> From<LimitedSender<T>> for SenderAdapter<T> {
Expand All @@ -32,7 +50,12 @@ impl<T: Bufferable> From<LimitedSender<T>> for SenderAdapter<T> {

impl<T: Bufferable> From<disk_v2::BufferWriter<T, ProductionFilesystem>> for SenderAdapter<T> {
    /// Wraps the writer in a [`DiskWriterHandle`] with a fresh drop-notification
    /// channel, so the topology can observe when every clone of the resulting
    /// sender has released the writer.
    fn from(v: disk_v2::BufferWriter<T, ProductionFilesystem>) -> Self {
        let (drop_tx, drop_rx) = oneshot::channel();
        Self::DiskV2(Arc::new(DiskWriterHandle {
            writer: Mutex::new(v),
            _drop_tx: drop_tx,
            drop_rx: Mutex::new(Some(drop_rx)),
        }))
    }
}

Expand All @@ -43,8 +66,8 @@ where
pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> {
match self {
Self::InMemory(tx) => tx.send(item).await.map_err(Into::into),
Self::DiskV2(writer) => {
let mut writer = writer.lock().await;
Self::DiskV2(handle) => {
let mut writer = handle.writer.lock().await;

writer.write_record(item).await.map(|_| ()).map_err(|e| {
// TODO: Could some errors be handled and not be unrecoverable? Right now,
Expand All @@ -66,8 +89,8 @@ where
.try_send(item)
.map(|()| None)
.or_else(|e| Ok(Some(e.into_inner()))),
Self::DiskV2(writer) => {
let mut writer = writer.lock().await;
Self::DiskV2(handle) => {
let mut writer = handle.writer.lock().await;

writer.try_write_record(item).await.map_err(|e| {
// TODO: Could some errors be handled and not be unrecoverable? Right now,
Expand All @@ -86,8 +109,8 @@ where
pub(crate) async fn flush(&mut self) -> crate::Result<()> {
match self {
Self::InMemory(_) => Ok(()),
Self::DiskV2(writer) => {
let mut writer = writer.lock().await;
Self::DiskV2(handle) => {
let mut writer = handle.writer.lock().await;
writer.flush().await.map_err(|e| {
// Errors on the I/O path, which is all that flushing touches, are never recoverable.
error!("Disk buffer writer has encountered an unrecoverable error.");
Expand All @@ -104,6 +127,18 @@ where
Self::DiskV2(_) => None,
}
}

/// Takes the drop-notification receiver for a disk buffer writer, if any.
///
/// Only `DiskV2` senders carry a receiver, and only the first caller gets
/// `Some`; every later call (and any call on an `InMemory` sender) yields
/// `None`. The returned receiver completes when every `Arc` clone of the
/// underlying writer handle has been dropped.
pub async fn take_buffer_release_barrier(&self) -> Option<oneshot::Receiver<()>> {
    if let Self::DiskV2(handle) = self {
        handle.drop_rx.lock().await.take()
    } else {
        None
    }
}
}

/// A buffer sender.
Expand Down Expand Up @@ -194,6 +229,21 @@ impl<T: Bufferable> BufferSender<T> {
}

impl<T: Bufferable> BufferSender<T> {
/// Collects the drop-notification receivers for every disk buffer writer in
/// this sender chain, visiting the base stage first and then recursing into
/// any overflow stage. See [`SenderAdapter::take_buffer_release_barrier`].
#[async_recursion]
pub async fn take_buffer_release_barriers(&self) -> Vec<oneshot::Receiver<()>> {
    // Start with the base stage's receiver, if it still has one to give.
    let mut receivers: Vec<_> = self
        .base
        .take_buffer_release_barrier()
        .await
        .into_iter()
        .collect();
    // An overflow stage is itself a full sender chain, so recurse.
    if let Some(next) = self.overflow.as_ref() {
        receivers.extend(next.take_buffer_release_barriers().await);
    }
    receivers
}

#[cfg(test)]
pub(crate) fn get_base_ref(&self) -> &SenderAdapter<T> {
&self.base
Expand Down
54 changes: 46 additions & 8 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,24 @@ impl RunningTopology {
.collect::<HashSet<_>>();

// For any existing sink that has a conflicting resource dependency with a changed/added
// sink, or for any sink that we want to reuse their buffer, we need to explicit wait for
// them to finish processing so we can reclaim ownership of those resources/buffers.
// sink, for any sink that we want to reuse their buffer, or for any sink with a disk
// buffer being rebuilt, we need to explicitly wait for them to finish processing so we
// can reclaim ownership of those resources/buffers.
//
// Changed sinks with disk buffers that are NOT being reused must also be
// waited on so we can ensure the disk buffer file lock is released before
// the replacement buffer is built.
let changed_disk_buffer_sinks = diff.sinks.to_change.iter().filter(|key| {
!reuse_buffers.contains(*key)
&& self
.config
.sink(key)
.is_some_and(|s| s.buffer.has_disk_stage())
});

let wait_for_sinks = conflicting_sinks
.chain(reuse_buffers.iter().cloned())
.chain(changed_disk_buffer_sinks.cloned())
.collect::<HashSet<_>>();

// First, we remove any inputs to removed sinks so they can naturally shut down.
Expand Down Expand Up @@ -631,15 +645,19 @@ impl RunningTopology {
}))
.collect::<Vec<_>>();

let mut buffer_release_barriers: HashMap<ComponentKey, Vec<_>> = HashMap::new();
for key in &sinks_to_change {
debug!(component_id = %key, "Changing sink.");
if reuse_buffers.contains(key) {
self.detach_triggers
.remove(key)
.unwrap()
.into_inner()
.cancel();
// Cancel the detach trigger for all changed sinks so the sink's input
// stream terminates and the sink task can complete. Without this, the
// old sink task never exits and the reload stalls at `previous.await`.
self.detach_triggers
.remove(key)
.unwrap()
.into_inner()
.cancel();
Comment on lines +654 to +658
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep graceful drain for non-reused sink reloads

This unconditional cancel() now runs for every changed sink, not just buffer-reuse cases. Because sink input is wrapped with take_until_if(tripwire) in builder.rs, canceling here terminates the stream immediately and bypasses natural draining of queued events. For changed sinks whose buffers are not reused (especially in-memory buffers), this introduces event loss during reloads that previously allowed graceful drain.

Useful? React with 👍 / 👎.


if reuse_buffers.contains(key) {
// We explicitly clone the input side of the buffer and store it so we don't lose
// it when we remove the inputs below.
//
Expand All @@ -651,6 +669,18 @@ impl RunningTopology {
// info about which sinks are having their buffers reused and treat them differently
// at other stages.
buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
} else {
// For changed sinks with disk buffers that are NOT being reused, take
// the writer-drop notification receivers before we remove the input.
// We await these after the old sink tasks complete to guarantee all
// disk buffer locks are released before we build the replacement.
let rxs = match self.inputs.get(key) {
Some(s) => s.take_buffer_release_barriers().await,
None => Vec::new(),
};
if !rxs.is_empty() {
buffer_release_barriers.insert((*key).clone(), rxs);
}
}
self.remove_inputs(key, diff, new_config).await;
}
Expand Down Expand Up @@ -694,6 +724,14 @@ impl RunningTopology {

buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
}

// Wait for all old disk buffer writers to be fully released (by
// the fanout) so file locks are dropped before we build a replacement.
if let Some(rxs) = buffer_release_barriers.remove(key) {
for rx in rxs {
let _ = rx.await;
}
Comment on lines +730 to +733
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent deadlock when awaiting disk writer release barrier

Waiting on the barrier here can hang reload indefinitely when a changed sink is paused (not removed) and upstream stops emitting. remove_inputs may send Pause, and fanout processes control messages during send activity; if no more sends occur, the paused sender clone holding the disk writer is never dropped, so this rx.await never resolves. That turns certain quiet pipelines into permanent reload stalls.

Useful? React with 👍 / 👎.

}
}
}

Expand Down
Loading
Loading