Implement stateful processing

In the previous tutorials, we have seen how to manipulate data using processing nodes declared with the HUX_DECLARE_PROCESSING construct. Such nodes are stateless, in the sense that they always produce the same output for a given input. In other words, such nodes are blind with respect to the history of received inputs.

In more complex applications, however, it might be useful to define stateful processing nodes. A stateful processing node has a state which is preserved across subsequent executions of the node’s logic. This allows a processing node to keep track of information from previous executions and to act accordingly. To declare a stateful processing node the language provides the HUX_DECLARE_STATEFUL_PROCESSING construct:

/*
 * stateful_node     : the stateful processing node label
 * state_t           : the type of the desired state
 * input_channel     : the processing node input channel
 * {}                : custom processing node logic
 */
HUX_DECLARE_STATEFUL_PROCESSING(stateful_node, state_t, input_channel, {});

Every stateful processing node, within its custom logic can access the special hux_state variable which can be read and written and whose data is preserved across subsequent execution of the processing node.

The type state_t of the hux_state variable can be any valid C++ type (e.g.: float, hux::int32_t, bool, an array, a struct). On application start, the hux_state variable is automatically zero-initialized by default. When using a user-defined C++ struct as a state type, it is also possible to set a specific initial state using default member initializers within the struct declaration.

Alarm with state example

The following example shows how to use a simple stateful processing node to set an alarm when more than 3 consecutive temperature samples are out of threshold:

#include <huxon/lang.hpp>
#include <huxon/sensors/STMicroelectronics.hpp>

/* Declare an environmental sensor (temperature and humidity) */
HUX_DECLARE_SENSOR(env_sensor, hux::sensors::STMicroelectronics::HTS221);

/* Send the temperature source of the environmental sensor through a channel */
HUX_DECLARE_CHANNEL(temperature_ch, merge, env_sensor.get_temperature());

/* Define the state type for the alarm stateful processing node */
struct alarm_state_t {
    hux::uint32_t bad_samples_cnt = 0; /* initialized to 0 by default */
    bool in_alarm = false; /* initialized to false by default */
};

/* Declare the stateful processing node to detect alarms from temperature data */
HUX_DECLARE_STATEFUL_PROCESSING(alarm, alarm_state_t, temperature_ch, {

    const float TEMPERATURE_THS = 50;
    const hux::uint32_t BAD_SAMPLES_THS = 3;

    /* Check if we received a good temperature sample */
    bool is_good_temperature = hux_input < TEMPERATURE_THS;

    /* If good temperature sample:
     *  - reset alarm
     *  - reset bad samples counter
     * If bad temperature sample:
     *  - increase bad samples counter
     *  - if too many bad samples set the alarm
     *
     * NOTE: 'hux_state' is of type alarm_state_t, changes to 'hux_state'
     * will be seen by subsequent executions of this processing node
     * */
    if(is_good_temperature) {
        hux_state.bad_samples_cnt = 0;
        hux_state.in_alarm = false;
    } else {
        hux_state.bad_samples_cnt++;
        if(hux_state.bad_samples_cnt > BAD_SAMPLES_THS) {
            hux_state.in_alarm = true;
        }
    }

    return hux_state.in_alarm;
});

/* Send alarm data through a channel and define the application output */
HUX_DECLARE_CHANNEL(alarm_ch, merge, alarm);

HUX_DECLARE_PROCESSING(output, alarm_ch, {
    HUX_DECLARE_OUTPUT_VALUE(alarm_out, Boolean, "alarm", hux_input);
    return alarm_out;
});

HUX_REGISTER_OUTPUT(output);

Compute over a window of samples

A common use case for stateful processing nodes is to implement logic that operates on a window of the last N samples of a channel. This is particularly useful for implementing signal processing filters, such as low/high pass filters or to implement delays.

To keep the last N samples of a channel, we can use a stateful processing node. The following code shows how to implement a simple low pass averaging filter over the acceleration of an inertial sensor:

#include <huxon/lang.hpp>
#include <huxon/sensors/STMicroelectronics.hpp>

#include <array>

/* Declare an inertial sensor with simulation data loaded from file */
HUX_DECLARE_SIMULATION_DATA(imu_sim_dataset,
    hux::simulation::load_csv<float, float, float, hux::uint64_t>("dataset.csv", ";")
);
HUX_DECLARE_SENSOR(env_sensor, hux::sensors::STMicroelectronics::IIS2DLPC, imu_sim_dataset);

/* Send the X axis acceleration of the inertial sensor through a channel */
HUX_DECLARE_CHANNEL(acc_x_ch, merge, env_sensor.get_accX());

