diff --git a/changelog.d/24929_fix_stall_on_disk_shutdown.fix.md b/changelog.d/24929_fix_stall_on_disk_shutdown.fix.md new file mode 100644 index 0000000000000..0414375f62be5 --- /dev/null +++ b/changelog.d/24929_fix_stall_on_disk_shutdown.fix.md @@ -0,0 +1,4 @@ +Fixed an issue where shutting down or restarting a sink configured with a disk buffer +would stall for `batch.timeout_sec` before the component finished shutting down gracefully. + +authors: graphcareful diff --git a/lib/vector-buffers/src/config.rs b/lib/vector-buffers/src/config.rs index ea51bfce3cb57..0cc9d77cd29d1 100644 --- a/lib/vector-buffers/src/config.rs +++ b/lib/vector-buffers/src/config.rs @@ -371,6 +371,13 @@ impl Default for BufferConfig { } impl BufferConfig { + /// Returns true if any stage in this buffer configuration uses disk-based storage. + pub fn has_disk_stage(&self) -> bool { + self.stages() + .iter() + .any(|stage| matches!(stage, BufferType::DiskV2 { .. })) + } + /// Gets all of the configured stages for this buffer. pub fn stages(&self) -> &[BufferType] { match self { diff --git a/lib/vector-buffers/src/topology/channel/sender.rs b/lib/vector-buffers/src/topology/channel/sender.rs index c3eb22c3c952e..1dc7dcdb17da9 100644 --- a/lib/vector-buffers/src/topology/channel/sender.rs +++ b/lib/vector-buffers/src/topology/channel/sender.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Instant}; use async_recursion::async_recursion; use derivative::Derivative; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, oneshot}; use tracing::Span; use vector_common::internal_event::{InternalEventHandle, Registered, register}; @@ -14,6 +14,24 @@ use crate::{ variants::disk_v2::{self, ProductionFilesystem}, }; +/// Wraps a disk buffer writer together with a drop-notification channel. +/// +/// When every `Arc` clone is dropped (i.e. no sender holds +/// the writer any more), the `oneshot::Sender` inside is dropped too, which +/// completes the paired `oneshot::Receiver`. 
This lets the topology wait for +/// the disk buffer lock to be fully released before creating a replacement. +/// +/// The writer and the drop-notification receiver live behind separate mutexes +/// so that taking the barrier never contends with an in-flight `send`. +#[derive(Debug)] +pub struct DiskWriterHandle { + writer: Mutex>, + _drop_tx: oneshot::Sender<()>, + /// The receiving end of the drop notification. Only the first caller to + /// take it gets `Some`; subsequent calls return `None`. + drop_rx: Mutex>>, +} + /// Adapter for papering over various sender backends. #[derive(Clone, Debug)] pub enum SenderAdapter { @@ -21,7 +39,7 @@ pub enum SenderAdapter { InMemory(LimitedSender), /// The disk v2 buffer. - DiskV2(Arc>>), + DiskV2(Arc>), } impl From> for SenderAdapter { @@ -32,7 +50,12 @@ impl From> for SenderAdapter { impl From> for SenderAdapter { fn from(v: disk_v2::BufferWriter) -> Self { - Self::DiskV2(Arc::new(Mutex::new(v))) + let (drop_tx, drop_rx) = oneshot::channel(); + Self::DiskV2(Arc::new(DiskWriterHandle { + writer: Mutex::new(v), + _drop_tx: drop_tx, + drop_rx: Mutex::new(Some(drop_rx)), + })) } } @@ -43,8 +66,8 @@ where pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> { match self { Self::InMemory(tx) => tx.send(item).await.map_err(Into::into), - Self::DiskV2(writer) => { - let mut writer = writer.lock().await; + Self::DiskV2(handle) => { + let mut writer = handle.writer.lock().await; writer.write_record(item).await.map(|_| ()).map_err(|e| { // TODO: Could some errors be handled and not be unrecoverable? Right now, @@ -66,8 +89,8 @@ where .try_send(item) .map(|()| None) .or_else(|e| Ok(Some(e.into_inner()))), - Self::DiskV2(writer) => { - let mut writer = writer.lock().await; + Self::DiskV2(handle) => { + let mut writer = handle.writer.lock().await; writer.try_write_record(item).await.map_err(|e| { // TODO: Could some errors be handled and not be unrecoverable? 
Right now, @@ -86,8 +109,8 @@ where pub(crate) async fn flush(&mut self) -> crate::Result<()> { match self { Self::InMemory(_) => Ok(()), - Self::DiskV2(writer) => { - let mut writer = writer.lock().await; + Self::DiskV2(handle) => { + let mut writer = handle.writer.lock().await; writer.flush().await.map_err(|e| { // Errors on the I/O path, which is all that flushing touches, are never recoverable. error!("Disk buffer writer has encountered an unrecoverable error."); @@ -104,6 +127,18 @@ where Self::DiskV2(_) => None, } } + + /// Takes the drop-notification receiver for a disk buffer writer. + /// + /// Returns `Some` for `DiskV2` senders on the first call, `None` thereafter + /// (or for `InMemory` senders). The returned receiver completes when every + /// `Arc` clone of the underlying writer handle has been dropped. + pub async fn take_buffer_release_barrier(&self) -> Option> { + match self { + Self::InMemory(_) => None, + Self::DiskV2(handle) => handle.drop_rx.lock().await.take(), + } + } } /// A buffer sender. @@ -194,6 +229,21 @@ impl BufferSender { } impl BufferSender { + /// Takes the drop-notification receivers for all disk buffer writers in this + /// sender chain (base + any overflow stages). + /// See [`SenderAdapter::take_buffer_release_barrier`]. 
+ #[async_recursion] + pub async fn take_buffer_release_barriers(&self) -> Vec> { + let mut rxs = Vec::new(); + if let Some(rx) = self.base.take_buffer_release_barrier().await { + rxs.push(rx); + } + if let Some(overflow) = self.overflow.as_ref() { + rxs.extend(overflow.take_buffer_release_barriers().await); + } + rxs + } + #[cfg(test)] pub(crate) fn get_base_ref(&self) -> &SenderAdapter { &self.base diff --git a/src/topology/running.rs b/src/topology/running.rs index db667e4891110..429fc08a4d7ec 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -588,10 +588,24 @@ impl RunningTopology { .collect::>(); // For any existing sink that has a conflicting resource dependency with a changed/added - // sink, or for any sink that we want to reuse their buffer, we need to explicit wait for - // them to finish processing so we can reclaim ownership of those resources/buffers. + // sink, for any sink whose buffer we want to reuse, or for any sink with a disk + // buffer being rebuilt, we need to explicitly wait for them to finish processing so we + // can reclaim ownership of those resources/buffers. + // + // Changed sinks with disk buffers that are NOT being reused must also be + // waited on so we can ensure the disk buffer file lock is released before + // the replacement buffer is built. + let changed_disk_buffer_sinks = diff.sinks.to_change.iter().filter(|key| { + !reuse_buffers.contains(*key) + && self + .config + .sink(key) + .is_some_and(|s| s.buffer.has_disk_stage()) + }); + let wait_for_sinks = conflicting_sinks .chain(reuse_buffers.iter().cloned()) + .chain(changed_disk_buffer_sinks.cloned()) .collect::>(); // First, we remove any inputs to removed sinks so they can naturally shut down. 
@@ -631,15 +645,19 @@ impl RunningTopology { })) .collect::>(); + let mut buffer_release_barriers: HashMap> = HashMap::new(); for key in &sinks_to_change { debug!(component_id = %key, "Changing sink."); - if reuse_buffers.contains(key) { - self.detach_triggers - .remove(key) - .unwrap() - .into_inner() - .cancel(); + // Cancel the detach trigger for all changed sinks so the sink's input + // stream terminates and the sink task can complete. Without this, the + // old sink task never exits and the reload stalls at `previous.await`. + self.detach_triggers + .remove(key) + .unwrap() + .into_inner() + .cancel(); + if reuse_buffers.contains(key) { // We explicitly clone the input side of the buffer and store it so we don't lose // it when we remove the inputs below. // @@ -651,6 +669,18 @@ impl RunningTopology { // info about which sinks are having their buffers reused and treat them differently // at other stages. buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone()); + } else { + // For changed sinks with disk buffers that are NOT being reused, take + // the writer-drop notification receivers before we remove the input. + // We await these after the old sink tasks complete to guarantee all + // disk buffer locks are released before we build the replacement. + let rxs = match self.inputs.get(key) { + Some(s) => s.take_buffer_release_barriers().await, + None => Vec::new(), + }; + if !rxs.is_empty() { + buffer_release_barriers.insert((*key).clone(), rxs); + } } self.remove_inputs(key, diff, new_config).await; } @@ -694,6 +724,14 @@ impl RunningTopology { buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx))))); } + + // Wait for all old disk buffer writers to be fully released (by + // the fanout) so file locks are dropped before we build a replacement. 
+ if let Some(rxs) = buffer_release_barriers.remove(key) { + for rx in rxs { + let _ = rx.await; + } + } } } diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index 907be19f6ec0f..7b2db20d4f337 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -1,7 +1,7 @@ use std::{ collections::HashSet, net::{SocketAddr, TcpListener}, - num::NonZeroU64, + num::{NonZeroU64, NonZeroUsize}, time::Duration, }; @@ -9,7 +9,7 @@ use futures::StreamExt; use tokio::time::sleep; use tokio_stream::wrappers::UnboundedReceiverStream; use vector_lib::{ - buffers::{BufferConfig, BufferType, WhenFull}, + buffers::{BufferConfig, BufferType, MemoryBufferSize, WhenFull}, config::ComponentKey, }; @@ -293,7 +293,6 @@ async fn topology_readd_input() { #[tokio::test] async fn topology_reload_component() { test_util::trace_init(); - let (_guard, address_0) = next_addr(); let mut old_config = Config::builder(); @@ -320,6 +319,138 @@ async fn topology_reload_component() { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn topology_disk_buffer_config_change_does_not_stall() { + // Changing a disk buffer's configuration on a running sink (e.g. via in-situ + // config edit) must not stall the reload. Previously, the detach trigger was + // only cancelled for sinks whose buffers were being reused, so sinks with + // changed disk buffer configs would never have their input stream terminated, + // causing the reload to hang indefinitely. 
+ test_util::trace_init(); + + let (_guard, address) = next_addr(); + + let data_dir = temp_dir(); + std::fs::create_dir(&data_dir).unwrap(); + + let mut old_config = Config::builder(); + old_config.global.data_dir = Some(data_dir); + old_config.add_source("in", internal_metrics_source()); + old_config.add_sink("out", &["in"], prom_exporter_sink(address, 1)); + + let sink_key = ComponentKey::from("out"); + old_config.sinks[&sink_key].buffer = BufferConfig::Single(BufferType::DiskV2 { + max_size: NonZeroU64::new(268435488).unwrap(), + when_full: WhenFull::Block, + }); + + // Change only the disk buffer's max_size. + let mut new_config = old_config.clone(); + new_config.sinks[&sink_key].buffer = BufferConfig::Single(BufferType::DiskV2 { + max_size: NonZeroU64::new(536870912).unwrap(), + when_full: WhenFull::Block, + }); + + let (mut topology, crash) = start_topology(old_config.build().unwrap(), true).await; + let mut crash_stream = UnboundedReceiverStream::new(crash); + + tokio::select! { + _ = wait_for_tcp(address) => {}, + _ = crash_stream.next() => panic!("topology crashed before reload"), + } + + // Simulate an in-situ config edit: the config watcher would put the changed + // sink into components_to_reload, which excludes it from reuse_buffers. + topology.extend_reload_set(HashSet::from_iter(vec![sink_key])); + + let reload_result = tokio::time::timeout( + Duration::from_secs(5), + topology.reload_config_and_respawn(new_config.build().unwrap(), Default::default()), + ) + .await; + + assert!( + reload_result.is_ok(), + "Reload stalled: changing a disk buffer config should not cause the reload to hang" + ); + reload_result.unwrap().unwrap(); + + // Verify the new sink is running. + tokio::select! 
{ + _ = wait_for_tcp(address) => {}, + _ = crash_stream.next() => panic!("topology crashed after reload"), + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn topology_disk_buffer_config_change_chained_does_not_stall() { + // Same as above but with a chained memory → disk overflow buffer to verify + // that the writer-drop notification is collected from overflow stages too. + test_util::trace_init(); + + let (_guard, address) = next_addr(); + + let data_dir = temp_dir(); + std::fs::create_dir(&data_dir).unwrap(); + + let memory_stage = BufferType::Memory { + size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()), + when_full: WhenFull::Overflow, + }; + + let mut old_config = Config::builder(); + old_config.global.data_dir = Some(data_dir); + old_config.add_source("in", internal_metrics_source()); + old_config.add_sink("out", &["in"], prom_exporter_sink(address, 1)); + + let sink_key = ComponentKey::from("out"); + old_config.sinks[&sink_key].buffer = BufferConfig::Chained(vec![ + memory_stage, + BufferType::DiskV2 { + max_size: NonZeroU64::new(268435488).unwrap(), + when_full: WhenFull::Block, + }, + ]); + + // Change only the disk overflow stage's max_size. + let mut new_config = old_config.clone(); + new_config.sinks[&sink_key].buffer = BufferConfig::Chained(vec![ + memory_stage, + BufferType::DiskV2 { + max_size: NonZeroU64::new(536870912).unwrap(), + when_full: WhenFull::Block, + }, + ]); + + let (mut topology, crash) = start_topology(old_config.build().unwrap(), true).await; + let mut crash_stream = UnboundedReceiverStream::new(crash); + + tokio::select! 
{ + _ = wait_for_tcp(address) => {}, + _ = crash_stream.next() => panic!("topology crashed before reload"), + } + + topology.extend_reload_set(HashSet::from_iter(vec![sink_key])); + + let reload_result = tokio::time::timeout( + Duration::from_secs(5), + topology.reload_config_and_respawn(new_config.build().unwrap(), Default::default()), + ) + .await; + + assert!( + reload_result.is_ok(), + "Reload stalled: changing a chained disk buffer config should not cause the reload to hang" + ); + reload_result.unwrap().unwrap(); + + // Verify the new sink is running. + tokio::select! { + _ = wait_for_tcp(address) => {}, + _ = crash_stream.next() => panic!("topology crashed after reload"), + } +} + async fn reload_sink_test( old_config: Config, new_config: Config,