Employing Senders and Receivers to Tame Concurrency in Embedded Systems

Michael Caisse

Exercise

Warm-up

From this side of the room, count your position in the row.

Are you odd or even?

girl_with_pearl_earring.jpeg

Everyone

  • If told a colour, think of opposite colour
  • Count to 9
  • Respond with opposite colour
  • If told a number, subtract favourite number
  • Count to 6
  • Respond with new number

You are Odd

  • Your hotel room number + favourite number
  • Count to 17
  • Tell the person to your right

You are Even

  • Think of favourite colour
  • Count to 19
  • Tell the person to your right

bank.jpeg

STM32L432KC

Development Board

nucleo-32.png

TIM1

Interrupts

Challenges

  • Polling inefficiencies
  • Missed events
  • Power concerns

Peripherals

  • 26 GPIO
  • 14 communication interfaces (USB, SAI, I2C, CAN, …)
  • Quad SPI memory interface
  • 11 Timers
  • 3 cap-sense channels
  • 1 ADC
  • 2 DAC

Interrupts

NVIC Table

Interrupt Context

ISR Guidelines

  • Return quickly from ISR
  • Do not do work, capture state and pass it on
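One common way to follow "capture state and pass it on" is a small single-producer, single-consumer queue: the ISR pushes the captured value and returns, and the main loop pops and does the real work later. The sketch below is a host-runnable illustration only. The names `fake_rx_register`, `isr_queue`, `rx_queue` and `uart_rx_isr` are hypothetical and do not come from the talk.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for a peripheral data register; on real hardware
// this would be a memory-mapped address.
inline volatile std::uint8_t fake_rx_register = 0;

// Single-producer (ISR) / single-consumer (main loop) ring buffer.
template <std::size_t N>
class isr_queue {
    static_assert((N & (N - 1)) == 0, "N must be a power of two");
    std::array<std::uint8_t, N> buf_{};
    std::atomic<std::size_t> head_{0}; // written only by the ISR
    std::atomic<std::size_t> tail_{0}; // written only by the main loop

  public:
    // Called from the ISR: store the byte and return immediately.
    bool push(std::uint8_t b) {
        auto const h = head_.load(std::memory_order_relaxed);
        auto const t = tail_.load(std::memory_order_acquire);
        if (h - t == N) { return false; } // full: drop and report
        buf_[h & (N - 1)] = b;
        head_.store(h + 1, std::memory_order_release);
        return true;
    }

    // Called from the main loop: take one byte if one is available.
    std::optional<std::uint8_t> pop() {
        auto const t = tail_.load(std::memory_order_relaxed);
        auto const h = head_.load(std::memory_order_acquire);
        if (h == t) { return std::nullopt; }
        auto const b = buf_[t & (N - 1)];
        tail_.store(t + 1, std::memory_order_release);
        return b;
    }
};

inline isr_queue<8> rx_queue;

// The ISR body: capture the register value, hand it off, return.
inline void uart_rx_isr() { (void)rx_queue.push(fake_rx_register); }
```

The main loop would call `rx_queue.pop()` until it returns `std::nullopt` and decode the bytes there, outside interrupt context.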

Working with Interrupt Service Routines

Minor Activity

Contained within the ISR

We have work!

Things are getting complex

Message Decode

No more twist

What is the problem?

Asynchronous

Ancient Greek: σύγχρονος

Parallelism

Comes from Hardware

Concurrency

Is a problem of associating independent execution steps with their proper state.

Simple Proxy

Events versus State

Bringing state to events

How does that work for you?

ROOM

Bringing events to the state.

The machine

Senders and Receivers

Building our world view

Stepping back as a user

  • Sender Factories
  • Sender Adapters
  • Sender Consumers

Sender Factories

Functions that return senders.

  • just
  • just_result_of
  • just_error
  • just_error_result_of
  • just_stopped

auto sndr = async::just(42, 17);

Sender Factories

Functions that return senders.

  • just
  • just_result_of
  • just_error
  • just_error_result_of
  • just_stopped

auto sndr = async::just_result_of(
              [] { return 42; },
              [] { do_something(); },
              [] { return 17; });
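To make "functions that return senders" concrete, here is a toy model in standard C++ of what a factory such as `just` could look like. It is a sketch and not the library's implementation: the `toy` namespace, `just_op`, `just_sender` and `sum_receiver` are all hypothetical names. The point it shows is that a sender only describes work, and nothing runs until it is connected to a receiver and started.

```cpp
#include <cassert>
#include <tuple>
#include <utility>

namespace toy {
// Operation state: holds the values and the receiver until started.
template <typename Receiver, typename... Vs>
struct just_op {
    Receiver rcvr;
    std::tuple<Vs...> values;

    void start() {
        std::apply([this](auto &...vs) { rcvr.set_value(vs...); }, values);
    }
};

// Sender: a description of work; nothing runs until connect + start.
template <typename... Vs>
struct just_sender {
    std::tuple<Vs...> values;

    template <typename Receiver>
    auto connect(Receiver r) const {
        return just_op<Receiver, Vs...>{std::move(r), values};
    }
};

// Sender factory: a plain function that returns a sender.
template <typename... Vs>
auto just(Vs... vs) {
    return just_sender<Vs...>{std::tuple<Vs...>{std::move(vs)...}};
}
} // namespace toy

// A receiver that records the sum of whatever values arrive.
struct sum_receiver {
    int *out;

    template <typename... Ts>
    void set_value(Ts... ts) {
        *out = (0 + ... + ts);
    }
};
```

Connecting `toy::just(42, 17)` to a `sum_receiver` and calling `start()` on the result delivers both values to `set_value` in one call.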

Sender Factories

Functions that return senders.

  • schedule

auto sndr = sched.schedule();

Sender Adapters

Take one or more senders and return a sender that is their composition.

  • continue_on
  • start_on
  • let_value
  • let_error
  • let_stopped
  • then
  • sequence
  • upon_error
  • upon_stopped
  • repeat
  • repeat_n
  • repeat_until
  • retry
  • retry_until
  • split
  • when_any
  • when_all
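An adapter such as `then` can be pictured as wrapping the downstream receiver so that a function is applied before the value is passed along. The following toy model in standard C++ sketches that idea for a single `int` value. All names in the `toy` namespace and `store_receiver` are hypothetical, and the real library handles arbitrary value types, errors and cancellation, which this sketch leaves out.

```cpp
#include <cassert>
#include <utility>

namespace toy {
// Minimal single-value factory so the adapter has something to wrap.
struct just_int {
    int value;

    template <typename R>
    auto connect(R r) const {
        struct op {
            R rcvr;
            int v;
            void start() { rcvr.set_value(v); }
        };
        return op{std::move(r), value};
    }
};

// Receiver wrapper: applies fn, then forwards the result downstream.
template <typename R, typename F>
struct then_receiver {
    R next;
    F fn;
    void set_value(int v) { next.set_value(fn(v)); }
};

// The adapted sender: connecting it wraps the receiver and connects upstream.
template <typename S, typename F>
struct then_sender {
    S upstream;
    F fn;

    template <typename R>
    auto connect(R r) const {
        return upstream.connect(then_receiver<R, F>{std::move(r), fn});
    }
};

// then(f) on its own is a pipeable object waiting for a sender on its left.
template <typename F>
struct then_adapter {
    F fn;
};

template <typename F>
auto then(F f) {
    return then_adapter<F>{std::move(f)};
}

template <typename S, typename F>
auto operator|(S s, then_adapter<F> a) {
    return then_sender<S, F>{std::move(s), std::move(a.fn)};
}
} // namespace toy

// Final receiver: stores the value it is given.
struct store_receiver {
    int *out;
    void set_value(int v) { *out = v; }
};
```

Piping `toy::just_int{20}` through two `toy::then` steps builds a nested sender type at compile time; the functions run in pipeline order only when the connected operation is started.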

Sender Consumers

Functions that take senders and start the work.

  • start_detached
  • start_detached_unstoppable
  • sync_wait

Rebranded slide here

Employing Senders to Tame Concurrency in Embedded Systems

Uniform Interface

  • Values
  • Errors
  • Cancellation
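The three channels can be read as three completion functions on a receiver, exactly one of which is called per operation. The sketch below illustrates that shape in standard C++. The names `outcome`, `recording_receiver` and `read_sensor` are hypothetical, and the use of `int` for both the value and the error is an assumption made to keep the example small.

```cpp
#include <cassert>
#include <string>

enum class outcome { none, value, error, stopped };

// A receiver exposes one completion function per channel.
struct recording_receiver {
    outcome *last;
    std::string *detail;

    void set_value(int v) {
        *last = outcome::value;
        *detail = std::to_string(v);
    }
    void set_error(int code) {
        *last = outcome::error;
        *detail = "error " + std::to_string(code);
    }
    void set_stopped() {
        *last = outcome::stopped;
        detail->clear();
    }
};

// A toy operation that completes on exactly one of the three channels.
template <typename R>
void read_sensor(R rcvr, bool stop_requested, bool device_ok) {
    if (stop_requested) {
        rcvr.set_stopped();
    } else if (!device_ok) {
        rcvr.set_error(-1);
    } else {
        rcvr.set_value(21);
    }
}
```

Because every operation reports through the same three functions, adapters can be written once against this interface instead of once per callback convention.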

Composition

auto comp =
    s1.schedule()
  | async::then([] { return 42; })
  | async::continue_on(s2)
  | async::then([] (int i) { return std::to_string(i); })
  ;

auto r = comp | async::sync_wait();

auto [str] = r.value_or(std::make_tuple(""_s));

Bringing the Event to the State

auto fetch_temp =
    s1.schedule()
  | async::seq(d1.send(get_temperature))
  | d1.on_msg(match("cmd"_f = 0x32))
  | async::then(
      [](auto msg) { return msg["temp"_f]; } )
  | async::let_value(
      [](auto t) { return display.update_temp(t); })
  | delay(1s)
  | async::repeat()
  ;

auto t =
  fetch_temp | async::start_detached_unstoppable();

Intel Baremetal Sender Library

github_sr.png

https://github.com/intel/cpp-baremetal-senders-and-receivers

Embedded Hello World

led_blink_320.gif

Blinky!

Starting Off

int main() {
  initialize_board();

  while(true) {
    set_led(true);
    sleep();
    set_led(false);
    sleep();
  }
}

Starting Off

volatile std::uint32_t sleep_temp = 0;
void sleep() {
  for (int i=0; i<5; ++i) {
    for (int j=0; j<0xffff; ++j) {
      sleep_temp = sleep_temp + 1;
    }
  }
}

int main() {
  initialize_board();

  while(true) {
    set_led(true);
    sleep();
    set_led(false);
    sleep();
  }
}

Starting Off

using cv_ptr_uint32_t = volatile std::uint32_t * const;
using cv_ptr_uint8_t  = volatile std::uint8_t * const;

inline void rmw(cv_ptr_uint32_t address,
                std::uint32_t mask, std::uint32_t value) {
  *address = (*address & ~mask) | (value & mask);
}

constexpr int GPIOx_ODR = 0x14;
auto GPIOB_BASE = (cv_ptr_uint8_t)(0x4800'0400);
auto GPIOB_ODR  = (cv_ptr_uint32_t)(GPIOB_BASE + GPIOx_ODR);
constexpr std::uint32_t LED_MASK = 0x0000'0008;

void set_led(bool v) {
  rmw(GPIOB_ODR, LED_MASK, v ? LED_MASK : 0x0);
}

int main() {
  /* ... */
}
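The read-modify-write helper can be checked on a host by pointing it at an ordinary variable instead of a memory-mapped register. This is a small sketch under that assumption; `demo_rmw` is a hypothetical wrapper added only for the demonstration.

```cpp
#include <cassert>
#include <cstdint>

using cv_ptr_uint32_t = volatile std::uint32_t *const;

// Same helper as on the slide: only the bits selected by mask change.
inline void rmw(cv_ptr_uint32_t address, std::uint32_t mask,
                std::uint32_t value) {
    *address = (*address & ~mask) | (value & mask);
}

// Hypothetical host-side wrapper: a local variable plays the register.
inline std::uint32_t demo_rmw(std::uint32_t initial, std::uint32_t mask,
                              std::uint32_t value) {
    volatile std::uint32_t fake_register = initial;
    rmw(&fake_register, mask, value);
    return fake_register;
}
```

With the LED mask `0x0000'0008`, setting the bit leaves all other bits untouched, clearing it does the same, and any bits of `value` outside the mask are ignored.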

  • The Sleep Heater
  • Bit Twiddling not Composable
  • No True Sleep

int main() {
  initialize_board();

  while(true) {
    set_led(true);
    sleep();
    set_led(false);
    sleep();
  }
}

Introducing Senders to Blinky

Compose

int main() {
  initialize_board();

  async::sender auto blinky =
      async::just()
    | async::then([](){ set_led(true); })
    | async::then([](){ sleep(); })
    | async::then([](){ set_led(false); })
    | async::then([](){ sleep(); })
    ;

  while(true) {
    blinky | async::sync_wait();
  }
}

Async Function Composition

int main() {
  initialize_board();

  auto on_cycle =
      async::then([](){ set_led(true); })
    | async::then([](){ sleep(); })
    ;

  auto off_cycle =
      async::then([](){ set_led(false); })
    | async::then([](){ sleep(); })
    ;

  async::sender auto blinky =
    async::just() | on_cycle | off_cycle;

  while(true) {
    blinky | async::sync_wait();
  }
}

Add Scheduler

Use async::fixed_priority_scheduler

  • Allows N priorities
  • Works with async::task_manager

Add Scheduler

Need to:

  • Define a concurrency policy
  • Provide a method to signal that there is work
  • Provide mechanism to run priority items
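The last two requirements can be pictured with a toy task manager: queuing a task notifies a hook, and a separate call drains the queue for one priority. The sketch below is a host-side illustration in standard C++, not the library's design. `toy::task_manager` is a hypothetical name, it uses `std::function` and `std::queue` for brevity where a bare-metal implementation would likely avoid heap allocation, and it omits the concurrency policy because it runs on a single thread.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

namespace toy {
template <std::size_t NumPriorities>
class task_manager {
    std::array<std::queue<std::function<void()>>, NumPriorities> queues_{};
    std::function<void(std::size_t)> notify_;

  public:
    explicit task_manager(std::function<void(std::size_t)> notify)
        : notify_(std::move(notify)) {}

    // Signal that there is work: queue the task, then call the hook.
    // On a target the hook might pend an interrupt for that priority.
    void enqueue(std::size_t priority, std::function<void()> task) {
        queues_[priority].push(std::move(task));
        notify_(priority);
    }

    // Run the queued items for one priority, e.g. from an ISR or main loop.
    void service_tasks(std::size_t priority) {
        auto &q = queues_[priority];
        while (!q.empty()) {
            auto task = std::move(q.front());
            q.pop();
            task();
        }
    }
};
} // namespace toy
```

Nothing runs at `enqueue` time; the context that calls `service_tasks` for a given priority decides when and where that work executes.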

Concurrency Policy

struct concurrency_policy {
  template <typename = void,
            stdx::invocable F, stdx::predicate... Pred>
     requires(sizeof...(Pred) < 2)
  static inline
  auto call_in_critical_section(F &&f, Pred &&...pred)
    -> decltype(auto) {
    while (true) {
      disable_interrupts_lock lock{};
      if ((... and pred())) {
        return std::forward<F>(f)();
      }
    }
  }
};

template <>
inline auto conc::injected_policy<> = concurrency_policy{};
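The slide uses `disable_interrupts_lock` without showing it. One plausible shape is an RAII guard that disables interrupts on construction and restores the previous state on destruction, so nested critical sections behave correctly. The sketch below simulates this on a host: `interrupts_enabled`, `hw_disable_interrupts`, `hw_restore_interrupts` and `in_critical_section` are hypothetical stand-ins, and on a Cortex-M target the two hooks would typically read and write PRIMASK instead.

```cpp
#include <cassert>
#include <utility>

// Hypothetical stand-ins for the target's interrupt control.
inline bool interrupts_enabled = true;

inline bool hw_disable_interrupts() {
    bool const was = interrupts_enabled;
    interrupts_enabled = false;
    return was; // report the previous state so it can be restored
}

inline void hw_restore_interrupts(bool was) { interrupts_enabled = was; }

// RAII guard: interrupts stay disabled for the lifetime of the object.
class disable_interrupts_lock {
    bool was_enabled_;

  public:
    disable_interrupts_lock() : was_enabled_{hw_disable_interrupts()} {}
    ~disable_interrupts_lock() { hw_restore_interrupts(was_enabled_); }
    disable_interrupts_lock(disable_interrupts_lock const &) = delete;
    disable_interrupts_lock &operator=(disable_interrupts_lock const &) = delete;
};

// Simplified helper without predicates: run f with interrupts disabled.
template <typename F>
auto in_critical_section(F &&f) -> decltype(auto) {
    disable_interrupts_lock lock{};
    return std::forward<F>(f)();
}
```

Restoring the saved state, instead of unconditionally enabling, is what keeps an inner critical section from re-enabling interrupts while an outer one is still active.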

Fixed Priority Scheduler

Notified there is work to do.

struct interrupt_scheduler {
  static auto schedule(async::priority_t p) -> void {
    if (p==0) {
      trigger_interrupt(56);
    }
  }
};

using task_manager_t =
  async::priority_task_manager<interrupt_scheduler, 2>;

template <>
inline auto async::injected_task_manager<> = task_manager_t{};

Fixed Priority Scheduler

Perform the work:

extern "C" {
  // taking this interrupt over for our scheduler
  // DMA2_CH1 is interrupt 56 for our target
  inline void DMA2_CH1_Handler(void) {
    async::task_mgr::service_tasks<0>();
  }
}

Scheduled Blinky

int main() {
  /* ... */

  async::sender auto blinky =
      async::fixed_priority_scheduler<0>{}.schedule()
    | on_cycle | off_cycle
    ;

  auto s = blinky | async::repeat() | async::start_detached();

  while(true) {
    async::task_mgr::service_tasks<1>();
  }
}

hourglass.jpeg

Sleep Heater

space_heater.jpeg

async::time_scheduler

Work that will be run after a specified duration

auto s = async::time_scheduler{10ms}; // after a duration of 10ms

Provide HAL

namespace {
  struct hal {
    using time_point_t =
      std::chrono::local_time<
        std::chrono::duration<
          std::uint32_t, std::ratio<1, 16'000'000>
        >
      >;

    using task_t = async::timer_task<time_point_t>;

    /* static methods implementing HAL */
  };
} // namespace
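The duration type in this HAL counts ticks of a 16 MHz clock in a 32-bit integer, which fixes both the resolution and the longest representable span. The sketch below works through that arithmetic with `std::chrono`. The names `tick_duration`, `to_ticks` and `max_span_seconds` are hypothetical helpers for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <ratio>

// Same representation as the HAL's duration: 32-bit count of 1/16 MHz ticks.
using tick_duration =
    std::chrono::duration<std::uint32_t, std::ratio<1, 16'000'000>>;

// Convert any chrono duration to a raw tick count.
template <typename Rep, typename Period>
constexpr auto to_ticks(std::chrono::duration<Rep, Period> d)
    -> std::uint32_t {
    return std::chrono::duration_cast<tick_duration>(d).count();
}

// Longest span a 32-bit tick counter can express, in whole seconds.
constexpr auto max_span_seconds =
    std::chrono::duration_cast<std::chrono::seconds>(tick_duration::max())
        .count();
```

Under these assumptions 10 ms is 160,000 ticks and 1 s is 16,000,000 ticks, and the counter range covers roughly 268 seconds, so longer delays would need a different tick rate or a wider representation.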

Provide HAL

namespace {
  struct hal {
    /* time_point_t and task_t aliases */

    static auto enable() -> void { enable_timer(); }
    static auto disable() -> void { disable_timer(); }

    static auto set_event_time(time_point_t tp) -> void {
      start_timer(tp.time_since_epoch().count());
    }

    static auto now() -> time_point_t {
      return
        time_point_t{
          time_point_t::duration{get_timer_value()}
        };
    }
  };

  using timer_manager_t = async::generic_timer_manager<hal>;
} // namespace

time point from duration

namespace async::timer_mgr {
  template <typename Rep, typename Period>
  struct time_point_for<std::chrono::duration<Rep, Period>> {
    using type = hal::time_point_t;
  };
}

Setup Timer ISR

extern "C" {
  void TIM2_Handler(void) {
    rmw(TIM2_SR, CC1IF_MASK, 0x00);
    async::timer_mgr::service_task();
  }
}

Inject the timer manager

// time_scheduler will use this timer_manager
template <>
inline auto async::injected_timer_manager<> = timer_manager_t{};

Using Time Scheduler

async::sender auto blinky =
    async::just()
  | async::then([](){ set_led(true); })
  | async::continue_on(async::time_scheduler{1s})
  | async::then([](){ set_led(false); })
  | async::continue_on(async::time_scheduler{300ms})
  ;

auto s = blinky | async::repeat() | async::start_detached();

Using Time Scheduler

auto delay = [](auto v) {
  return async::continue_on(async::time_scheduler{v});
};

auto led_on = async::then([](){ set_led(true); });
auto led_off = async::then([](){ set_led(false); });
auto on_cycle  = led_on  | delay(1s);
auto off_cycle = led_off | delay(300ms);

async::sender auto blinky =
    async::just()
  | on_cycle
  | off_cycle
  ;

auto s = blinky | async::repeat() | async::start_detached();

auto sched1 = async::fixed_priority_scheduler<1>{};

auto delay_then_transfer = [](auto v, auto s) {
  return
      async::continue_on(async::time_scheduler{v})
    | async::continue_on(s)
    ;
};

auto led_on = async::then([](){ set_led(true); });
auto led_off = async::then([](){ set_led(false); });
auto on_cycle  = led_on  | delay_then_transfer(1s, sched1);
auto off_cycle = led_off | delay_then_transfer(300ms, sched1);

async::sender auto blinky =
    sched1.schedule()
  | on_cycle
  | off_cycle
  ;

auto s = blinky | async::repeat() | async::start_detached();

groov

groov_github.png

https://github.com/intel/generic-register-operation-optimizer

not groov-y

constexpr int GPIOx_MODER      = 0x00;
constexpr int GPIOx_OTYPER     = 0x04;
constexpr int GPIOx_OSPEEDR    = 0x08;
constexpr int GPIOx_PUPDR      = 0x0c;
constexpr int GPIOx_ODR        = 0x14;

auto GPIOB_BASE    = (volatile std::uint8_t * const)(0x4800'0400);
auto GPIOB_MODER   = (volatile std::uint32_t * const)(GPIOB_BASE + GPIOx_MODER);
auto GPIOB_OTYPER  = (volatile std::uint32_t * const)(GPIOB_BASE + GPIOx_OTYPER);
auto GPIOB_OSPEEDR = (volatile std::uint32_t * const)(GPIOB_BASE + GPIOx_OSPEEDR);
auto GPIOB_PUPDR   = (volatile std::uint32_t * const)(GPIOB_BASE + GPIOx_PUPDR);
auto GPIOB_ODR     = (volatile std::uint32_t * const)(GPIOB_BASE + GPIOx_ODR);

// setting for PB3 as output                       14  12   10 9 8  7 6 5 4  3 2 1 0
constexpr std::uint32_t GPIOB3_MODER_VALUE   = 0b00000000'00000000'00000000'01000000;
constexpr std::uint32_t GPIOB3_OSPEEDR_VALUE = 0b00000000'00000000'00000000'00000000;
constexpr std::uint32_t GPIOB3_PUPDR_VALUE   = 0b00000000'00000000'00000000'00000000;
//                                                                              3210
constexpr std::uint32_t GPIOB3_OTYPER_VALUE  = 0b00000000'00000000'00000000'00000000;

// PB3 2bit value mask - items taking two bits     14  12   10 9 8  7 6 5 4  3 2 1 0
constexpr std::uint32_t GPIOB3_2BIT_MASK     = 0b00000000'00000000'00000000'11000000;
// PB3 1bit value mask - items taking one bit                                   3210
constexpr std::uint32_t GPIOB3_1BIT_MASK     = 0b00000000'00000000'00000000'00001000;

rmw(GPIOB_MODER  , GPIOB3_2BIT_MASK, GPIOB3_MODER_VALUE);
rmw(GPIOB_OTYPER , GPIOB3_1BIT_MASK, GPIOB3_OTYPER_VALUE);
rmw(GPIOB_OSPEEDR, GPIOB3_2BIT_MASK, GPIOB3_OSPEEDR_VALUE);
rmw(GPIOB_PUPDR  , GPIOB3_2BIT_MASK, GPIOB3_PUPDR_VALUE);
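The hand-written masks and values above follow a simple rule: a pin's field starts at bit `pin * width`, where width is 2 for MODER, OSPEEDR and PUPDR and 1 for OTYPER. The sketch below derives the same constants at compile time. `field_mask` and `field_value` are hypothetical helpers, shown only to make the bit arithmetic explicit.

```cpp
#include <cassert>
#include <cstdint>

// Mask covering the `width`-bit field that belongs to `pin`.
constexpr std::uint32_t field_mask(unsigned pin, unsigned width) {
    return ((1u << width) - 1u) << (pin * width);
}

// Value `v` shifted into the field for `pin`, clipped to the field's mask.
constexpr std::uint32_t field_value(unsigned pin, unsigned width,
                                    std::uint32_t v) {
    return (v << (pin * width)) & field_mask(pin, width);
}

// These match the constants written out by hand for PB3.
static_assert(field_mask(3, 2) == 0b1100'0000);         // GPIOB3_2BIT_MASK
static_assert(field_value(3, 2, 0b01) == 0b0100'0000);  // GPIOB3_MODER_VALUE
static_assert(field_mask(3, 1) == 0b0000'1000);         // GPIOB3_1BIT_MASK
```

This removes the column-counting comments, but each register write is still a separate, untyped operation.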

groov-y

constexpr auto gpiob_config =
  stm32::gpiob(
    "moder.3"_f   = stm32::gpio::mode::output,
    "otyper.3"_f  = stm32::gpio::outtype::push_pull,
    "ospeedr.3"_f = stm32::gpio::speed::low_speed,
    "pupdr.3"_f   = stm32::gpio::pupd::none
  );

async::just(gpiob_config) | groov::write | async::sync_wait();

groov

void set_led(bool v) {

  groov::write(stm32::gpiob("odr.3"_f=v))
    | async::sync_wait();

}

groov

void set_led(bool v) {

  async::just(stm32::gpiob("odr.3"_f=v))
    | groov::write
    | async::sync_wait()
    ;
}

Using senders to make a sender

Sender Blinky

Putting it together

auto delay = [](auto v) {
  return
      async::continue_on(async::time_scheduler{v});
};

auto led_on  = groov::write(stm32::gpiob("odr.3"_f=true));
auto led_off = groov::write(stm32::gpiob("odr.3"_f=false));

auto on_cycle  = led_on  | delay(1s);
auto off_cycle = led_off | delay(300ms);

async::sender auto blinky =
    on_cycle
  | async::seq(off_cycle)
  ;

auto s = blinky | async::repeat() | async::start_detached();

Logging

Simple Logging

Standard

inline auto log(std::span<const char> info) {
  if (info.size() == 0) return;

  auto iter = info.begin();
  auto end_iter = info.end();

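  // USART2 transmit data register (TDR) and status register (ISR)
  auto ser_out = (volatile std::uint32_t * const)(0x40004428);
  auto isr = (volatile std::uint32_t * const)(0x4000441c);

  while (iter != end_iter) {
    *ser_out = *iter++;
    // busy-wait until TXE (bit 7) reports the data register is empty
    while((*isr & (1<<7)) == 0);
  }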
}

Simple Logging

inline auto log(std::span<const char> info) {
  if (info.size() == 0) return;
  auto iter = info.begin();

  async::sender auto wait_txe =
      groov::read(stm32::usart2 / "isr.TXE"_f)
    | async::repeat_until(
          [](auto v)->bool{return v;}
      )
    ;

  async::just_result_of([&iter]() { return *iter++; })
    | async::then(
        [](auto v){ return stm32::usart2("tdr.TDR"_f = v); } )
    | groov::write
    | async::seq(wait_txe)
    | async::repeat_n(info.size())
    | async::sync_wait()
    ;
}

Interrupt Scheduler

fixed_priority_scheduler

  • Notified when a task is queued
  • Call service_tasks<N>() for priority N

interrupt_scheduler

  • Only one handler registered
  • Call service_task()

Usage

extern "C" {
  void TIM2_Handler(void) {
    async::interrupt_mgr::interrupt_task_manager<Timer2Interrupt>::service_task();
  }
}

Usage

auto timer_interrupt =
    async::interrupt_scheduler<Timer2Interrupt>{}.schedule()
  | async::seq(groov::write(stm32::tim2("sr.CC1IF"_f = false)))
  | async::then([](){
      async::timer_mgr::service_task();
    })
  | async::repeat()
  | async::start_detached()
  ;

Scheduler

The event handling building block

Messaging

Sender Summary

How did it work out?

  • Continuation Passing Style (CPS) can be verbose
  • Escaping the Monad

  • Local reasoning
  • Channels for values and errors
  • Cancellation is part of the design

Thank you

Questions?