platform/
telemetry.rs

1//! Stabilizer Telemetry Capabilities
2//!
3//! # Design
4//! Telemetry is reported regularly using an MQTT client. All telemetry is reported in SI units
5//! using standard JSON format.
6//!
7//! In order to report ADC/DAC codes generated during the DSP routines, a telemetry buffer is
8//! employed to track the latest codes. Converting these codes to SI units would result in
9//! repetitive and unnecessary calculations within the DSP routine, slowing it down and limiting
10//! sampling frequency. Instead, the raw codes are stored and the telemetry is generated as
11//! required immediately before transmission. This ensures that any slower computation required
12//! for unit conversion can be off-loaded to lower priority tasks.
13use crate::ApplicationMetadata;
14use heapless::String;
15use minimq::{
16    PubError, Publication,
17    embedded_nal::{Dns, TcpClientStack},
18    embedded_time::Clock,
19};
20use serde::Serialize;
21use smoltcp_nal::NetworkError;
22
23/// Default metadata message if formatting errors occur.
24const DEFAULT_METADATA: &str = "{\"message\":\"Truncated: See USB terminal\"}";
25
26/// The telemetry client for reporting telemetry data over MQTT.
27pub struct TelemetryClient<C: Clock, S: TcpClientStack> {
28    mqtt: minimq::Minimq<'static, S, C, minimq::broker::NamedBroker<S>>,
29    prefix: &'static str,
30    meta_published: bool,
31    metadata: &'static ApplicationMetadata,
32}
33
34impl<C: Clock, S: TcpClientStack<Error = smoltcp_nal::NetworkError> + Dns>
35    TelemetryClient<C, S>
36{
37    /// Construct a new telemetry client.
38    ///
39    /// # Args
40    /// * `mqtt` - The MQTT client
41    /// * `prefix` - The device prefix to use for MQTT telemetry reporting.
42    ///
43    /// # Returns
44    /// A new telemetry client.
45    pub fn new(
46        mqtt: minimq::Minimq<'static, S, C, minimq::broker::NamedBroker<S>>,
47        prefix: &'static str,
48        metadata: &'static ApplicationMetadata,
49    ) -> Self {
50        Self {
51            mqtt,
52            meta_published: false,
53            prefix,
54            metadata,
55        }
56    }
57
58    /// Publish telemetry over MQTT
59    ///
60    /// # Note
61    /// Telemetry is reported in a "best-effort" fashion. Failure to transmit telemetry will cause
62    /// it to be silently dropped.
63    ///
64    /// # Args
65    /// * `telemetry` - The telemetry to report
66    pub fn publish_telemetry<T: Serialize>(
67        &mut self,
68        suffix: &str,
69        telemetry: &T,
70    ) {
71        let mut topic: String<128> = self.prefix.try_into().unwrap();
72        topic.push_str(suffix).unwrap();
73        self.publish(&topic, telemetry)
74            .map_err(|e| log::error!("Telemetry publishing error: {:?}", e))
75            .ok();
76    }
77
78    pub fn publish<T: Serialize>(
79        &mut self,
80        topic: &str,
81        payload: &T,
82    ) -> Result<(), PubError<NetworkError, serde_json_core::ser::Error>> {
83        self.mqtt
84            .client()
85            .publish(minimq::Publication::new(&topic, |buf: &mut [u8]| {
86                serde_json_core::to_slice(payload, buf)
87            }))
88    }
89
90    /// Update the telemetry client
91    ///
92    /// # Note
93    /// This function is provided to force the underlying MQTT state machine to process incoming
94    /// and outgoing messages. Without this, the client will never connect to the broker. This
95    /// should be called regularly.
96    pub fn update(&mut self) {
97        match self.mqtt.poll(|_client, _topic, _message, _properties| {}) {
98            Err(minimq::Error::Network(
99                smoltcp_nal::NetworkError::TcpConnectionFailure(
100                    smoltcp_nal::smoltcp::socket::tcp::ConnectError::Unaddressable
101                ),
102            )) => {}
103
104            Err(error) => log::info!("Unexpected error: {:?}", error),
105            _ => {}
106        }
107
108        if !self.mqtt.client().is_connected() {
109            self.meta_published = false;
110            return;
111        }
112
113        // Publish application metadata
114        if !self.meta_published
115            && self.mqtt.client().can_publish(minimq::QoS::AtMostOnce)
116        {
117            let Self { mqtt, metadata, .. } = self;
118
119            let mut topic: String<128> = self.prefix.try_into().unwrap();
120            topic.push_str("/meta").unwrap();
121
122            if mqtt
123                .client()
124                .publish(Publication::new(&topic, |buf: &mut [u8]| {
125                    serde_json_core::to_slice(&metadata, buf)
126                }))
127                .is_err()
128            {
129                // Note(unwrap): We can guarantee that this message will be sent because we checked
130                // for ability to publish above.
131                mqtt.client()
132                    .publish(Publication::new(
133                        &topic,
134                        DEFAULT_METADATA.as_bytes(),
135                    ))
136                    .unwrap();
137            }
138
139            self.meta_published = true;
140        }
141    }
142}