1- use std:: { collections:: HashMap , num:: NonZeroUsize , path:: PathBuf } ;
1+ use std:: { collections:: HashMap , num:: NonZeroUsize , path:: PathBuf , sync :: Arc } ;
22
33use futures:: StreamExt ;
44#[ cfg( feature = "enterprise" ) ]
55use futures_util:: future:: BoxFuture ;
6+ use futures_util:: FutureExt ;
67use once_cell:: race:: OnceNonZeroUsize ;
78use tokio:: {
89 runtime:: { self , Runtime } ,
9- sync:: mpsc,
10+ sync:: { mpsc, Mutex } ,
1011} ;
1112use tokio_stream:: wrappers:: UnboundedReceiverStream ;
1213
1314#[ cfg( feature = "enterprise" ) ]
1415use crate :: config:: enterprise:: {
15- attach_enterprise_components, report_configuration, report_on_reload , EnterpriseError ,
16- EnterpriseMetadata , EnterpriseReporter ,
16+ attach_enterprise_components, report_configuration, EnterpriseError , EnterpriseMetadata ,
17+ EnterpriseReporter ,
1718} ;
19+ #[ cfg( not( windows) ) ]
20+ use crate :: control_server:: ControlServer ;
1821#[ cfg( not( feature = "enterprise-tests" ) ) ]
1922use crate :: metrics;
2023#[ cfg( windows) ]
@@ -23,21 +26,17 @@ use crate::service;
2326use crate :: { api, internal_events:: ApiStarted } ;
2427use crate :: {
2528 cli:: { handle_config_errors, Color , LogFormat , Opts , RootOpts , SubCommand } ,
26- config:: { self } ,
27- generate, generate_schema, graph, heartbeat, list,
29+ config, generate, generate_schema, graph, heartbeat, list,
2830 signal:: { self , SignalTo } ,
29- topology:: { self , RunningTopology } ,
31+ topology:: { self , ReloadOutcome , RunningTopology , TopologyController } ,
3032 trace, unit_test, validate,
3133} ;
3234#[ cfg( feature = "api-client" ) ]
3335use crate :: { tap, top} ;
3436
3537pub static WORKER_THREADS : OnceNonZeroUsize = OnceNonZeroUsize :: new ( ) ;
3638
37- use crate :: internal_events:: {
38- VectorConfigLoadError , VectorQuit , VectorRecoveryError , VectorReloadError , VectorReloaded ,
39- VectorStarted , VectorStopped ,
40- } ;
39+ use crate :: internal_events:: { VectorQuit , VectorStarted , VectorStopped } ;
4140
4241use tokio:: sync:: broadcast:: error:: RecvError ;
4342
@@ -289,7 +288,7 @@ impl Application {
289288 #[ cfg( feature = "api" ) ]
290289 // Assigned to prevent the API terminating when falling out of scope.
291290 let api_server = if api_config. enabled {
292- use std:: sync:: { Arc , atomic:: AtomicBool } ;
291+ use std:: sync:: atomic:: AtomicBool ;
293292
294293 let api_server = api:: Server :: start ( topology. config ( ) , topology. watch ( ) , Arc :: < AtomicBool > :: clone ( & topology. running ) ) ;
295294
@@ -313,7 +312,7 @@ impl Application {
313312 None
314313 } ;
315314
316- let mut topology_controller = TopologyController {
315+ let topology_controller = TopologyController {
317316 topology,
318317 config_paths,
319318 require_healthy : opts. require_healthy ,
@@ -322,18 +321,43 @@ impl Application {
322321 #[ cfg( feature = "api" ) ]
323322 api_server,
324323 } ;
324+ let topology_controller = Arc :: new ( Mutex :: new ( topology_controller) ) ;
325+
326+ // If the relevant ENV var is set, start up the control server
327+ #[ cfg( not( windows) ) ]
328+ let control_server_pieces = if let Ok ( path) = std:: env:: var ( "VECTOR_CONTROL_SOCKET_PATH" ) {
329+ let ( shutdown_trigger, tripwire) = stream_cancel:: Tripwire :: new ( ) ;
330+ match ControlServer :: bind ( path, Arc :: clone ( & topology_controller) , tripwire) {
331+ Ok ( control_server) => {
332+ let server_handle = tokio:: spawn ( control_server. run ( ) ) ;
333+ Some ( ( shutdown_trigger, server_handle) )
334+ }
335+ Err ( error) => {
336+ error ! ( message = "Error binding control server." , %error) ;
337+ // TODO: We should exit non-zero here, but `Application::run` isn't set up
338+ // that way, and we'd need to push everything up to the API server start
339+ // into `Application::prepare`.
340+ return
341+ }
342+ }
343+ } else {
344+ None
345+ } ;
325346
326347 let signal = loop {
327348 tokio:: select! {
328349 signal = signal_rx. recv( ) => {
329350 match signal {
330351 Ok ( SignalTo :: ReloadFromConfigBuilder ( config_builder) ) => {
352+ let mut topology_controller = topology_controller. lock( ) . await ;
331353 let new_config = config_builder. build( ) . map_err( handle_config_errors) . ok( ) ;
332- if let Some ( signal ) = topology_controller. reload( new_config) . await {
333- break signal ;
354+ if let ReloadOutcome :: FatalError = topology_controller. reload( new_config) . await {
355+ break SignalTo :: Shutdown ;
334356 }
335357 }
336358 Ok ( SignalTo :: ReloadFromDisk ) => {
359+ let mut topology_controller = topology_controller. lock( ) . await ;
360+
337361 // Reload paths
338362 if let Some ( paths) = config:: process_paths( & opts. config_paths_with_formats( ) ) {
339363 topology_controller. config_paths = paths;
@@ -344,8 +368,8 @@ impl Application {
344368 . await
345369 . map_err( handle_config_errors) . ok( ) ;
346370
347- if let Some ( signal ) = topology_controller. reload( new_config) . await {
348- break signal ;
371+ if let ReloadOutcome :: FatalError = topology_controller. reload( new_config) . await {
372+ break SignalTo :: Shutdown ;
349373 }
350374 } ,
351375 Err ( RecvError :: Lagged ( amt) ) => warn!( "Overflow, dropped {} signals." , amt) ,
@@ -355,11 +379,25 @@ impl Application {
355379 }
356380 // Trigger graceful shutdown if a component crashed, or all sources have ended.
357381 _ = graceful_crash. next( ) => break SignalTo :: Shutdown ,
358- _ = topology_controller. sources_finished( ) => break SignalTo :: Shutdown ,
382+ _ = sources_finished( Arc :: clone( & topology_controller) ) => {
383+ info!( "All sources have finished." ) ;
384+ break SignalTo :: Shutdown
385+ } ,
359386 else => unreachable!( "Signal streams never end" ) ,
360387 }
361388 } ;
362389
390+ // Shut down the control server, if running
391+ #[ cfg( not( windows) ) ]
392+ if let Some ( ( shutdown_trigger, server_handle) ) = control_server_pieces {
393+ drop ( shutdown_trigger) ;
394+ server_handle. await . expect ( "control server task panicked" ) . expect ( "control server error" ) ;
395+ }
396+
397+ // Once any control server has stopped, we'll have the only reference to the topology
398+ // controller and can safely remove it from the Arc/Mutex to shut down the topology.
399+ let topology_controller = Arc :: try_unwrap ( topology_controller) . expect ( "fail to unwrap topology controller" ) . into_inner ( ) ;
400+
363401 match signal {
364402 SignalTo :: Shutdown => {
365403 emit ! ( VectorStopped ) ;
@@ -383,78 +421,36 @@ impl Application {
383421 }
384422}
385423
386- struct TopologyController {
387- topology : RunningTopology ,
388- config_paths : Vec < config:: ConfigPath > ,
389- require_healthy : Option < bool > ,
390- #[ cfg( feature = "enterprise" ) ]
391- enterprise_reporter : Option < EnterpriseReporter < BoxFuture < ' static , ( ) > > > ,
392- #[ cfg( feature = "api" ) ]
393- api_server : Option < api:: Server > ,
394- }
395-
396- impl TopologyController {
397- async fn reload ( & mut self , new_config : Option < config:: Config > ) -> Option < SignalTo > {
398- if new_config. is_none ( ) {
399- emit ! ( VectorConfigLoadError ) ;
400- return None ;
401- }
402- let mut new_config = new_config. unwrap ( ) ;
403-
404- new_config
405- . healthchecks
406- . set_require_healthy ( self . require_healthy ) ;
407-
408- #[ cfg( feature = "enterprise" ) ]
409- // Augment config to enable observability within Datadog, if applicable.
410- match EnterpriseMetadata :: try_from ( & new_config) {
411- Ok ( metadata) => {
412- if let Some ( e) = report_on_reload (
413- & mut new_config,
414- metadata,
415- self . config_paths . clone ( ) ,
416- self . enterprise_reporter . as_ref ( ) ,
417- ) {
418- self . enterprise_reporter = Some ( e) ;
419- }
420- }
421- Err ( err) => {
422- if let EnterpriseError :: MissingApiKey = err {
423- emit ! ( VectorReloadError ) ;
424- return None ;
425- }
426- }
427- }
428-
429- match self . topology . reload_config_and_respawn ( new_config) . await {
430- Ok ( true ) => {
431- #[ cfg( feature = "api" ) ]
432- // Pass the new config to the API server.
433- if let Some ( ref api_server) = self . api_server {
434- api_server. update_config ( self . topology . config ( ) ) ;
435- }
436-
437- emit ! ( VectorReloaded {
438- config_paths: & self . config_paths
439- } )
440- }
441- Ok ( false ) => emit ! ( VectorReloadError ) ,
442- // Trigger graceful shutdown for what remains of the topology
443- Err ( ( ) ) => {
444- emit ! ( VectorReloadError ) ;
445- emit ! ( VectorRecoveryError ) ;
446- return Some ( SignalTo :: Shutdown ) ;
447- }
424+ // The `sources_finished` method on `RunningTopology` only considers sources that are currently
425+ // running at the time the method is called. This presents a problem when the set of running
426+ // sources can change while we are waiting on the resulting future to resolve.
427+ //
428+ // This function resolves that issue by waiting in two stages. The first is the usual asynchronous
429+ // wait for the future to complete. When it does, we know that all of the sources that existed when
430+ // the future was built have finished, but we don't know if that's because they were replaced as
431+ // part of a reload (in which case we don't want to return yet). To differentiate, we acquire the
432+ // lock on the topology, create a new future, and check whether it resolves immediately or not. If
433+ // it does resolve, we know all sources are truly finished because we held the lock during the
434+ // check, preventing anyone else from adding new sources. If it does not resolve, that indicates
435+ // that new sources have been added since our original call and we should start the process over to
436+ // continue waiting.
437+ async fn sources_finished ( mutex : Arc < Mutex < TopologyController > > ) {
438+ loop {
439+ // Do an initial async wait while the topology is running, making sure not the hold the
440+ // mutex lock while we wait on sources to finish.
441+ let initial = {
442+ let tc = mutex. lock ( ) . await ;
443+ tc. topology . sources_finished ( )
444+ } ;
445+ initial. await ;
446+
447+ // Once the initial signal is tripped, hold lock on the topology while checking again. This
448+ // ensures that no other task is adding new sources.
449+ let top = mutex. lock ( ) . await ;
450+ if top. topology . sources_finished ( ) . now_or_never ( ) . is_some ( ) {
451+ return ;
452+ } else {
453+ continue ;
448454 }
449-
450- None
451- }
452-
453- async fn sources_finished ( & self ) {
454- self . topology . sources_finished ( ) . await ;
455- }
456-
457- async fn stop ( self ) {
458- self . topology . stop ( ) . await ;
459455 }
460456}
0 commit comments