From a9b39124dd281a6c8c0606a600f83fe11a229fa6 Mon Sep 17 00:00:00 2001 From: Hazel Atkinson Date: Sat, 5 Apr 2025 12:57:51 +0100 Subject: start on real statsu collection --- Cargo.lock | 1 + Cargo.toml | 1 + output.txt | 6 +++ src/main.rs | 126 +++++++++++++++++++++++++++++++++++++++++++++++------------- 4 files changed, 108 insertions(+), 26 deletions(-) create mode 100644 output.txt diff --git a/Cargo.lock b/Cargo.lock index d298384..4243f48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,6 +215,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "tokio", + "tokio-stream", "tokio-util", ] diff --git a/Cargo.toml b/Cargo.toml index bb06a9a..62f4f60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,5 @@ opentelemetry = { version = "0.29.1", features = ["metrics"] } opentelemetry-otlp = { version = "0.29.0", features = ["grpc-tonic"] } opentelemetry_sdk = { version = "0.29.0", features = ["metrics"] } tokio = { version = "1.44.1", features = ["macros", "signal"] } +tokio-stream = "0.1.17" tokio-util = "0.7.14" diff --git a/output.txt b/output.txt new file mode 100644 index 0000000..2645cb8 --- /dev/null +++ b/output.txt @@ -0,0 +1,6 @@ +Connected to Docker Daemon version Some("28.0.4") +Some(["/peaceful_dijkstra"]) +Stats { read: "2025-04-05T11:28:22.692535502Z", preread: "0001-01-01T00:00:00Z", num_procs: 0, pids_stats: PidsStats { current: Some(14), limit: Some(16617) }, network: None, networks: Some({"eth0": NetworkStats { rx_dropped: 0, rx_bytes: 73311, rx_errors: 0, tx_packets: 3, tx_dropped: 0, rx_packets: 846, tx_errors: 0, tx_bytes: 126 }}), memory_stats: MemoryStats { stats: Some(V2(MemoryStatsStatsV2 { anon: 22941696, file: 65019904, kernel_stack: 229376, slab: 354288, sock: 0, shmem: 0, file_mapped: 20066304, file_dirty: 0, file_writeback: 0, anon_thp: 20971520, inactive_anon: 921600, active_anon: 22020096, inactive_file: 67497984, active_file: 1331200, unevictable: 0, slab_reclaimable: 118400, slab_unreclaimable: 235888, pgfault: 6977, pgmajfault: 28, workingset_refault: 0, workingset_activate: 0, workingset_nodereclaim: 0, pgrefill: 73, pgscan: 7176, pgsteal: 6921, pgactivate: 82, pgdeactivate: 0, pglazyfree: 0, pglazyfreed: 0, thp_fault_alloc: 1, thp_collapse_alloc: 35 })), max_usage: None, usage: Some(92590080), failcnt: None, limit: Some(14602190848), commit: None, commit_peak: None, commitbytes: None, commitpeakbytes: None, privateworkingset: None }, blkio_stats: BlkioStats { io_service_bytes_recursive: Some([BlkioStatsEntry { major: 259, minor: 0, op: "read", value: 93347840 }, BlkioStatsEntry { major: 259, minor: 0, op: "write", value: 4096 }]), io_serviced_recursive: None, io_queue_recursive: None, io_service_time_recursive: None, io_wait_time_recursive: None, io_merged_recursive: None, io_time_recursive: None, sectors_recursive: None }, cpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105448320000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, precpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 0, total_usage: 0, usage_in_kernelmode: 0 }, system_cpu_usage: None, online_cpus: None, throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, storage_stats: StorageStats { read_count_normalized: None, read_size_bytes: None, write_count_normalized: None, write_size_bytes: None }, name: "/peaceful_dijkstra", id: "3379e4311956bf383ee055667820f5c9c4790f274c4cd2608f8e4af2eb5e36c7" } +Stats { read: "2025-04-05T11:28:23.695024439Z", preread: "2025-04-05T11:28:22.692535502Z", num_procs: 0, pids_stats: PidsStats { current: Some(14), limit: Some(16617) }, network: None, networks: Some({"eth0": NetworkStats { rx_dropped: 0, rx_bytes: 73390, rx_errors: 0, tx_packets: 3, tx_dropped: 0, rx_packets: 847, tx_errors: 0, tx_bytes: 126 }}), memory_stats: MemoryStats { stats: Some(V2(MemoryStatsStatsV2 { anon: 22941696, file: 65019904, kernel_stack: 229376, slab: 354288, sock: 0, shmem: 0, file_mapped: 20066304, file_dirty: 0, file_writeback: 0, anon_thp: 20971520, inactive_anon: 921600, active_anon: 22020096, inactive_file: 67497984, active_file: 1331200, unevictable: 0, slab_reclaimable: 118400, slab_unreclaimable: 235888, pgfault: 6977, pgmajfault: 28, workingset_refault: 0, workingset_activate: 0, workingset_nodereclaim: 0, pgrefill: 73, pgscan: 7176, pgsteal: 6921, pgactivate: 82, pgdeactivate: 0, pglazyfree: 0, pglazyfreed: 0, thp_fault_alloc: 1, thp_collapse_alloc: 35 })), max_usage: None, usage: Some(92590080), failcnt: None, limit: Some(14602190848), commit: None, commit_peak: None, commitbytes: None, commitpeakbytes: None, privateworkingset: None }, blkio_stats: BlkioStats { io_service_bytes_recursive: Some([BlkioStatsEntry { major: 259, minor: 0, op: "read", value: 93347840 }, BlkioStatsEntry { major: 259, minor: 0, op: "write", value: 4096 }]), io_serviced_recursive: None, io_queue_recursive: None, io_service_time_recursive: None, io_wait_time_recursive: None, io_merged_recursive: None, io_time_recursive: None, sectors_recursive: None }, cpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105456300000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, precpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105448320000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, storage_stats: StorageStats { read_count_normalized: None, read_size_bytes: None, write_count_normalized: None, write_size_bytes: None }, name: "/peaceful_dijkstra", id: "3379e4311956bf383ee055667820f5c9c4790f274c4cd2608f8e4af2eb5e36c7" } +Stats { read: "2025-04-05T11:28:24.698495438Z", preread: "2025-04-05T11:28:23.695024439Z", num_procs: 0, pids_stats: PidsStats { current: Some(14), limit: Some(16617) }, network: None, networks: Some({"eth0": NetworkStats { rx_dropped: 0, rx_bytes: 73390, rx_errors: 0, tx_packets: 3, tx_dropped: 0, rx_packets: 847, tx_errors: 0, tx_bytes: 126 }}), memory_stats: MemoryStats { stats: Some(V2(MemoryStatsStatsV2 { anon: 22941696, file: 65019904, kernel_stack: 229376, slab: 354288, sock: 0, shmem: 0, file_mapped: 20066304, file_dirty: 0, file_writeback: 0, anon_thp: 20971520, inactive_anon: 921600, active_anon: 22020096, inactive_file: 67497984, active_file: 1331200, unevictable: 0, slab_reclaimable: 118400, slab_unreclaimable: 235888, pgfault: 6977, pgmajfault: 28, workingset_refault: 0, workingset_activate: 0, workingset_nodereclaim: 0, pgrefill: 73, pgscan: 7176, pgsteal: 6921, pgactivate: 82, pgdeactivate: 0, pglazyfree: 0, pglazyfreed: 0, thp_fault_alloc: 1, thp_collapse_alloc: 35 })), max_usage: None, usage: Some(92590080), failcnt: None, limit: Some(14602190848), commit: None, commit_peak: None, commitbytes: None, commitpeakbytes: None, privateworkingset: None }, blkio_stats: BlkioStats { io_service_bytes_recursive: Some([BlkioStatsEntry { major: 259, minor: 0, op: "read", value: 93347840 }, BlkioStatsEntry { major: 259, minor: 0, op: "write", value: 4096 }]), io_serviced_recursive: None, io_queue_recursive: None, io_service_time_recursive: None, io_wait_time_recursive: None, io_merged_recursive: None, io_time_recursive: None, sectors_recursive: None }, cpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105464360000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, precpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105456300000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, storage_stats: StorageStats { read_count_normalized: None, read_size_bytes: None, write_count_normalized: None, write_size_bytes: None }, name: "/peaceful_dijkstra", id: "3379e4311956bf383ee055667820f5c9c4790f274c4cd2608f8e4af2eb5e36c7" } +Stats { read: "2025-04-05T11:28:25.700769535Z", preread: "2025-04-05T11:28:24.698495438Z", num_procs: 0, pids_stats: PidsStats { current: Some(14), limit: Some(16617) }, network: None, networks: Some({"eth0": NetworkStats { rx_dropped: 0, rx_bytes: 73469, rx_errors: 0, tx_packets: 3, tx_dropped: 0, rx_packets: 848, tx_errors: 0, tx_bytes: 126 }}), memory_stats: MemoryStats { stats: Some(V2(MemoryStatsStatsV2 { anon: 22941696, file: 65019904, kernel_stack: 229376, slab: 354288, sock: 0, shmem: 0, file_mapped: 20066304, file_dirty: 0, file_writeback: 0, anon_thp: 20971520, inactive_anon: 921600, active_anon: 22020096, inactive_file: 67497984, active_file: 1331200, unevictable: 0, slab_reclaimable: 118400, slab_unreclaimable: 235888, pgfault: 6977, pgmajfault: 28, workingset_refault: 0, workingset_activate: 0, workingset_nodereclaim: 0, pgrefill: 73, pgscan: 7176, pgsteal: 6921, pgactivate: 82, pgdeactivate: 0, pglazyfree: 0, pglazyfreed: 0, thp_fault_alloc: 1, thp_collapse_alloc: 35 })), max_usage: None, usage: Some(92590080), failcnt: None, limit: Some(14602190848), commit: None, commit_peak: None, commitbytes: None, commitpeakbytes: None, privateworkingset: None }, blkio_stats: BlkioStats { io_service_bytes_recursive: Some([BlkioStatsEntry { major: 259, minor: 0, op: "read", value: 93347840 }, BlkioStatsEntry { major: 259, minor: 0, op: "write", value: 4096 }]), io_serviced_recursive: None, io_queue_recursive: None, io_service_time_recursive: None, io_wait_time_recursive: None, io_merged_recursive: None, io_time_recursive: None, sectors_recursive: None }, cpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105472350000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, precpu_stats: CPUStats { cpu_usage: CPUUsage { percpu_usage: None, usage_in_usermode: 293061000, total_usage: 430277000, usage_in_kernelmode: 137215000 }, system_cpu_usage: Some(105464360000000), online_cpus: Some(8), throttling_data: ThrottlingData { periods: 0, throttled_periods: 0, throttled_time: 0 } }, storage_stats: StorageStats { read_count_normalized: None, read_size_bytes: None, write_count_normalized: None, write_size_bytes: None }, name: "/peaceful_dijkstra", id: "3379e4311956bf383ee055667820f5c9c4790f274c4cd2608f8e4af2eb5e36c7" } diff --git a/src/main.rs b/src/main.rs index 9763138..3100cb4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,31 +1,18 @@ -use std::time::Duration; +use std::{collections::BTreeMap, time::Duration}; use anyhow::Result; -use bollard::Docker; +use bollard::{container::StatsOptions, Docker}; use config::CONFIG; -use opentelemetry::{metrics::MeterProvider, KeyValue}; use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig}; -use opentelemetry_sdk::{metrics::SdkMeterProvider, resource::{ResourceDetector, SdkProvidedResourceDetector}, Resource}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry::metrics::MeterProvider; +use tokio::task::JoinHandle; +use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; mod config; -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<()> { - // open a docker connection - let docker = - if let Some(path) = &CONFIG.docker_socket { - Docker::connect_with_socket(path, 60, bollard::API_DEFAULT_VERSION)? - } - else { - Docker::connect_with_local_defaults()? - }; - - let info = docker.info().await?; - - println!("Connected to Docker Daemon version {:?}", info.server_version); - - // connect the OTLP exporter +fn setup_otlp() -> Result { let metric_exporter = match CONFIG.otlp_protocol { Protocol::HttpBinary | Protocol::HttpJson => { @@ -54,9 +41,24 @@ async fn main() -> Result<()> { }, }; - let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(metric_exporter) - .build(); + Ok(SdkMeterProvider::builder() + .with_periodic_exporter(metric_exporter) + .build()) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + // open a docker connection + let docker = + if let Some(path) = &CONFIG.docker_socket { + Docker::connect_with_socket(path, 60, bollard::API_DEFAULT_VERSION)? + } + else { + Docker::connect_with_local_defaults()? + }; + + // connect the OTLP exporter + let meter_provider = setup_otlp()?; // fetch-report loop with graceful shutdown let shutdown_token = CancellationToken::new(); @@ -67,17 +69,89 @@ async fn main() -> Result<()> { st2.cancel(); }); - let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut container_search_interval = tokio::time::interval(Duration::from_secs(1)); + + let mut tasks: BTreeMap> = BTreeMap::new(); loop { tokio::select! { - _ = interval.tick() => {} + _ = container_search_interval.tick() => {} _ = shutdown_token.cancelled() => { break } } let list_res = docker.list_containers::(None).await?; - println!("{list_res:?}"); + + let container_ids: Vec<_> = list_res.into_iter().filter_map(|c| c.id).collect(); + container_ids.sort(); + + let mut to_remove = Vec::new(); + + for (cont, handle) in &tasks { + // funny O(n^2) loop + if container_ids.binary_search(cont).is_err() { + handle.abort(); + to_remove.push(cont); + } + } + + for cont in to_remove { + tasks.remove(cont); + } + + // now, add any new ones + for cont in &container_ids { + if !tasks.contains_key(cont) { + tasks.insert(cont.clone(), launch_stats_task()); + } + } } + /* let list_res = docker.list_containers::(None).await?; + + let cont_name = list_res[0].id.as_ref().unwrap().as_str(); + + // df takes a moment so also select on it + let mut df = + /* tokio::select! { + df = docker.df() => { df } + _ = shutdown_token.cancelled() => { break } + }; */ + docker.stats(cont_name/* .trim_start_matches("/") */, Some(StatsOptions { + stream: true, + one_shot: false + })); + + // drop the first one + df.next().await; + + while let Some(v) = df.next().await { + let v = v?; + println!("{v:?}"); + } + */ Ok(()) } + +fn launch_stats_task<'a>(container_id: &str, docker: &Docker, meter_provider: &impl MeterProvider) -> JoinHandle<()> { + tokio::spawn(async move { + let mut stats_stream = + docker.stats(container_id, Some(StatsOptions { + stream: true, + one_shot: false + })); + + // drop the first one + stats_stream.next().await; + + while let Some(val) = stats_stream.next().await { + if let Ok(stats) = val { + + } + else { + // failed to get stats, log as such: + // TODO: use json logging or syslog so loki can understand this lol + println!("Failed to get stats for container {container_id}!: {:?}", val.unwrap_err()); + } + } + }) +} -- cgit