path: root/src/stats_task.rs
Diffstat (limited to 'src/stats_task.rs')
-rw-r--r--  src/stats_task.rs  214
1 file changed, 214 insertions, 0 deletions
diff --git a/src/stats_task.rs b/src/stats_task.rs
new file mode 100644
index 0000000..464793a
--- /dev/null
+++ b/src/stats_task.rs
@@ -0,0 +1,214 @@
+use bollard::container::StatsOptions;
+use bollard::models::ContainerSummary;
+use bollard::Docker;
+use opentelemetry::metrics::MeterProvider;
+use opentelemetry::KeyValue;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::task::JoinHandle;
+use tokio_stream::StreamExt;
+
+// I do not enjoy taking a bunch of Arcs, but tokio needs ownership, so fine.
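+/// Spawn a detached task that follows the Docker stats stream for one
+/// container and reports cadvisor-style metrics via the given meter provider.
+/// The task exits on its own when the stats stream ends (container gone).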
+pub fn launch_stats_task(
+    container: ContainerSummary,
+    docker: Arc<Docker>,
+    meter_provider: Arc<SdkMeterProvider>,
+) -> JoinHandle<()> {
+ tokio::spawn(async move {
+        // extract some container info; a container with no id cannot be queried
+        let Some(container_id) = container.id else {
+            return;
+        };
+        let container_name = container
+            .names
+            .iter()
+            .flatten()
+            .next()
+            .map(|n| n.trim_start_matches('/').to_owned());
+
+        let meter_name = "cspy_".to_string() + container_id.as_str();
+        // meter() needs a name that lives for 'static, so leak the String to
+        // get one. This leaks once per container task, so the leak is bounded.
+        let meter_name = &*Box::leak(meter_name.into_boxed_str());
+
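+        // with stream: true the daemon keeps pushing a fresh sample roughly
+        // once per second until the container goes away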
+        let mut stats_stream = docker.stats(
+            container_id.as_str(),
+            Some(StatsOptions {
+                stream: true,
+                one_shot: false,
+            }),
+        );
+
+        // drop the first read: its precpu_stats are zeroed, so any delta
+        // computed against them would be garbage
+        loop {
+            match stats_stream.next().await {
+                None => return,
+                Some(Ok(_)) => break,
+                Some(Err(err)) => {
+                    // TODO: use json logging or syslog so loki can understand this lol
+                    println!("Failed to get stats for container {container_id}!: {err:?}");
+                }
+            }
+        }
+
+        // container labels shared by every metric this task reports
+        let mut shared_labels = vec![
+            KeyValue::new("id", container_id.to_owned()),
+            KeyValue::new(
+                "image",
+                // prefer the image name, fall back to the image ID
+                container.image.or(container.image_id).unwrap_or_default(),
+            ),
+        ];
+
+        if let Some(name) = container_name {
+            shared_labels.push(KeyValue::new("name", name));
+        }
+
+        if let Some(docker_labels) = &container.labels {
+            for (key, value) in docker_labels {
+                shared_labels.push(KeyValue::new(
+                    "container_label_".to_string() + key,
+                    value.clone(),
+                ));
+            }
+        }
+
+        // free excess capacity, then freeze into an immutable slice
+        shared_labels.shrink_to_fit();
+        let shared_labels = &shared_labels[..];
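+        // the resulting label set looks something like (values illustrative):
+        //   id="1f2a…", image="nginx:latest", name="web",
+        //   container_label_com.example.key="value"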
+
+        //println!("Starting reporting for container: {shared_labels:?}");
+
+        // create meters
+        let meter = meter_provider.meter(meter_name);
+
+        let meter_container_cpu_usage_seconds_total = meter
+            .f64_counter("container_cpu_usage_seconds_total")
+            .with_unit("s")
+            .with_description("Cumulative cpu time consumed")
+            .build();
+        let meter_container_cpu_user_seconds_total = meter
+            .f64_counter("container_cpu_user_seconds_total")
+            .with_unit("s")
+            .with_description("Cumulative userland cpu time consumed")
+            .build();
+        let meter_container_cpu_system_seconds_total = meter
+            .f64_counter("container_cpu_system_seconds_total")
+            .with_unit("s")
+            .with_description("Cumulative kernel cpu time consumed")
+            .build();
+
+        let meter_container_cpu_cfs_periods_total = meter
+            .u64_counter("container_cpu_cfs_periods_total")
+            .with_description("Number of elapsed enforcement period intervals")
+            .build();
+        let meter_container_cpu_cfs_throttled_periods_total = meter
+            .u64_counter("container_cpu_cfs_throttled_periods_total")
+            .with_description("Number of throttled period intervals")
+            .build();
+        let meter_container_cpu_cfs_throttled_seconds_total = meter
+            .f64_counter("container_cpu_cfs_throttled_seconds_total")
+            .with_unit("s")
+            .with_description("Total time duration the container has been throttled")
+            .build();
+
+        let meter_container_fs_reads_bytes_total = meter
+            .u64_counter("container_fs_reads_bytes_total")
+            .with_unit("B")
+            .with_description("Cumulative bytes read")
+            .build();
+        let meter_container_fs_writes_bytes_total = meter
+            .u64_counter("container_fs_writes_bytes_total")
+            .with_unit("B")
+            .with_description("Cumulative bytes written")
+            .build();
+
+        let meter_container_last_seen = meter
+            .u64_gauge("container_last_seen")
+            .with_unit("s")
+            .with_description("Last time this container was seen by ContainerSpy")
+            .build();
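+        // metric names match cadvisor's, so dashboards and alerts written
+        // against cadvisor should mostly work unchanged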
+
+        // blkio byte counts arrive as cumulative totals, so remember the
+        // previous sample and report only the delta to the counters
+        let mut prev_fs_reads: u64 = 0;
+        let mut prev_fs_writes: u64 = 0;
+
+        while let Some(val) = stats_stream.next().await {
+            if let Ok(stats) = val {
+                meter_container_cpu_usage_seconds_total.add(
+                    cpu_delta_from_docker(
+                        stats.cpu_stats.cpu_usage.total_usage,
+                        stats.precpu_stats.cpu_usage.total_usage,
+                    )
+                    .as_secs_f64(),
+                    shared_labels,
+                );
+
+                meter_container_cpu_user_seconds_total.add(
+                    cpu_delta_from_docker(
+                        stats.cpu_stats.cpu_usage.usage_in_usermode,
+                        stats.precpu_stats.cpu_usage.usage_in_usermode,
+                    )
+                    .as_secs_f64(),
+                    shared_labels,
+                );
+
+                meter_container_cpu_system_seconds_total.add(
+                    cpu_delta_from_docker(
+                        stats.cpu_stats.cpu_usage.usage_in_kernelmode,
+                        stats.precpu_stats.cpu_usage.usage_in_kernelmode,
+                    )
+                    .as_secs_f64(),
+                    shared_labels,
+                );
+
+                // saturating_sub: never panic on underflow if a counter resets
+                meter_container_cpu_cfs_periods_total.add(
+                    stats
+                        .cpu_stats
+                        .throttling_data
+                        .periods
+                        .saturating_sub(stats.precpu_stats.throttling_data.periods),
+                    shared_labels,
+                );
+
+                meter_container_cpu_cfs_throttled_periods_total.add(
+                    stats
+                        .cpu_stats
+                        .throttling_data
+                        .throttled_periods
+                        .saturating_sub(stats.precpu_stats.throttling_data.throttled_periods),
+                    shared_labels,
+                );
+
+                meter_container_cpu_cfs_throttled_seconds_total.add(
+                    cpu_delta_from_docker(
+                        stats.cpu_stats.throttling_data.throttled_time,
+                        stats.precpu_stats.throttling_data.throttled_time,
+                    )
+                    .as_secs_f64(),
+                    shared_labels,
+                );
+
+                // io_service_bytes_recursive is the one blkio_stats field also
+                // populated on cgroups v2; the others exist only on cgroups v1,
+                // so don't bother. storage_stats only exists on windows.
+                if let Some(service_bytes_rec) = stats.blkio_stats.io_service_bytes_recursive {
+                    // entries are per-device cumulative totals; sum them, then
+                    // report the delta against the previous sample
+                    let mut reads_total: u64 = 0;
+                    let mut writes_total: u64 = 0;
+
+                    for entry in &service_bytes_rec {
+                        // cgroups v1 capitalises ops ("Read"), v2 does not ("read")
+                        match entry.op.to_ascii_lowercase().as_str() {
+                            "read" => reads_total += entry.value,
+                            "write" => writes_total += entry.value,
+                            // v1-only rollups we do not report
+                            "sync" | "async" | "total" | "discard" => {}
+                            other => {
+                                println!("Unknown io_service_bytes_recursive entry type {other}")
+                            }
+                        }
+                    }
+
+                    meter_container_fs_reads_bytes_total
+                        .add(reads_total.saturating_sub(prev_fs_reads), shared_labels);
+                    meter_container_fs_writes_bytes_total
+                        .add(writes_total.saturating_sub(prev_fs_writes), shared_labels);
+                    prev_fs_reads = reads_total;
+                    prev_fs_writes = writes_total;
+                }
+
+                meter_container_last_seen.record(
+                    SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_secs(),
+                    shared_labels,
+                );
+            } else {
+                // failed to get stats, log as such:
+                // TODO: use json logging or syslog so loki can understand this lol
+                println!(
+                    "Failed to get stats for container {container_id}!: {:?}",
+                    val.unwrap_err()
+                );
+            }
+        }
+    })
+}
+
+fn cpu_delta_from_docker(cpu_usage: u64, precpu_usage: u64) -> Duration {
+    // saturate rather than panic if the counter ever goes backwards
+    let delta = cpu_usage.saturating_sub(precpu_usage);
+
+    // https://docs.docker.com/reference/api/engine/version/v1.48/#tag/Container/operation/ContainerStats
+    // see response schema > cpu_stats > cpu_usage > total_usage:
+    // the value is in nanoseconds on Linux and 100ns ticks on Windows
+    let delta_ns = if cfg!(windows) { delta * 100 } else { delta };
+
+    Duration::from_nanos(delta_ns)
+}
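+
+// A minimal sketch of how this might be wired up from a caller: list the
+// containers and spawn one stats task each. The setup shown here (connection
+// method, list options, provider construction) is illustrative and lives
+// elsewhere, not in this file:
+//
+//     let docker = Arc::new(Docker::connect_with_local_defaults()?);
+//     let provider = Arc::new(SdkMeterProvider::builder().build());
+//     for container in docker.list_containers::<String>(None).await? {
+//         launch_stats_task(container, Arc::clone(&docker), Arc::clone(&provider));
+//     }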