aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/stats_task.rs205
1 files changed, 170 insertions, 35 deletions
diff --git a/src/stats_task.rs b/src/stats_task.rs
index 47e93e2..f3144a7 100644
--- a/src/stats_task.rs
+++ b/src/stats_task.rs
@@ -1,11 +1,13 @@
-use std::mem::MaybeUninit;
-use bollard::container::{BlkioStatsEntry, MemoryStatsStats, MemoryStatsStatsV1, StatsOptions};
+use std::borrow::Cow;
+use std::collections::HashMap;
+use bollard::container::{BlkioStatsEntry, MemoryStatsStats, MemoryStatsStatsV1, Stats, StatsOptions};
use bollard::models::ContainerSummary;
use bollard::Docker;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::SdkMeterProvider;
-use std::sync::Arc;
+use std::mem::MaybeUninit;
+use std::sync::{Arc, LazyLock, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
@@ -45,17 +47,22 @@ pub fn launch_stats_task(
loop {
match stats_stream.next().await {
None => return,
- Some(Ok(st)) => { first_read = MaybeUninit::new(st); break },
+ Some(Ok(st)) => {
+ first_read = MaybeUninit::new(st);
+ break;
+ },
Some(Err(err)) => {
// TODO: use json logging or syslog so loki can understand this lol
println!("Failed to get stats for container {container_id}!: {err:?}");
- }
+ },
}
}
// I'm going to rust jail!
let first_read = unsafe { first_read.assume_init() };
- let mut last_io_stats = first_read.blkio_stats.io_service_bytes_recursive;
+ let Stats { blkio_stats, networks: mut last_net_stats, .. } = first_read;
+
+ let mut last_io_stats = blkio_stats.io_service_bytes_recursive;
// container labels shared for all metrics
let mut shared_labels = vec![
@@ -134,13 +141,53 @@ pub fn launch_stats_task(
.with_description("Last time this container was seen by ContainerSpy")
.build();
+ // memory stats go here
+
+ let meter_container_network_receive_bytes_total = meter
+ .u64_counter("container_network_receive_bytes_total")
+ .with_unit("By")
+ .with_description("Cumulative count of bytes received")
+ .build();
+ #[cfg(not(windows))]
+ let meter_container_network_receive_errors_total = meter
+ .u64_counter("container_network_receive_errors_total")
+ .with_description("Cumulative count of errors encountered while receiving")
+ .build();
+ let meter_container_network_receive_packets_dropped_total = meter
+ .u64_counter("container_network_receive_packets_dropped_total")
+ .with_description("Cumulative count of packets dropped while receiving")
+ .build();
+ let meter_container_network_receive_packets_total = meter
+ .u64_counter("container_network_receive_packets_total")
+ .with_description("Cumulative count of packets received")
+ .build();
+
+ let meter_container_network_transmit_bytes_total = meter
+ .u64_counter("container_network_transmit_bytes_total")
+ .with_unit("By")
+ .with_description("Cumulative count of bytes transmitted")
+ .build();
+ #[cfg(not(windows))]
+ let meter_container_network_transmit_errors_total = meter
+ .u64_counter("container_network_transmit_errors_total")
+ .with_description("Cumulative count of errors encountered while transmitting")
+ .build();
+ let meter_container_network_transmit_packets_dropped_total = meter
+ .u64_counter("container_network_transmit_packets_dropped_total")
+ .with_description("Cumulative count of packets dropped while transmitting")
+ .build();
+ let meter_container_network_transmit_packets_total = meter
+ .u64_counter("container_network_transmit_packets_total")
+ .with_description("Cumulative count of packets transmitted")
+ .build();
+
while let Some(val) = stats_stream.next().await {
if let Ok(stats) = val {
-
// when a container exits, instead of a None we get sent Ok()s with zeroes in it forever, horror
if stats.cpu_stats.cpu_usage.total_usage == 0 {
- if stats.precpu_stats.cpu_usage.total_usage != 0 { break; }
- else {
+ if stats.precpu_stats.cpu_usage.total_usage != 0 {
+ break;
+ } else {
// last time was ALSO a zero, so this MIGHT actually be (SOMEHOW?) legit,
// so just loop around again, and wait for the main task to abort() this worker task instead!
// which it will if this container died, or if we are gonna get real stats later, it won't...
@@ -188,8 +235,11 @@ pub fn launch_stats_task(
);
meter_container_cpu_cfs_throttled_seconds_total.add(
- cpu_delta_from_docker(stats.cpu_stats.throttling_data.throttled_time,
- stats.precpu_stats.throttling_data.throttled_time).as_secs_f64(),
+ cpu_delta_from_docker(
+ stats.cpu_stats.throttling_data.throttled_time,
+ stats.precpu_stats.throttling_data.throttled_time,
+ )
+ .as_secs_f64(),
shared_labels,
);
@@ -199,7 +249,6 @@ pub fn launch_stats_task(
if let Some(service_bytes_rec) = stats.blkio_stats.io_service_bytes_recursive {
// need to calculate deltas for this
if let Some(last) = &last_io_stats {
-
let (last_r, last_w) = get_rw_totals(last);
let (curr_r, curr_w) = get_rw_totals(&service_bytes_rec);
@@ -210,7 +259,13 @@ pub fn launch_stats_task(
last_io_stats = Some(service_bytes_rec);
}
- meter_container_last_seen.record(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), shared_labels);
+ meter_container_last_seen.record(
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs(),
+ shared_labels,
+ );
// cgroups values references:
// - https://github.com/docker/cli/blob/91cbde67/cli/command/container/stats_helpers.go#L230-L231
@@ -218,39 +273,90 @@ pub fn launch_stats_task(
// - https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
if let Some(all_usage) = stats.memory_stats.usage {
- if cfg!(windows) {
- // todo
- // i have no way to test cgroups v2 so only work on v1 - see readme for more info
- } else if let Some(MemoryStatsStats::V2(v2stats)) = stats.memory_stats.stats {
- // container_memory_cache
+ if cfg!(windows) {
+ // todo
+ // i have no way to test cgroups v2 so only work on v1 - see readme for more info
+ } else if let Some(MemoryStatsStats::V2(v2stats)) = stats.memory_stats.stats {
+ // container_memory_cache
+ // container_memory_failcnt only on cgroups v1
- // container_memory_failcnt only on cgroups v1
+ // container_memory_failures_total
+ v2stats.pgfault; // label failure_type=pgfault
+ v2stats.pgmajfault; // label failure_type=pgmajfault
- // container_memory_failures_total
- v2stats.pgfault; // label failure_type=pgfault
- v2stats.pgmajfault; // label failure_type=pgmajfault
+ // container_memory_mapped_file
+ v2stats.file; // includes tmpfs
- // container_memory_mapped_file
- v2stats.file; // includes tmpfs
+ // container_memory_max_usage_bytes only on cgroups v1
- // container_memory_max_usage_bytes only on cgroups v1
+ // container_memory_migrate
- // container_memory_migrate
+ // container_memory_numa_pages omitted cause its hard :<
+ // container_memory_rss: may need recalcing
- // container_memory_numa_pages omitted cause its hard :<
+ // container_memory_swap: can't get
- // container_memory_rss: may need recalcing
+ // container_memory_usage_bytes: how?
- // container_memory_swap: can't get
+ // container_memory_working_set_bytes: not reported
+ }
+ }
- // container_memory_usage_bytes: how?
-
- // container_memory_working_set_bytes: not reported
+ // networking
+ // TODO: what is stats.network? is it populated on windows?
+ if let Some(net) = &stats.networks {
+ if let Some(last_net_stats) = &last_net_stats {
+ for (interface, this_inter) in net {
+ // try to get last
+ if let Some(last_this_inter) = last_net_stats.get(interface) {
+ // net labels
+ let mut net_labels = Vec::with_capacity(shared_labels.len() + 1);
+ net_labels.extend_from_slice(shared_labels);
+ net_labels.push(KeyValue::new("interface", interface.clone()));
+ let net_labels = &net_labels.into_boxed_slice()[..];
+
+ meter_container_network_receive_bytes_total.add(
+ this_inter.rx_bytes - last_this_inter.rx_bytes,
+ net_labels
+ );
+ meter_container_network_transmit_bytes_total.add(
+ this_inter.tx_bytes - last_this_inter.tx_bytes,
+ net_labels
+ );
+ #[cfg(not(windows))]
+ meter_container_network_receive_errors_total.add(
+ this_inter.rx_errors - last_this_inter.rx_errors,
+ net_labels
+ );
+ #[cfg(not(windows))]
+ meter_container_network_transmit_errors_total.add(
+ this_inter.tx_errors - last_this_inter.tx_errors,
+ net_labels
+ );
+ meter_container_network_receive_packets_dropped_total.add(
+ this_inter.rx_dropped - last_this_inter.rx_dropped,
+ net_labels
+ );
+ meter_container_network_transmit_packets_dropped_total.add(
+ this_inter.tx_dropped - last_this_inter.tx_dropped,
+ net_labels
+ );
+ meter_container_network_receive_packets_total.add(
+ this_inter.rx_packets - last_this_inter.rx_packets,
+ net_labels
+ );
+ meter_container_network_transmit_packets_total.add(
+ this_inter.tx_packets - last_this_inter.tx_packets,
+ net_labels
+ );
+ }
}
}
- } else {
+ }
+ last_net_stats = stats.networks;
+ } else {
// failed to get stats, log as such:
// TODO: use json logging or syslog so loki can understand this lol
println!(
@@ -280,9 +386,38 @@ fn get_rw_totals<'a>(iter: impl IntoIterator<Item = &'a BlkioStatsEntry>) -> (u6
match entry.op.as_str() {
"read" => read += entry.value,
"write" => write += entry.value,
- _ => println!("Unknown service_bytes_recursive entry type {}", entry.op)
+ _ => println!("Unknown service_bytes_recursive entry type {}", entry.op),
}
}
(read, write)
-} \ No newline at end of file
+}
+
+
+// LMAO i built this entire string pool around the idea of needing &'static str but turns out i can just use owned strings
+// guuuh okay whatever that's fine i guess, i'll keep this around just in case i need it -- sink
+
+/*
+// labels have to have 'static values so i have to make a string pool or i'll leak ram, eugh
+// technically this does mean each possible kv combo is never dropped, but we only have one copy in ram at all times
+// Arc would also work, but that would require a lot of refcounting for a count we know will NEVER hit zero
+// so just use a Cow that borrows a leaked box instead.
+// I checked, OtelString can either be owned (from String), borrowed (from Cow<'static, str>), or refcounted (Arc<str>).
+
+static LABEL_POOL: LazyLock<RwLock<HashMap<(Cow<str>, Cow<str>), KeyValue>>> = LazyLock::new(|| RwLock::new(HashMap::new()));
+fn pool_kv(key: &str, val: &str) -> KeyValue {
+ let leaked_k = &*Box::leak(key.to_string().into_boxed_str());
+ let leaked_v = &*Box::leak(val.to_string().into_boxed_str());
+
+ let cows = (Cow::from(leaked_k), Cow::from(leaked_v));
+
+ if let Some(kv) = LABEL_POOL.read().unwrap().get(&cows) {
+ // this should borrow the same value thanks to OtelString::Borrowed :)
+ kv.clone()
+ } else {
+ // we know upfront that the cow is borrowed, so just clone it
+ let kv = KeyValue::new(cows.0.clone(), cows.1.clone());
+ LABEL_POOL.write().unwrap().insert(cows, kv.clone());
+ kv
+ }
+}*/ \ No newline at end of file