diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/stats_task.rs | 205 |
1 files changed, 170 insertions, 35 deletions
diff --git a/src/stats_task.rs b/src/stats_task.rs index 47e93e2..f3144a7 100644 --- a/src/stats_task.rs +++ b/src/stats_task.rs @@ -1,11 +1,13 @@ -use std::mem::MaybeUninit; -use bollard::container::{BlkioStatsEntry, MemoryStatsStats, MemoryStatsStatsV1, StatsOptions}; +use std::borrow::Cow; +use std::collections::HashMap; +use bollard::container::{BlkioStatsEntry, MemoryStatsStats, MemoryStatsStatsV1, Stats, StatsOptions}; use bollard::models::ContainerSummary; use bollard::Docker; use opentelemetry::metrics::MeterProvider; use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::SdkMeterProvider; -use std::sync::Arc; +use std::mem::MaybeUninit; +use std::sync::{Arc, LazyLock, RwLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::task::JoinHandle; use tokio_stream::StreamExt; @@ -45,17 +47,22 @@ pub fn launch_stats_task( loop { match stats_stream.next().await { None => return, - Some(Ok(st)) => { first_read = MaybeUninit::new(st); break }, + Some(Ok(st)) => { + first_read = MaybeUninit::new(st); + break; + }, Some(Err(err)) => { // TODO: use json logging or syslog so loki can understand this lol println!("Failed to get stats for container {container_id}!: {err:?}"); - } + }, } } // I'm going to rust jail! let first_read = unsafe { first_read.assume_init() }; - let mut last_io_stats = first_read.blkio_stats.io_service_bytes_recursive; + let Stats { blkio_stats, networks: mut last_net_stats, .. } = first_read; + + let mut last_io_stats = blkio_stats.io_service_bytes_recursive; // container labels shared for all metrics let mut shared_labels = vec![ @@ -134,13 +141,53 @@ pub fn launch_stats_task( .with_description("Last time this container was seen by ContainerSpy") .build(); + // memory stats go here + + let meter_container_network_receive_bytes_total = meter + .u64_counter("container_network_receive_bytes_total") + .with_unit("By") + .with_description("Cumulative count of bytes received") + .build(); + #[cfg(not(windows))] + let meter_container_network_receive_errors_total = meter + .u64_counter("container_network_receive_errors_total") + .with_description("Cumulative count of errors encountered while receiving") + .build(); + let meter_container_network_receive_packets_dropped_total = meter + .u64_counter("container_network_receive_packets_dropped_total") + .with_description("Cumulative count of packets dropped while receiving") + .build(); + let meter_container_network_receive_packets_total = meter + .u64_counter("container_network_receive_packets_total") + .with_description("Cumulative count of packets received") + .build(); + + let meter_container_network_transmit_bytes_total = meter + .u64_counter("container_network_transmit_bytes_total") + .with_unit("By") + .with_description("Cumulative count of bytes transmitted") + .build(); + #[cfg(not(windows))] + let meter_container_network_transmit_errors_total = meter + .u64_counter("container_network_transmit_errors_total") + .with_description("Cumulative count of errors encountered while transmitting") + .build(); + let meter_container_network_transmit_packets_dropped_total = meter + .u64_counter("container_network_transmit_packets_dropped_total") + .with_description("Cumulative count of packets dropped while transmitting") + .build(); + let meter_container_network_transmit_packets_total = meter + .u64_counter("container_network_transmit_packets_total") + .with_description("Cumulative count of packets transmitted") + .build(); + while let Some(val) = stats_stream.next().await { if let Ok(stats) = val { - // when a container exits, instead of a None we get sent Ok()s with zeroes in it forever, horror if stats.cpu_stats.cpu_usage.total_usage == 0 { - if stats.precpu_stats.cpu_usage.total_usage != 0 { break; } - else { + if stats.precpu_stats.cpu_usage.total_usage != 0 { + break; + } else { // last time was ALSO a zero, so this MIGHT actually be (SOMEHOW?) legit, // so just loop around again, and wait for the main task to abort() this worker task instead! // which it will if this container died, or if we are gonna get real stats later, it won't... @@ -188,8 +235,11 @@ pub fn launch_stats_task( ); meter_container_cpu_cfs_throttled_seconds_total.add( - cpu_delta_from_docker(stats.cpu_stats.throttling_data.throttled_time, - stats.precpu_stats.throttling_data.throttled_time).as_secs_f64(), + cpu_delta_from_docker( + stats.cpu_stats.throttling_data.throttled_time, + stats.precpu_stats.throttling_data.throttled_time, + ) + .as_secs_f64(), shared_labels, ); @@ -199,7 +249,6 @@ pub fn launch_stats_task( if let Some(service_bytes_rec) = stats.blkio_stats.io_service_bytes_recursive { // need to calculate deltas for this if let Some(last) = &last_io_stats { - let (last_r, last_w) = get_rw_totals(last); let (curr_r, curr_w) = get_rw_totals(&service_bytes_rec); @@ -210,7 +259,13 @@ pub fn launch_stats_task( last_io_stats = Some(service_bytes_rec); } - meter_container_last_seen.record(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), shared_labels); + meter_container_last_seen.record( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(), + shared_labels, + ); // cgroups values references: // - https://github.com/docker/cli/blob/91cbde67/cli/command/container/stats_helpers.go#L230-L231 @@ -218,39 +273,90 @@ pub fn launch_stats_task( // - https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html if let Some(all_usage) = stats.memory_stats.usage { - if cfg!(windows) { - // todo - // i have no way to test cgroups v2 so only work on v1 - see readme for more info - } else if let Some(MemoryStatsStats::V2(v2stats)) = stats.memory_stats.stats { - // container_memory_cache + if cfg!(windows) { + // todo + // i have no way to test cgroups v2 so only work on v1 - see readme for more info + } else if let Some(MemoryStatsStats::V2(v2stats)) = stats.memory_stats.stats { + // container_memory_cache + // container_memory_failcnt only on cgroups v1 - // container_memory_failcnt only on cgroups v1 + // container_memory_failures_total + v2stats.pgfault; // label failure_type=pgfault + v2stats.pgmajfault; // label failure_type=pgmajfault - // container_memory_failures_total - v2stats.pgfault; // label failure_type=pgfault - v2stats.pgmajfault; // label failure_type=pgmajfault + // container_memory_mapped_file + v2stats.file; // includes tmpfs - // container_memory_mapped_file - v2stats.file; // includes tmpfs + // container_memory_max_usage_bytes only on cgroups v1 - // container_memory_max_usage_bytes only on cgroups v1 + // container_memory_migrate - // container_memory_migrate + // container_memory_numa_pages omitted cause its hard :< + // container_memory_rss: may need recalcing - // container_memory_numa_pages omitted cause its hard :< + // container_memory_swap: can't get - // container_memory_rss: may need recalcing + // container_memory_usage_bytes: how? - // container_memory_swap: can't get + // container_memory_working_set_bytes: not reported + } + } - // container_memory_usage_bytes: how? - - // container_memory_working_set_bytes: not reported + // networking + // TODO: what is stats.network? is it populated on windows? + if let Some(net) = &stats.networks { + if let Some(last_net_stats) = &last_net_stats { + for (interface, this_inter) in net { + // try to get last + if let Some(last_this_inter) = last_net_stats.get(interface) { + // net labels + let mut net_labels = Vec::with_capacity(shared_labels.len() + 1); + net_labels.extend_from_slice(shared_labels); + net_labels.push(KeyValue::new("interface", interface.clone())); + let net_labels = &net_labels.into_boxed_slice()[..]; + + meter_container_network_receive_bytes_total.add( + this_inter.rx_bytes - last_this_inter.rx_bytes, + net_labels + ); + meter_container_network_transmit_bytes_total.add( + this_inter.tx_bytes - last_this_inter.tx_bytes, + net_labels + ); + #[cfg(not(windows))] + meter_container_network_receive_errors_total.add( + this_inter.rx_errors - last_this_inter.rx_errors, + net_labels + ); + #[cfg(not(windows))] + meter_container_network_transmit_errors_total.add( + this_inter.tx_errors - last_this_inter.tx_errors, + net_labels + ); + meter_container_network_receive_packets_dropped_total.add( + this_inter.rx_dropped - last_this_inter.rx_dropped, + net_labels + ); + meter_container_network_transmit_packets_dropped_total.add( + this_inter.tx_dropped - last_this_inter.tx_dropped, + net_labels + ); + meter_container_network_receive_packets_total.add( + this_inter.rx_packets - last_this_inter.rx_packets, + net_labels + ); + meter_container_network_transmit_packets_total.add( + this_inter.tx_packets - last_this_inter.tx_packets, + net_labels + ); + } } } - } else { + } + last_net_stats = stats.networks; + } else { // failed to get stats, log as such: // TODO: use json logging or syslog so loki can understand this lol println!( @@ -280,9 +386,38 @@ fn get_rw_totals<'a>(iter: impl IntoIterator<Item = &'a BlkioStatsEntry>) -> (u6 match entry.op.as_str() { "read" => read += entry.value, "write" => write += entry.value, - _ => println!("Unknown service_bytes_recursive entry type {}", entry.op) + _ => println!("Unknown service_bytes_recursive entry type {}", entry.op), } } (read, write) -}
\ No newline at end of file +} + + +// LMAO i built this entire string pool around the idea of needing &'static str but turns out i can just use owned strings +// guuuh okay whatever that's fine i guess, i'll keep this around just in case i need it -- sink + +/* +// labels have to have 'static values so i have to make a string pool or i'll leak ram, eugh +// technically this does mean each possible kv combo is never dropped, but we only have one copy in ram at all times +// Arc would also work, but that would require a lot of refcounting for a count we know will NEVER hit zero +// so just use a Cow that borrows a leaked box instead. +// I checked, OtelString can either be owned (from String), borrowed (from Cow<'static, str>), or refcounted (Arc<str>). + +static LABEL_POOL: LazyLock<RwLock<HashMap<(Cow<str>, Cow<str>), KeyValue>>> = LazyLock::new(|| RwLock::new(HashMap::new())); +fn pool_kv(key: &str, val: &str) -> KeyValue { + let leaked_k = &*Box::leak(key.to_string().into_boxed_str()); + let leaked_v = &*Box::leak(val.to_string().into_boxed_str()); + + let cows = (Cow::from(leaked_k), Cow::from(leaked_v)); + + if let Some(kv) = LABEL_POOL.read().unwrap().get(&cows) { + // this should borrow the same value thanks to OtelString::Borrowed :) + kv.clone() + } else { + // we know upfront that the cow is borrowed, so just clone it + let kv = KeyValue::new(cows.0.clone(), cows.1.clone()); + LABEL_POOL.write().unwrap().insert(cows, kv.clone()); + kv + } +}*/
\ No newline at end of file |