/* Define a type to hold a window of at most N float samples.
 * We use a template parameter "N" so that we can easily instantiate
 * the type with the desired window size as needed
 * */
template<hux::uint64_t N>
struct window_t {
    std::array<float, N> window;
    hux::uint64_t cur_size = 0; /* initialized to 0 by default */
};

/* Declare a stateful node to compute an averaging filter over the X axis acceleration.
 * The node uses a state of type window_t<8> to hold the last 8 acceleration samples.
 * */
HUX_DECLARE_STATEFUL_PROCESSING(avg_filter, window_t<8>, acc_x_ch, {

    /* Use the size() method of std::array to retrieve the window size N */
    const hux::uint64_t window_size = hux_state.window.size();

    if(hux_state.cur_size == window_size) {
        /* If the window is full, shift all samples to the left by one position,
         * the least recent sample at position 0 gets overwritten and a free slot is
         * created at position hux_state.cur_size - 1
         * */
        for(hux::uint64_t i = 0; i < hux_state.cur_size - 1; i++) {
            hux_state.window[i] = hux_state.window[i + 1];
        }
    } else {
        /* If the window is not full, increase the current size */
        hux_state.cur_size++;
    }

    /* store the current acceleration sample at the end of the window */
    hux_state.window[hux_state.cur_size - 1] = hux_input;

    /* At this point we have a window of samples stored in:
     *   hux_state.window[0]
     *   hux_state.window[1]
     *     ...
     *   hux_state.window[hux_state.cur_size - 1]
     * Where hux_state.window[0] is the oldest sample and
     * hux_state.window[hux_state.cur_size - 1] is the most recent sample.
     * After the initial fill-up stage, the window will contain exactly window_size samples.
     *
     * We can now compute the output of the averaging filter as the average of the samples
     * within the window:
     * */
    float filter_out = 0.0f;
    for(hux::uint64_t i = 0; i < hux_state.cur_size; i++) {
        filter_out += hux_state.window[i];
    }
    filter_out = filter_out / hux_state.cur_size;

    return filter_out;
});

/* Send filter data through a channel and prepare the application output */
HUX_DECLARE_CHANNEL(avg_filter_ch, merge, avg_filter);

HUX_DECLARE_PROCESSING(output, avg_filter_ch, {
    HUX_DECLARE_OUTPUT_VALUE(alarm_out, Float, "low_pass_acc_x", hux_input);
    return alarm_out;
});

HUX_REGISTER_OUTPUT(output);

Note

When dealing with large window sizes, a more efficient approach is to use a circular buffer. This removes the overhead of shifting the elements of the array at each execution of the processing node.

The following is an example of simulation data stored in dataset.csv:

accX;           accY;           accZ;           timestamp
-0.011006;      0.017531;       -1.020886;      1653408559000
0.454593;       0.010787;       -0.974768;      1653408559038
0.826860;       0.009695;       -1.015692;      1653408559076
0.989387;       0.018663;       -0.989055;      1653408559115
0.913467;       0.007103;       -0.981601;      1653408559153
0.687052;       -0.007337;      -1.002648;      1653408559192
0.225544;       -0.000822;      -0.991091;      1653408559230
-0.259760;      0.003179;       -0.965981;      1653408559269
-0.675087;      -0.017757;      -0.961462;      1653408559307
-0.959974;      -0.012362;      -0.971334;      1653408559346
-0.971041;      -0.024752;      -0.972259;      1653408559384
-0.803155;      0.001559;       -0.970502;      1653408559423
-0.481639;      -0.010084;      -0.963446;      1653408559461
0.008945;       -0.008212;      -0.963033;      1653408559500
0.466608;       0.006772;       -0.960363;      1653408559538
0.845139;       -0.009987;      -0.968539;      1653408559576
1.001948;       -0.017546;      -0.972334;      1653408559615
0.932987;       0.013879;       -0.935979;      1653408559653
0.670063;       -0.012978;      -0.944050;      1653408559692
0.250723;       -0.018183;      -0.953633;      1653408559730
-0.253977;      -0.000580;      -0.969291;      1653408559769
-0.650887;      0.018779;       -0.957099;      1653408559807
-0.925542;      0.010482;       -0.921970;      1653408559846
-0.968980;      0.023544;       -0.961882;      1653408559884
-0.800433;      -0.011731;      -0.950111;      1653408559923
-0.444572;      0.015571;       -0.926836;      1653408559961
-0.006043;      0.024095;       -0.955827;      1653408560000