diff --git a/.bleep b/.bleep index 304d62ed..0036e04f 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -784d7b1a2f93b5d4f3f506a58c695509b043262c +36ff6052538251c45f2b9fb2d86ae58872875fe9 diff --git a/pingora-core/src/server/configuration/mod.rs b/pingora-core/src/server/configuration/mod.rs index 020c90fb..8ab02bf3 100644 --- a/pingora-core/src/server/configuration/mod.rs +++ b/pingora-core/src/server/configuration/mod.rs @@ -116,6 +116,15 @@ pub struct ServerConf { /// The retry interval is 1 second between attempts. /// If not set, defaults to 5 retries. pub upgrade_sock_connect_accept_max_retries: Option, + /// The maximum number of threads in each runtime's blocking thread pool. + /// + /// The blocking pool handles [`tokio::task::spawn_blocking`] tasks. + /// When not set, the tokio default (512) is used. + pub max_blocking_threads: Option, + /// How long, in seconds, idle blocking threads are kept alive before being shut down. + /// + /// When not set, the tokio default (10 seconds) is used. + pub blocking_threads_ttl_seconds: Option, } impl Default for ServerConf { @@ -144,6 +153,8 @@ impl Default for ServerConf { graceful_shutdown_timeout_seconds: None, max_retries: DEFAULT_MAX_RETRIES, upgrade_sock_connect_accept_max_retries: None, + max_blocking_threads: None, + blocking_threads_ttl_seconds: None, } } } @@ -248,7 +259,9 @@ impl ServerConf { } pub fn validate(self) -> Result { - // TODO: do the validation + if self.max_blocking_threads == Some(0) { + return Error::e_explain(ReadError, "max_blocking_threads must be greater than zero"); + } Ok(self) } @@ -311,6 +324,8 @@ mod tests { graceful_shutdown_timeout_seconds: None, max_retries: 1, upgrade_sock_connect_accept_max_retries: None, + max_blocking_threads: None, + blocking_threads_ttl_seconds: None, }; // cargo test -- --nocapture not_a_test_i_cannot_write_yaml_by_hand println!("{}", conf.to_yaml()); @@ -349,4 +364,33 @@ version: 1 assert_eq!(DEFAULT_MAX_RETRIES, conf.max_retries); assert_eq!("/tmp/pingora.pid", conf.pid_file); } + + #[test] + fn test_zero_max_blocking_threads_is_rejected() { + init_log(); + let conf_str = r#" +--- +version: 1 +max_blocking_threads: 0 + "#; + let result = ServerConf::from_yaml(conf_str); + assert!( + result.is_err(), + "max_blocking_threads: 0 should fail validation" + ); + } + + #[test] + fn test_valid_max_blocking_threads() { + init_log(); + let conf_str = r#" +--- +version: 1 +max_blocking_threads: 64 +blocking_threads_ttl_seconds: 30 + "#; + let conf = ServerConf::from_yaml(conf_str).unwrap(); + assert_eq!(Some(64), conf.max_blocking_threads); + assert_eq!(Some(30), conf.blocking_threads_ttl_seconds); + } } diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index 406c0d0c..a1c3efd4 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -27,7 +27,7 @@ use daemon::daemonize; use daggy::NodeIndex; use log::{debug, error, info, warn}; use parking_lot::Mutex; -use pingora_runtime::Runtime; +use pingora_runtime::{BlockingPoolOpts, Runtime, RuntimeBuilder}; use pingora_timeout::fast_timeout; #[cfg(feature = "sentry")] use sentry::ClientOptions; @@ -378,11 +378,13 @@ impl Server { listeners_per_fd: usize, ready_notifier: ServiceReadyNotifier, dependency_watches: Vec, + blocking_opts: BlockingPoolOpts, ) -> Runtime // NOTE: we need to keep the runtime outside async since // otherwise the runtime will be dropped. { - let service_runtime = Server::create_runtime(service.name(), threads, work_stealing); + let service_runtime = + Server::create_runtime(service.name(), threads, work_stealing, blocking_opts); let service_name = service.name().to_string(); service_runtime.get_handle().spawn(async move { // Wait for all dependencies to be ready @@ -670,6 +672,11 @@ impl Server { panic!("Daemonizing under windows is not supported"); } + let blocking_opts = BlockingPoolOpts { + max_threads: conf.max_blocking_threads, + thread_keep_alive: conf.blocking_threads_ttl_seconds.map(Duration::from_secs), + }; + // Holds tuples of runtimes and their service name. let mut runtimes: Vec<(Runtime, String)> = Vec::new(); @@ -743,13 +750,14 @@ impl Server { self.configuration.listener_tasks_per_fd, ready_notifier, dependency_watches, + blocking_opts.clone(), ); runtimes.push((runtime, name)); } // blocked on main loop so that it runs forever // Only work steal runtime can use block_on() - let server_runtime = Server::create_runtime("Server", 1, true); + let server_runtime = Server::create_runtime("Server", 1, true, BlockingPoolOpts::default()); #[cfg(unix)] let shutdown_type = server_runtime .get_handle() @@ -818,11 +826,15 @@ impl Server { .ok(); } - fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime { - if work_steal { - Runtime::new_steal(threads, name) - } else { - Runtime::new_no_steal(threads, name) - } + fn create_runtime( + name: &str, + threads: usize, + work_steal: bool, + blocking_opts: BlockingPoolOpts, + ) -> Runtime { + RuntimeBuilder::new(threads, name) + .work_steal(work_steal) + .blocking_pool_opts(blocking_opts) + .build() } } diff --git a/pingora-runtime/src/lib.rs b/pingora-runtime/src/lib.rs index a0468f4f..396eef32 100644 --- a/pingora-runtime/src/lib.rs +++ b/pingora-runtime/src/lib.rs @@ -32,6 +32,22 @@ use thread_local::ThreadLocal; use tokio::runtime::{Builder, Handle}; use tokio::sync::oneshot::{channel, Sender}; +/// Configuration options for the blocking thread pool used by the runtime. +/// +/// These options control the behavior of the blocking thread pool that handles +/// [`tokio::task::spawn_blocking`] tasks. +#[derive(Debug, Clone, Default)] +pub struct BlockingPoolOpts { + /// The maximum number of threads in the blocking thread pool. + /// + /// When not set, the tokio default (512) is used. + pub max_threads: Option, + /// The duration that idle blocking threads are kept alive before being shut down. + /// + /// When not set, the tokio default (10 seconds) is used. + pub thread_keep_alive: Option, +} + /// Pingora async multi-threaded runtime /// /// The `Steal` flavor is effectively tokio multi-threaded runtime. @@ -42,22 +58,95 @@ pub enum Runtime { NoSteal(NoStealRuntime), } +/// Apply [`BlockingPoolOpts`] to a tokio [`Builder`]. +fn apply_blocking_opts(builder: &mut Builder, opts: &BlockingPoolOpts) { + if let Some(max) = opts.max_threads { + builder.max_blocking_threads(max); + } + if let Some(ttl) = opts.thread_keep_alive { + builder.thread_keep_alive(ttl); + } +} + +/// Builder for constructing a [`Runtime`]. +/// +/// # Example +/// +/// ``` +/// use pingora_runtime::{RuntimeBuilder, BlockingPoolOpts}; +/// use std::time::Duration; +/// +/// let rt = RuntimeBuilder::new(4, "my-service") +/// .blocking_pool_opts(BlockingPoolOpts { +/// max_threads: Some(64), +/// thread_keep_alive: Some(Duration::from_secs(30)), +/// }) +/// .build(); +/// ``` +pub struct RuntimeBuilder { + threads: usize, + name: String, + work_steal: bool, + blocking_pool_opts: BlockingPoolOpts, +} + +impl RuntimeBuilder { + /// Create a new builder with the given number of worker threads and runtime name. + /// + /// Work stealing is enabled by default. + pub fn new(threads: usize, name: &str) -> Self { + Self { + threads, + name: name.to_string(), + work_steal: true, + blocking_pool_opts: BlockingPoolOpts::default(), + } + } + + /// Set whether work stealing is enabled. + /// + /// When `true` (the default), a tokio multi-thread runtime is used. + /// When `false`, a pool of single-threaded tokio runtimes is used instead. + pub fn work_steal(mut self, enabled: bool) -> Self { + self.work_steal = enabled; + self + } + + /// Set the [`BlockingPoolOpts`] for the runtime's blocking thread pool. + pub fn blocking_pool_opts(mut self, opts: BlockingPoolOpts) -> Self { + self.blocking_pool_opts = opts; + self + } + + /// Build the [`Runtime`]. + pub fn build(self) -> Runtime { + if self.work_steal { + let mut builder = Builder::new_multi_thread(); + builder + .enable_all() + .worker_threads(self.threads) + .thread_name(&self.name); + apply_blocking_opts(&mut builder, &self.blocking_pool_opts); + Runtime::Steal(builder.build().unwrap()) + } else { + Runtime::NoSteal(NoStealRuntime::new( + self.threads, + &self.name, + self.blocking_pool_opts, + )) + } + } +} + impl Runtime { /// Create a `Steal` flavor runtime. This just a regular tokio runtime pub fn new_steal(threads: usize, name: &str) -> Self { - Self::Steal( - Builder::new_multi_thread() - .enable_all() - .worker_threads(threads) - .thread_name(name) - .build() - .unwrap(), - ) + RuntimeBuilder::new(threads, name).build() } /// Create a `NoSteal` flavor runtime. This is backed by multiple tokio current-thread runtime pub fn new_no_steal(threads: usize, name: &str) -> Self { - Self::NoSteal(NoStealRuntime::new(threads, name)) + RuntimeBuilder::new(threads, name).work_steal(false).build() } /// Return the &[Handle] of the [Runtime]. @@ -109,6 +198,7 @@ type Pools = Arc>>; pub struct NoStealRuntime { threads: usize, name: String, + blocking_opts: BlockingPoolOpts, // Lazily init the runtimes so that they are created after pingora // daemonize itself. Otherwise the runtime threads are lost. pools: Pools, @@ -116,12 +206,13 @@ pub struct NoStealRuntime { } impl NoStealRuntime { - /// Create a new [NoStealRuntime]. Panic if `threads` is 0 - pub fn new(threads: usize, name: &str) -> Self { + /// Create a new [`NoStealRuntime`] with blocking pool options. Panic if `threads` is 0. + pub fn new(threads: usize, name: &str, blocking_opts: BlockingPoolOpts) -> Self { assert!(threads != 0); NoStealRuntime { threads, name: name.to_string(), + blocking_opts, pools: Arc::new(OnceCell::new()), controls: OnceCell::new(), } @@ -131,7 +222,10 @@ impl NoStealRuntime { let mut pools = Vec::with_capacity(self.threads); let mut controls = Vec::with_capacity(self.threads); for _ in 0..self.threads { - let rt = Builder::new_current_thread().enable_all().build().unwrap(); + let mut builder = Builder::new_current_thread(); + builder.enable_all(); + apply_blocking_opts(&mut builder, &self.blocking_opts); + let rt = builder.build().unwrap(); let handler = rt.handle().clone(); let (tx, rx) = channel::(); let pools_ref = self.pools.clone();