aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHazel Atkinson <yellowsink@riseup.net>2025-04-05 12:57:51 +0100
committerHazel Atkinson <yellowsink@riseup.net>2025-04-05 12:57:51 +0100
commita9b39124dd281a6c8c0606a600f83fe11a229fa6 (patch)
treeca6a4c7926b458856577c4b952ba369c0d674421 /src
parent01e8739edec3979a32c94df3eced77b04db83c26 (diff)
downloadcontainerspy-a9b39124dd281a6c8c0606a600f83fe11a229fa6.tar.gz
containerspy-a9b39124dd281a6c8c0606a600f83fe11a229fa6.tar.bz2
containerspy-a9b39124dd281a6c8c0606a600f83fe11a229fa6.zip
start on real statsu collection
Diffstat (limited to 'src')
-rw-r--r--src/main.rs126
1 files changed, 100 insertions, 26 deletions
diff --git a/src/main.rs b/src/main.rs
index 9763138..3100cb4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,31 +1,18 @@
-use std::time::Duration;
+use std::{collections::BTreeMap, time::Duration};
use anyhow::Result;
-use bollard::Docker;
+use bollard::{container::StatsOptions, Docker};
use config::CONFIG;
-use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
-use opentelemetry_sdk::{metrics::SdkMeterProvider, resource::{ResourceDetector, SdkProvidedResourceDetector}, Resource};
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry::metrics::MeterProvider;
+use tokio::task::JoinHandle;
+use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
mod config;
-#[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<()> {
- // open a docker connection
- let docker =
- if let Some(path) = &CONFIG.docker_socket {
- Docker::connect_with_socket(path, 60, bollard::API_DEFAULT_VERSION)?
- }
- else {
- Docker::connect_with_local_defaults()?
- };
-
- let info = docker.info().await?;
-
- println!("Connected to Docker Daemon version {:?}", info.server_version);
-
- // connect the OTLP exporter
+fn setup_otlp() -> Result<SdkMeterProvider> {
let metric_exporter =
match CONFIG.otlp_protocol {
Protocol::HttpBinary | Protocol::HttpJson => {
@@ -54,9 +41,24 @@ async fn main() -> Result<()> {
},
};
- let meter_provider = SdkMeterProvider::builder()
- .with_periodic_exporter(metric_exporter)
- .build();
+ Ok(SdkMeterProvider::builder()
+ .with_periodic_exporter(metric_exporter)
+ .build())
+}
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<()> {
+ // open a docker connection
+ let docker =
+ if let Some(path) = &CONFIG.docker_socket {
+ Docker::connect_with_socket(path, 60, bollard::API_DEFAULT_VERSION)?
+ }
+ else {
+ Docker::connect_with_local_defaults()?
+ };
+
+ // connect the OTLP exporter
+ let meter_provider = setup_otlp()?;
// fetch-report loop with graceful shutdown
let shutdown_token = CancellationToken::new();
@@ -67,17 +69,89 @@ async fn main() -> Result<()> {
st2.cancel();
});
- let mut interval = tokio::time::interval(Duration::from_secs(1));
+ let mut container_search_interval = tokio::time::interval(Duration::from_secs(1));
+
+ let mut tasks: BTreeMap<String, JoinHandle<()>> = BTreeMap::new();
loop {
tokio::select! {
- _ = interval.tick() => {}
+ _ = container_search_interval.tick() => {}
_ = shutdown_token.cancelled() => { break }
}
let list_res = docker.list_containers::<String>(None).await?;
- println!("{list_res:?}");
+
+ let container_ids: Vec<_> = list_res.into_iter().filter_map(|c| c.id).collect();
+ container_ids.sort();
+
+ let mut to_remove = Vec::new();
+
+ for (cont, handle) in &tasks {
+ // funny O(n^2) loop
+ if container_ids.binary_search(cont).is_err() {
+ handle.abort();
+ to_remove.push(cont);
+ }
+ }
+
+ for cont in to_remove {
+ tasks.remove(cont);
+ }
+
+ // now, add any new ones
+ for cont in &container_ids {
+ if !tasks.contains_key(cont) {
+ tasks.insert(cont.clone(), launch_stats_task());
+ }
+ }
}
+ /* let list_res = docker.list_containers::<String>(None).await?;
+
+ let cont_name = list_res[0].id.as_ref().unwrap().as_str();
+
+ // df takes a moment so also select on it
+ let mut df =
+ /* tokio::select! {
+ df = docker.df() => { df }
+ _ = shutdown_token.cancelled() => { break }
+ }; */
+ docker.stats(cont_name/* .trim_start_matches("/") */, Some(StatsOptions {
+ stream: true,
+ one_shot: false
+ }));
+
+ // drop the first one
+ df.next().await;
+
+ while let Some(v) = df.next().await {
+ let v = v?;
+ println!("{v:?}");
+ }
+ */
Ok(())
}
+
+fn launch_stats_task<'a>(container_id: &str, docker: &Docker, meter_provider: &impl MeterProvider) -> JoinHandle<()> {
+ tokio::spawn(async move {
+ let mut stats_stream =
+ docker.stats(container_id, Some(StatsOptions {
+ stream: true,
+ one_shot: false
+ }));
+
+ // drop the first one
+ stats_stream.next().await;
+
+ while let Some(val) = stats_stream.next().await {
+ if let Ok(stats) = val {
+
+ }
+ else {
+ // failed to get stats, log as such:
+ // TODO: use json logging or syslog so loki can understand this lol
+ println!("Failed to get stats for container {container_id}!: {:?}", val.unwrap_err());
+ }
+ }
+ })
+}