stabilizer/net/
mod.rs

1//! Stabilizer network management module
2//!
3//! # Design
4//! The stabilizer network architecture supports numerous layers to permit transmission of
5//! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and data
6//! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines
7//! related to Stabilizer networking operations.
8pub use heapless;
9pub use miniconf;
10pub use serde;
11
12pub mod data_stream;
13pub mod network_processor;
14pub mod telemetry;
15
16use crate::hardware::{
17    metadata::ApplicationMetadata, EthernetPhy, NetworkManager, NetworkStack,
18    SystemTimer,
19};
20use crate::settings::NetSettings;
21use data_stream::{DataStream, FrameGenerator, StreamTarget};
22use network_processor::NetworkProcessor;
23use telemetry::TelemetryClient;
24
25use core::fmt::Write;
26use heapless::String;
27use miniconf::{TreeDeserializeOwned, TreeKey, TreeSerialize};
28use miniconf_mqtt::minimq;
29
30pub type NetworkReference =
31    smoltcp_nal::shared::NetworkStackProxy<'static, NetworkStack>;
32
33pub struct MqttStorage {
34    telemetry: [u8; 2048],
35    settings: [u8; 1024],
36}
37
38impl Default for MqttStorage {
39    fn default() -> Self {
40        Self {
41            telemetry: [0u8; 2048],
42            settings: [0u8; 1024],
43        }
44    }
45}
46
47pub enum UpdateState {
48    NoChange,
49    Updated,
50}
51
52pub enum NetworkState {
53    SettingsChanged,
54    Updated,
55    NoChange,
56}
57
58/// A structure of Stabilizer's default network users.
59pub struct NetworkUsers<S, const Y: usize>
60where
61    S: Default + TreeDeserializeOwned + TreeSerialize + Clone,
62{
63    pub miniconf: miniconf_mqtt::MqttClient<
64        'static,
65        S,
66        NetworkReference,
67        SystemTimer,
68        minimq::broker::NamedBroker<NetworkReference>,
69        Y,
70    >,
71    pub processor: NetworkProcessor,
72    stream: DataStream,
73    generator: Option<FrameGenerator>,
74    pub telemetry: TelemetryClient,
75}
76
77impl<S, const Y: usize> NetworkUsers<S, Y>
78where
79    S: Default + TreeDeserializeOwned + TreeSerialize + TreeKey + Clone,
80{
81    /// Construct Stabilizer's default network users.
82    ///
83    /// # Args
84    /// * `stack` - The network stack that will be used to share with all network users.
85    /// * `phy` - The ethernet PHY connecting the network.
86    /// * `clock` - A `SystemTimer` implementing `Clock`.
87    /// * `app` - The name of the application.
88    /// * `net_settings` - The network-specific settings to use for the application.
89    /// * `metadata` - The application metadata
90    ///
91    /// # Returns
92    /// A new struct of network users.
93    pub fn new(
94        stack: NetworkStack,
95        phy: EthernetPhy,
96        clock: SystemTimer,
97        app: &str,
98        net_settings: &NetSettings,
99        metadata: &'static ApplicationMetadata,
100    ) -> Self {
101        let stack_manager =
102            cortex_m::singleton!(: NetworkManager = NetworkManager::new(stack))
103                .unwrap();
104
105        let processor =
106            NetworkProcessor::new(stack_manager.acquire_stack(), phy);
107
108        let prefix = cortex_m::singleton!(: String<128> = get_device_prefix(app, &net_settings.id)).unwrap();
109
110        let store =
111            cortex_m::singleton!(: MqttStorage = MqttStorage::default())
112                .unwrap();
113
114        let named_broker = minimq::broker::NamedBroker::new(
115            &net_settings.broker,
116            stack_manager.acquire_stack(),
117        )
118        .unwrap();
119        let settings = miniconf_mqtt::MqttClient::<_, _, _, _, Y>::new(
120            stack_manager.acquire_stack(),
121            prefix.as_str(),
122            clock,
123            minimq::ConfigBuilder::new(named_broker, &mut store.settings)
124                .client_id(&get_client_id(&net_settings.id, "settings"))
125                .unwrap(),
126        )
127        .unwrap();
128
129        let named_broker = minimq::broker::NamedBroker::new(
130            &net_settings.broker,
131            stack_manager.acquire_stack(),
132        )
133        .unwrap();
134        let mqtt = minimq::Minimq::new(
135            stack_manager.acquire_stack(),
136            clock,
137            minimq::ConfigBuilder::new(named_broker, &mut store.telemetry)
138                // The telemetry client doesn't receive any messages except MQTT control packets.
139                // As such, we don't need much of the buffer for RX.
140                .rx_buffer(minimq::config::BufferConfig::Maximum(100))
141                .client_id(&get_client_id(&net_settings.id, "tlm"))
142                .unwrap(),
143        );
144
145        let telemetry = TelemetryClient::new(mqtt, prefix, metadata);
146
147        let (generator, stream) =
148            data_stream::setup_streaming(stack_manager.acquire_stack());
149
150        NetworkUsers {
151            miniconf: settings,
152            processor,
153            telemetry,
154            stream,
155            generator: Some(generator),
156        }
157    }
158
159    /// Enable data streaming.
160    ///
161    /// # Args
162    /// * `format` - A unique u8 code indicating the format of the data.
163    pub fn configure_streaming(
164        &mut self,
165        format: impl Into<u8>,
166    ) -> FrameGenerator {
167        let mut generator = self.generator.take().unwrap();
168        generator.configure(format);
169        generator
170    }
171
172    /// Direct the stream to the provided remote target.
173    ///
174    /// # Args
175    /// * `remote` - The destination for the streamed data.
176    pub fn direct_stream(&mut self, remote: StreamTarget) {
177        if self.generator.is_none() {
178            self.stream.set_remote(remote);
179        }
180    }
181
182    /// Update and process all of the network users state.
183    ///
184    /// # Returns
185    /// An indication if any of the network users indicated a state change.
186    /// The SettingsChanged option contains the path of the settings that changed.
187    pub fn update(&mut self, settings: &mut S) -> NetworkState {
188        // Update the MQTT clients.
189        self.telemetry.update();
190
191        // Update the data stream.
192        if self.generator.is_none() {
193            self.stream.process();
194        }
195
196        // Poll for incoming data.
197        let poll_result = match self.processor.update() {
198            UpdateState::NoChange => NetworkState::NoChange,
199            UpdateState::Updated => NetworkState::Updated,
200        };
201
202        let res = self.miniconf.update(settings);
203        match res {
204            Ok(true) => NetworkState::SettingsChanged,
205            _ => poll_result,
206        }
207    }
208}
209
210/// Get an MQTT client ID for a client.
211///
212/// # Args
213/// * `id` - The base client ID
214/// * `mode` - The operating mode of this client. (i.e. tlm, settings)
215///
216/// # Returns
217/// A client ID that may be used for MQTT client identification.
218fn get_client_id(id: &str, mode: &str) -> String<64> {
219    let mut identifier = String::new();
220    write!(&mut identifier, "{id}-{mode}").unwrap();
221    identifier
222}
223
224/// Get the MQTT prefix of a device.
225///
226/// # Args
227/// * `app` - The name of the application that is executing.
228/// * `id` - The MQTT ID of the device.
229///
230/// # Returns
231/// The MQTT prefix used for this device.
232pub fn get_device_prefix(app: &str, id: &str) -> String<128> {
233    // Note(unwrap): The mac address + binary name must be short enough to fit into this string. If
234    // they are defined too long, this will panic and the device will fail to boot.
235    let mut prefix: String<128> = String::new();
236    write!(&mut prefix, "dt/sinara/{app}/{id}").unwrap();
237
238    prefix
239}