Add config for tokio blocking pool options

This allows adjusting the blocking thread pool in the pingora server's
runtime using configuration.
This commit is contained in:
Edward Wang
2026-03-16 14:11:47 -07:00
committed by Kevin Guthrie
parent 1cfc731540
commit b994854728
4 changed files with 173 additions and 23 deletions
+1 -1
View File
@@ -1 +1 @@
784d7b1a2f93b5d4f3f506a58c695509b043262c
36ff6052538251c45f2b9fb2d86ae58872875fe9
+45 -1
View File
@@ -116,6 +116,15 @@ pub struct ServerConf {
/// The retry interval is 1 second between attempts.
/// If not set, defaults to 5 retries.
pub upgrade_sock_connect_accept_max_retries: Option<usize>,
/// The maximum number of threads in each runtime's blocking thread pool.
///
/// The blocking pool handles [`tokio::task::spawn_blocking`] tasks.
/// When not set, the tokio default (512) is used.
pub max_blocking_threads: Option<usize>,
/// How long, in seconds, idle blocking threads are kept alive before being shut down.
///
/// When not set, the tokio default (10 seconds) is used.
pub blocking_threads_ttl_seconds: Option<u64>,
}
impl Default for ServerConf {
@@ -144,6 +153,8 @@ impl Default for ServerConf {
graceful_shutdown_timeout_seconds: None,
max_retries: DEFAULT_MAX_RETRIES,
upgrade_sock_connect_accept_max_retries: None,
max_blocking_threads: None,
blocking_threads_ttl_seconds: None,
}
}
}
@@ -248,7 +259,9 @@ impl ServerConf {
}
pub fn validate(self) -> Result<Self> {
// TODO: do the validation
if self.max_blocking_threads == Some(0) {
return Error::e_explain(ReadError, "max_blocking_threads must be greater than zero");
}
Ok(self)
}
@@ -311,6 +324,8 @@ mod tests {
graceful_shutdown_timeout_seconds: None,
max_retries: 1,
upgrade_sock_connect_accept_max_retries: None,
max_blocking_threads: None,
blocking_threads_ttl_seconds: None,
};
// cargo test -- --nocapture not_a_test_i_cannot_write_yaml_by_hand
println!("{}", conf.to_yaml());
@@ -349,4 +364,33 @@ version: 1
assert_eq!(DEFAULT_MAX_RETRIES, conf.max_retries);
assert_eq!("/tmp/pingora.pid", conf.pid_file);
}
#[test]
fn test_zero_max_blocking_threads_is_rejected() {
init_log();
let conf_str = r#"
---
version: 1
max_blocking_threads: 0
"#;
let result = ServerConf::from_yaml(conf_str);
assert!(
result.is_err(),
"max_blocking_threads: 0 should fail validation"
);
}
#[test]
fn test_valid_max_blocking_threads() {
init_log();
let conf_str = r#"
---
version: 1
max_blocking_threads: 64
blocking_threads_ttl_seconds: 30
"#;
let conf = ServerConf::from_yaml(conf_str).unwrap();
assert_eq!(Some(64), conf.max_blocking_threads);
assert_eq!(Some(30), conf.blocking_threads_ttl_seconds);
}
}
+21 -9
View File
@@ -27,7 +27,7 @@ use daemon::daemonize;
use daggy::NodeIndex;
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use pingora_runtime::Runtime;
use pingora_runtime::{BlockingPoolOpts, Runtime, RuntimeBuilder};
use pingora_timeout::fast_timeout;
#[cfg(feature = "sentry")]
use sentry::ClientOptions;
@@ -378,11 +378,13 @@ impl Server {
listeners_per_fd: usize,
ready_notifier: ServiceReadyNotifier,
dependency_watches: Vec<ServiceReadyWatch>,
blocking_opts: BlockingPoolOpts,
) -> Runtime
// NOTE: we need to keep the runtime outside async since
// otherwise the runtime will be dropped.
{
let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
let service_runtime =
Server::create_runtime(service.name(), threads, work_stealing, blocking_opts);
let service_name = service.name().to_string();
service_runtime.get_handle().spawn(async move {
// Wait for all dependencies to be ready
@@ -670,6 +672,11 @@ impl Server {
panic!("Daemonizing under windows is not supported");
}
let blocking_opts = BlockingPoolOpts {
max_threads: conf.max_blocking_threads,
thread_keep_alive: conf.blocking_threads_ttl_seconds.map(Duration::from_secs),
};
// Holds tuples of runtimes and their service name.
let mut runtimes: Vec<(Runtime, String)> = Vec::new();
@@ -743,13 +750,14 @@ impl Server {
self.configuration.listener_tasks_per_fd,
ready_notifier,
dependency_watches,
blocking_opts.clone(),
);
runtimes.push((runtime, name));
}
// blocked on main loop so that it runs forever
// Only work steal runtime can use block_on()
let server_runtime = Server::create_runtime("Server", 1, true);
let server_runtime = Server::create_runtime("Server", 1, true, BlockingPoolOpts::default());
#[cfg(unix)]
let shutdown_type = server_runtime
.get_handle()
@@ -818,11 +826,15 @@ impl Server {
.ok();
}
fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
if work_steal {
Runtime::new_steal(threads, name)
} else {
Runtime::new_no_steal(threads, name)
}
fn create_runtime(
name: &str,
threads: usize,
work_steal: bool,
blocking_opts: BlockingPoolOpts,
) -> Runtime {
RuntimeBuilder::new(threads, name)
.work_steal(work_steal)
.blocking_pool_opts(blocking_opts)
.build()
}
}
+106 -12
View File
@@ -32,6 +32,22 @@ use thread_local::ThreadLocal;
use tokio::runtime::{Builder, Handle};
use tokio::sync::oneshot::{channel, Sender};
/// Configuration options for the blocking thread pool used by the runtime.
///
/// These options control the behavior of the blocking thread pool that handles
/// [`tokio::task::spawn_blocking`] tasks.
#[derive(Debug, Clone, Default)]
pub struct BlockingPoolOpts {
/// The maximum number of threads in the blocking thread pool.
///
/// When not set, the tokio default (512) is used.
pub max_threads: Option<usize>,
/// The duration that idle blocking threads are kept alive before being shut down.
///
/// When not set, the tokio default (10 seconds) is used.
pub thread_keep_alive: Option<Duration>,
}
/// Pingora async multi-threaded runtime
///
/// The `Steal` flavor is effectively tokio multi-threaded runtime.
@@ -42,22 +58,95 @@ pub enum Runtime {
NoSteal(NoStealRuntime),
}
/// Apply [`BlockingPoolOpts`] to a tokio [`Builder`].
fn apply_blocking_opts(builder: &mut Builder, opts: &BlockingPoolOpts) {
if let Some(max) = opts.max_threads {
builder.max_blocking_threads(max);
}
if let Some(ttl) = opts.thread_keep_alive {
builder.thread_keep_alive(ttl);
}
}
/// Builder for constructing a [`Runtime`].
///
/// # Example
///
/// ```
/// use pingora_runtime::{RuntimeBuilder, BlockingPoolOpts};
/// use std::time::Duration;
///
/// let rt = RuntimeBuilder::new(4, "my-service")
/// .blocking_pool_opts(BlockingPoolOpts {
/// max_threads: Some(64),
/// thread_keep_alive: Some(Duration::from_secs(30)),
/// })
/// .build();
/// ```
pub struct RuntimeBuilder {
threads: usize,
name: String,
work_steal: bool,
blocking_pool_opts: BlockingPoolOpts,
}
impl RuntimeBuilder {
/// Create a new builder with the given number of worker threads and runtime name.
///
/// Work stealing is enabled by default.
pub fn new(threads: usize, name: &str) -> Self {
Self {
threads,
name: name.to_string(),
work_steal: true,
blocking_pool_opts: BlockingPoolOpts::default(),
}
}
/// Set whether work stealing is enabled.
///
/// When `true` (the default), a tokio multi-thread runtime is used.
/// When `false`, a pool of single-threaded tokio runtimes is used instead.
pub fn work_steal(mut self, enabled: bool) -> Self {
self.work_steal = enabled;
self
}
/// Set the [`BlockingPoolOpts`] for the runtime's blocking thread pool.
pub fn blocking_pool_opts(mut self, opts: BlockingPoolOpts) -> Self {
self.blocking_pool_opts = opts;
self
}
/// Build the [`Runtime`].
pub fn build(self) -> Runtime {
if self.work_steal {
let mut builder = Builder::new_multi_thread();
builder
.enable_all()
.worker_threads(self.threads)
.thread_name(&self.name);
apply_blocking_opts(&mut builder, &self.blocking_pool_opts);
Runtime::Steal(builder.build().unwrap())
} else {
Runtime::NoSteal(NoStealRuntime::new(
self.threads,
&self.name,
self.blocking_pool_opts,
))
}
}
}
impl Runtime {
/// Create a `Steal` flavor runtime. This just a regular tokio runtime
pub fn new_steal(threads: usize, name: &str) -> Self {
Self::Steal(
Builder::new_multi_thread()
.enable_all()
.worker_threads(threads)
.thread_name(name)
.build()
.unwrap(),
)
RuntimeBuilder::new(threads, name).build()
}
/// Create a `NoSteal` flavor runtime. This is backed by multiple tokio current-thread runtime
pub fn new_no_steal(threads: usize, name: &str) -> Self {
Self::NoSteal(NoStealRuntime::new(threads, name))
RuntimeBuilder::new(threads, name).work_steal(false).build()
}
/// Return the &[Handle] of the [Runtime].
@@ -109,6 +198,7 @@ type Pools = Arc<OnceCell<Box<[Handle]>>>;
pub struct NoStealRuntime {
threads: usize,
name: String,
blocking_opts: BlockingPoolOpts,
// Lazily init the runtimes so that they are created after pingora
// daemonize itself. Otherwise the runtime threads are lost.
pools: Pools,
@@ -116,12 +206,13 @@ pub struct NoStealRuntime {
}
impl NoStealRuntime {
/// Create a new [NoStealRuntime]. Panic if `threads` is 0
pub fn new(threads: usize, name: &str) -> Self {
/// Create a new [`NoStealRuntime`] with blocking pool options. Panic if `threads` is 0.
pub fn new(threads: usize, name: &str, blocking_opts: BlockingPoolOpts) -> Self {
assert!(threads != 0);
NoStealRuntime {
threads,
name: name.to_string(),
blocking_opts,
pools: Arc::new(OnceCell::new()),
controls: OnceCell::new(),
}
@@ -131,7 +222,10 @@ impl NoStealRuntime {
let mut pools = Vec::with_capacity(self.threads);
let mut controls = Vec::with_capacity(self.threads);
for _ in 0..self.threads {
let rt = Builder::new_current_thread().enable_all().build().unwrap();
let mut builder = Builder::new_current_thread();
builder.enable_all();
apply_blocking_opts(&mut builder, &self.blocking_opts);
let rt = builder.build().unwrap();
let handler = rt.handle().clone();
let (tx, rx) = channel::<Duration>();
let pools_ref = self.pools.clone();