aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHazel Atkinson <yellowsink@riseup.net>2025-04-05 16:35:16 +0100
committerHazel Atkinson <yellowsink@riseup.net>2025-04-05 16:35:40 +0100
commit9423b0788e386bc663cda5ddc4ce27c1021e6fef (patch)
tree979efc0d01440c586a1aae797022fecde4daba21 /src
parenta9b39124dd281a6c8c0606a600f83fe11a229fa6 (diff)
downloadcontainerspy-9423b0788e386bc663cda5ddc4ce27c1021e6fef.tar.gz
containerspy-9423b0788e386bc663cda5ddc4ce27c1021e6fef.tar.bz2
containerspy-9423b0788e386bc663cda5ddc4ce27c1021e6fef.zip
working collection for ONE metric but without any labels etc
Diffstat (limited to 'src')
-rw-r--r--src/main.rs88
1 files changed, 46 insertions, 42 deletions
diff --git a/src/main.rs b/src/main.rs
index 3100cb4..1af5cba 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,11 @@
-use std::{collections::BTreeMap, time::Duration};
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
use anyhow::Result;
use bollard::{container::StatsOptions, Docker};
use config::CONFIG;
+use opentelemetry::metrics::MeterProvider;
use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
use opentelemetry_sdk::metrics::SdkMeterProvider;
-use opentelemetry::metrics::MeterProvider;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
@@ -19,7 +19,6 @@ fn setup_otlp() -> Result<SdkMeterProvider> {
let builder = MetricExporter::builder().with_http().with_protocol(CONFIG.otlp_protocol);
let builder =
if let Some(e) = &CONFIG.otlp_endpoint {
- println!("{e}");
builder.with_endpoint(e)
} else {
builder
@@ -49,16 +48,17 @@ fn setup_otlp() -> Result<SdkMeterProvider> {
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
// open a docker connection
- let docker =
+ let docker = Arc::new(
if let Some(path) = &CONFIG.docker_socket {
Docker::connect_with_socket(path, 60, bollard::API_DEFAULT_VERSION)?
}
else {
Docker::connect_with_local_defaults()?
- };
+ }
+ );
// connect the OTLP exporter
- let meter_provider = setup_otlp()?;
+ let meter_provider = Arc::new(setup_otlp()?);
// fetch-report loop with graceful shutdown
let shutdown_token = CancellationToken::new();
@@ -79,73 +79,77 @@ async fn main() -> Result<()> {
_ = shutdown_token.cancelled() => { break }
}
- let list_res = docker.list_containers::<String>(None).await?;
+ let containers = docker.list_containers::<String>(None).await?;
+ let mut containers: Vec<_> = containers.into_iter().filter(|c| c.id.is_some()).collect();
- let container_ids: Vec<_> = list_res.into_iter().filter_map(|c| c.id).collect();
- container_ids.sort();
+ containers.sort_by(|a, b| a.id.as_ref().unwrap().cmp(b.id.as_ref().unwrap()));
let mut to_remove = Vec::new();
for (cont, handle) in &tasks {
// funny O(n^2) loop
- if container_ids.binary_search(cont).is_err() {
+ if containers.binary_search_by(|c| c.id.as_ref().unwrap().cmp(cont)).is_err() {
handle.abort();
- to_remove.push(cont);
+ to_remove.push(cont.clone());
}
}
- for cont in to_remove {
- tasks.remove(cont);
+ for cont in to_remove.into_iter() {
+ tasks.remove(&cont);
}
// now, add any new ones
- for cont in &container_ids {
- if !tasks.contains_key(cont) {
- tasks.insert(cont.clone(), launch_stats_task());
+ for cont in &containers {
+ let id_string = cont.id.as_ref().unwrap();
+ if !tasks.contains_key(id_string) {
+ // all this string cloning hurts me
+ tasks.insert(id_string.clone(), launch_stats_task(id_string.clone(), docker.clone(), meter_provider.clone()));
}
}
}
- /* let list_res = docker.list_containers::<String>(None).await?;
-
- let cont_name = list_res[0].id.as_ref().unwrap().as_str();
-
- // df takes a moment so also select on it
- let mut df =
- /* tokio::select! {
- df = docker.df() => { df }
- _ = shutdown_token.cancelled() => { break }
- }; */
- docker.stats(cont_name/* .trim_start_matches("/") */, Some(StatsOptions {
- stream: true,
- one_shot: false
- }));
+ // abort all stats tasks
+ for task in tasks.into_values() {
+ task.abort();
+ }
- // drop the first one
- df.next().await;
+ println!("clean shutdown.");
- while let Some(v) = df.next().await {
- let v = v?;
- println!("{v:?}");
- }
- */
Ok(())
}
-fn launch_stats_task<'a>(container_id: &str, docker: &Docker, meter_provider: &impl MeterProvider) -> JoinHandle<()> {
+// I do not enjoy taking a bunch of Arcs but tokio needs ownership so fine.
+fn launch_stats_task(container_id: String, docker: Arc<Docker>, meter_provider: Arc<SdkMeterProvider>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stats_stream =
- docker.stats(container_id, Some(StatsOptions {
+ docker.stats(container_id.as_str(), Some(StatsOptions {
stream: true,
one_shot: false
}));
- // drop the first one
- stats_stream.next().await;
+ // drop the first read
+ loop {
+ match stats_stream.next().await {
+ None => return,
+ Some(Ok(_)) => break,
+ Some(Err(err)) => {
+ // TODO: use json logging or syslog so loki can understand this lol
+ println!("Failed to get stats for container {container_id}!: {err:?}");
+ }
+ }
+ }
+
+ // container labels
+ let labels = [];
+
+ // create meters
+ let meter_container_cpu_usage_seconds_total = meter_provider.meter("test_meter").f64_counter("container_cpu_usage_seconds_total").with_unit("s").with_description("Cumulative cpu time consumed").build();
while let Some(val) = stats_stream.next().await {
if let Ok(stats) = val {
-
+ meter_container_cpu_usage_seconds_total.add(
+ Duration::from_nanos(stats.cpu_stats.cpu_usage.total_usage - stats.precpu_stats.cpu_usage.total_usage).as_secs_f64(),
+ &labels);
}
else {
// failed to get stats, log as such: