From a81caddd33d015fb84bc3a180698c2757a31e8b8 Mon Sep 17 00:00:00 2001 From: Louis Vallat Date: Sat, 2 Jul 2022 00:06:07 +0200 Subject: [PATCH] Initial commit Signed-off-by: Louis Vallat --- Cargo.toml | 14 +++++ src/main.rs | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9ab65df --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "lb-influx-connector" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +influxdb2 = "0.1.0" +futures = "0.3" +tokio = { version = "1", features = ["full"] } +livebox = "0.9" +env_logger = "0.9.0" +log = "0.4.17" diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..b77b21a --- /dev/null +++ b/src/main.rs @@ -0,0 +1,145 @@ + +use std::time::{Duration, Instant}; + +use futures::prelude::*; +use influxdb2::models::DataPoint; +use livebox::{devices::Device, status::Status, wan::WANConfiguration, client::Client}; +use log::{debug, info, error}; +use tokio::join; + + +#[tokio::main] +async fn main() { + env_logger::init(); + info!("Starting application."); + let host = std::env::var("INFLUXDB_HOST") + .expect("Environment variable 'INFLUXDB_HOST' not set."); + let org = std::env::var("INFLUXDB_ORG") + .expect("Environment variable 'INFLUXDB_ORG' not set."); + let token = std::env::var("INFLUXDB_TOKEN") + .expect("Environment variable 'INFLUXDB_TOKEN' not set."); + let bucket = std::env::var("INFLUXDB_BUCKET") + .expect("Environment variable 'INFLUXDB_BUCKET' not set."); + let router_password = std::env::var("ROUTER_PASSWORD") + .expect("Environment variable 'ROUTER_PASSWORD' not set."); + let hostname = std::env::var("HOSTNAME") + .expect("Environment variable 'HOSTNAME' not set."); + + + let i_client = influxdb2::Client::new(host, org, token); + let mut r_client = Client::new(router_password.as_str()); + + loop { + info!("Loop start."); + let start = Instant::now(); + r_client.login().await; + let (status, wan_config, devices) = join!( + r_client.get_status(), r_client.get_wan_config(), r_client.get_devices()); + r_client.logout().await; + debug!("Requests took {:.2?} to perform.", start.elapsed()); + let now = Instant::now(); + let mut datapoints = vec![]; + + debug!("Transforming objects to datapoints."); + let (mut vec_status, mut vec_wan_config, mut vec_devices) = join!( + status_to_datapoints(&status, &hostname), + wan_config_to_datapoints(&wan_config, &hostname), + devices_to_datapoints(&devices, &hostname) + ); + datapoints.append(&mut vec_status); + datapoints.append(&mut vec_wan_config); + datapoints.append(&mut vec_devices); + debug!("Took {:.2?}.", now.elapsed()); + + debug!("Sending {} datapoints to InfluxDB.", datapoints.len()); + let now = Instant::now(); + i_client.write(bucket.as_str(), stream::iter(datapoints)).await + .unwrap(); + debug!("Datapoints sent. Took {:.2?}.", now.elapsed()); + info!("End of loop. Took {:.2?}. Waiting for 60 seconds.", start.elapsed()); + std::thread::sleep(Duration::from_secs(5)); + } +} + +async fn status_to_datapoints(status: &Status, hostname: &String) -> Vec { + return vec![ + DataPoint::builder("status").tag("hostname", hostname) + .field("uptime", status.up_time as i64).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("product_class", status.product_class.clone()).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("hardware_version", status.hardware_version.clone()).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("software_version", status.software_version.clone()).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("rescue_version", status.rescue_version.clone()).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("external_ip_address", status.external_ip_address.clone()).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("number_of_reboots", status.number_of_reboots as i64).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("upgrade_occurred", status.upgrade_occurred).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("reset_occurred", status.reset_occurred.clone()).build().unwrap(), + DataPoint::builder("status").tag("hostname", hostname) + .field("base_mac", status.base_mac.clone()).build().unwrap(), + ]; +} + +async fn wan_config_to_datapoints(wan_config: &WANConfiguration, hostname: &String) + -> Vec { + return vec![ + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("wan_state", wan_config.wan_state.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("link_type", wan_config.link_type.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("link_state", wan_config.link_state.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("mac_address", wan_config.mac_address.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("protocol", wan_config.protocol.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("connection_state", wan_config.connection_state.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("last_connection_error", wan_config.last_connection_error.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("ip_address", wan_config.ip_address.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("remote_gateway", wan_config.remote_gateway.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("dns_servers", wan_config.dns_servers.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("ipv6_address", wan_config.ipv6_address.clone()).build().unwrap(), + DataPoint::builder("wan_config").tag("hostname", hostname) + .field("ipv6_delegated_prefix", wan_config.ipv6_delegated_prefix.clone()).build().unwrap(), + ]; +} + +async fn devices_to_datapoints(devices: &Vec, hostname: &String) + -> Vec { + let mut datapoints = vec![]; + for d in devices { + datapoints.append(&mut vec![ + DataPoint::builder("devices").tag("hostname", hostname) + .field("key", d.key.clone()).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("name", d.name.clone()).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("discovery_source", d.discovery_source.clone()).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("active", d.active).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("device_type", d.device_type.clone()).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("tags", d.tags.clone()).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("ip_address", d.ip_address.clone().unwrap_or("N/A".to_string())).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("ssid", d.ssid.clone().unwrap_or("N/A".to_string())).build().unwrap(), + DataPoint::builder("devices").tag("hostname", hostname) + .field("channel", d.channel.unwrap_or(0) as i64).build().unwrap(), + ]); + } + return datapoints; +}