mirror of
https://github.com/cloudflare/pingora.git
synced 2026-05-15 09:50:42 +00:00
Record discovery and build durations in LoadBalancer::update()
This commit is contained in:
@@ -1 +1 @@
|
||||
36ff6052538251c45f2b9fb2d86ae58872875fe9
|
||||
8b0d1e8979a5ee7e344efe112147e13ae84da55e
|
||||
|
||||
@@ -32,7 +32,7 @@ use std::hash::{Hash, Hasher};
|
||||
use std::io::Result as IoResult;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
mod background;
|
||||
pub mod discovery;
|
||||
@@ -303,6 +303,15 @@ impl Backends {
|
||||
}
|
||||
}
|
||||
|
||||
/// Timing information from the most recent [`LoadBalancer::update`] call.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct UpdateTimings {
|
||||
/// Time spent in [`ServiceDiscovery::discover`].
|
||||
pub discovery_duration: Duration,
|
||||
/// Time spent building the selection algorithm and storing the updated backends.
|
||||
pub build_duration: Duration,
|
||||
}
|
||||
|
||||
/// A [LoadBalancer] instance contains the service discovery, health check and backend selection
|
||||
/// all together.
|
||||
///
|
||||
@@ -317,6 +326,11 @@ where
|
||||
|
||||
config: Option<S::Config>,
|
||||
|
||||
/// Timing information from the most recent [`update`](Self::update) call.
|
||||
///
|
||||
/// `None` until the first successful update completes.
|
||||
last_update_timing: ArcSwap<Option<UpdateTimings>>,
|
||||
|
||||
/// How frequent the health check logic (if set) should run.
|
||||
///
|
||||
/// If `None`, the health check logic will only run once at the beginning.
|
||||
@@ -366,6 +380,7 @@ where
|
||||
backends,
|
||||
selector,
|
||||
config: config_opt,
|
||||
last_update_timing: ArcSwap::new(Arc::new(None)),
|
||||
health_check_frequency: None,
|
||||
update_frequency: None,
|
||||
parallel_health_check: false,
|
||||
@@ -381,18 +396,37 @@ where
|
||||
///
|
||||
/// This function will be called every `update_frequency` if this [LoadBalancer] instance
|
||||
/// is running as a background service.
|
||||
///
|
||||
/// On success, the timing information from this call is stored and can be
|
||||
/// retrieved via [`last_update_timing`](Self::last_update_timing).
|
||||
pub async fn update(&self) -> Result<()> {
|
||||
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
|
||||
|
||||
let build_nanos = AtomicU64::new(0);
|
||||
let total_start = Instant::now();
|
||||
|
||||
self.backends
|
||||
.update(|backends| {
|
||||
let build_start = Instant::now();
|
||||
let selector = if let Some(config) = &self.config {
|
||||
S::build_with_config(&backends, config)
|
||||
} else {
|
||||
S::build(&backends)
|
||||
};
|
||||
|
||||
self.selector.store(Arc::new(selector))
|
||||
self.selector.store(Arc::new(selector));
|
||||
build_nanos.store(build_start.elapsed().as_nanos() as u64, Relaxed);
|
||||
})
|
||||
.await
|
||||
.await?;
|
||||
|
||||
let total = total_start.elapsed();
|
||||
let build = Duration::from_nanos(build_nanos.load(Relaxed));
|
||||
|
||||
self.last_update_timing.store(Arc::new(Some(UpdateTimings {
|
||||
discovery_duration: total.saturating_sub(build),
|
||||
build_duration: build,
|
||||
})));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the first healthy [Backend] according to the selection algorithm and the
|
||||
@@ -442,6 +476,13 @@ where
|
||||
pub fn backends(&self) -> &Backends {
|
||||
&self.backends
|
||||
}
|
||||
|
||||
/// Return the timing information from the most recent successful [`update`](Self::update) call.
|
||||
///
|
||||
/// Returns `None` if [`update`](Self::update) has never completed successfully.
|
||||
pub fn last_update_timing(&self) -> Option<UpdateTimings> {
|
||||
**self.last_update_timing.load()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -602,6 +643,39 @@ mod test {
|
||||
assert!(!backends.ready(&bad));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lb_update_stores_timing() {
|
||||
let discovery = discovery::Static::default();
|
||||
let b1 = Backend::new("1.1.1.1:80").unwrap();
|
||||
let b2 = Backend::new("1.0.0.1:80").unwrap();
|
||||
discovery.add(b1.clone());
|
||||
discovery.add(b2.clone());
|
||||
|
||||
let lb = LoadBalancer::<selection::RoundRobin>::from_backends(Backends::new(Box::new(
|
||||
discovery,
|
||||
)));
|
||||
|
||||
// Before first update, timing should be None
|
||||
assert!(lb.last_update_timing().is_none());
|
||||
|
||||
lb.update().await.unwrap();
|
||||
|
||||
// After update, timing should be populated
|
||||
let timing = lb
|
||||
.last_update_timing()
|
||||
.expect("timing should be Some after update");
|
||||
assert!(timing.discovery_duration > Duration::ZERO);
|
||||
assert!(timing.build_duration > Duration::ZERO);
|
||||
|
||||
// Backends should be populated
|
||||
let backend = lb.backends().get_backend();
|
||||
assert!(backend.contains(&b1));
|
||||
assert!(backend.contains(&b2));
|
||||
|
||||
// Selection should work
|
||||
assert!(lb.select(b"test", 10).is_some());
|
||||
}
|
||||
|
||||
mod thread_safety {
|
||||
use super::*;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user