Ring Buffer Lock-Free : Communication Inter-Threads Temps Réel sur Raspberry Pi

Raspberry Pi 4 avec patch temps réel et communication inter-threads

Vous avez un kernel temps réel mais vos threads se bloquent encore ?

Dans mon article précédent sur le kernel temps réel, nous avons configuré un Raspberry Pi 4 avec RT_PREEMPT pour obtenir des latences garanties sous les 100 µs.

Mais un kernel temps réel ne suffit pas : il faut aussi éviter les mécanismes de synchronisation qui bloquent.

Aujourd’hui, je vous présente les ring buffers lock-free, la solution pour une communication inter-threads sans aucun blocage, parfaite pour le temps réel strict.

Le problème des mutex et sémaphores en temps réel

Scénario catastrophe : l’inversion de priorité

Prenons le cas de deux threads :

Thread Priorité Rôle
Thread A 80 (haute) Master EtherCAT, cycle de 1 ms
Thread B 60 (basse) Communication UDP avec PC

Les deux threads partagent des données via un mutex. Voici ce qui peut se passer :

1
2
3
4
5
6
t=0ms   : Thread B (prio 60) acquiert le mutex
t=0.5ms : Thread A (prio 80) se réveille, tente d'acquérir le mutex
→ BLOQUÉ en attendant B !
t=1.5ms : Thread B libère enfin le mutex
t=1.5ms : Thread A peut continuer...
→ Trop tard, deadline manquée !

C’est l’inversion de priorité : le thread haute priorité est bloqué par un thread basse priorité. En temps réel, c’est inacceptable.

Les coûts cachés des verrous

Même avec des mécanismes avancés (priority inheritance), les verrous ont des inconvénients majeurs :

Mécanisme Problème principal Impact temps réel
Mutex Inversion de priorité Latence imprévisible
Sémaphore Appels système (syscalls) Latence de plusieurs µs
Spinlock Gaspillage CPU (busy-wait) Mauvais sur multi-core
Priority inheritance Complexe, overhead Latence réduite mais présente

Le constat : Aucun verrou ne garantit zéro blocage ni latence constante en O(1).

La solution : Ring Buffer Lock-Free

Principe fondamental

Un ring buffer lock-free (Single Producer Single Consumer - SPSC) repose sur deux règles simples :

UN SEUL producteur écrit dans le buffer
UN SEUL consommateur lit depuis le buffer

Avec ces contraintes, on peut utiliser des indices atomiques sans aucun verrou :

1
2
3
4
5
6
7
8
9
10
11
12
13
                    write_idx (atomique)


┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ D │ E │ │ │ │ A │ B │ C │
└─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘


read_idx (atomique)

• Le producteur écrit à write_idx puis l'incrémente atomiquement
• Le consommateur lit à read_idx puis l'incrémente atomiquement
• ZÉRO verrou, ZÉRO syscall, ZÉRO blocage

Pourquoi ça fonctionne ?

La magie réside dans les opérations atomiques C11 :

  1. memory_order_acquire : Garantit que les lectures ne se réordonnent pas avant
  2. memory_order_release : Garantit que les écritures ne se réordonnent pas après
  3. memory_order_relaxed : Lecture simple sans contrainte de synchronisation

Ces garanties sont matérielles (barrières mémoire CPU), pas logicielles. Résultat : O(1) constant, zéro blocage.

Avantages pour le temps réel

Caractéristique Ring Buffer Lock-Free Mutex/Sémaphore
Blocage Jamais Toujours possible
Inversion de priorité Impossible Risque élevé
Latence O(1) constante Imprévisible
Appels système Zéro Chaque lock/unlock
Overhead ~10 ns ~1-5 µs
Complexité Simple Nécessite gestion d’erreurs

Implémentation détaillée en C

Structure de données

Voici l’implémentation complète du ring buffer :

1
2
3
4
5
6
7
8
9
10
11
12
13
#define RING_SIZE 64
#define MSG_SIZE 64

typedef struct {
char message[MSG_SIZE];
int counter;
} message_t;

typedef struct {
message_t items[RING_SIZE];
_Alignas(64) atomic_size_t write_idx; // Alignement cache line
_Alignas(64) atomic_size_t read_idx; // Évite false sharing
} ringbuf_t;

Points clés :

  • _Alignas(64) : Chaque index est sur sa propre ligne de cache (64 bytes sur ARM Cortex-A72)
  • Évite le false sharing : quand deux CPUs modifient des données sur la même cache line
  • atomic_size_t : Type atomique C11, opérations garanties thread-safe

Initialisation

1
2
3
4
5
6
void ringbuf_init(ringbuf_t *rb)
{
memset(rb, 0, sizeof(*rb));
atomic_store(&rb->write_idx, 0);
atomic_store(&rb->read_idx, 0);
}

Rien de spécial ici, on met tout à zéro.

Écriture (producteur)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool ringbuf_write(ringbuf_t *rb, const message_t *msg)
{
// 1. Lire les indices (relaxed car pas de sync nécessaire ici)
size_t w = atomic_load_explicit(&rb->write_idx, memory_order_relaxed);
size_t r = atomic_load_explicit(&rb->read_idx, memory_order_acquire);

// 2. Vérifier si le buffer est plein
size_t next = (w + 1) % RING_SIZE;
if (next == r) {
return false; // Buffer plein, échec NON-BLOQUANT
}

// 3. Écrire le message
rb->items[w] = *msg;

// 4. Publier l'écriture (release garantit que le message est visible)
atomic_store_explicit(&rb->write_idx, next, memory_order_release);
return true;
}

Analyse ligne par ligne :

  • memory_order_acquire sur read_idx : On veut voir les dernières lectures du consommateur
  • Écriture normale du message : Pas besoin d’atomique, un seul producteur
  • memory_order_release sur write_idx : Garantit que le message est écrit AVANT que l’index soit publié

Lecture (consommateur)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool ringbuf_read(ringbuf_t *rb, message_t *msg)
{
// 1. Lire les indices
size_t r = atomic_load_explicit(&rb->read_idx, memory_order_relaxed);
size_t w = atomic_load_explicit(&rb->write_idx, memory_order_acquire);

// 2. Vérifier si le buffer est vide
if (r == w) {
return false; // Buffer vide, échec NON-BLOQUANT
}

// 3. Lire le message
*msg = rb->items[r];

// 4. Publier la lecture
atomic_store_explicit(&rb->read_idx, (r + 1) % RING_SIZE, memory_order_release);
return true;
}

Symétrique à l’écriture :

  • memory_order_acquire sur write_idx : On veut voir les dernières écritures du producteur
  • Lecture normale du message
  • memory_order_release sur read_idx : Publie qu’on a lu le message

Pourquoi memory_order_acquire et release ?

C’est la clé de la synchronisation lock-free :

1
2
3
4
5
6
7
8
9
PRODUCTEUR                          CONSOMMATEUR
─────────────────────────────────────────────────
rb->items[w] = message;

atomic_store(write_idx, RELEASE) ───┐

└──> atomic_load(write_idx, ACQUIRE)

message = rb->items[r];

Le RELEASE du producteur synchronise avec l’ACQUIRE du consommateur, garantissant que le message est visible.

Exemple pratique : Communication bidirectionnelle

Le programme d’exemple démontre une communication bidirectionnelle entre deux threads en utilisant deux ring buffers :

1
2
3
4
5
6
7
Thread A (prio 80)                    Thread B (prio 60)
CPU 2 CPU 3
│ │
│◄──── rb_b_to_a ◄────────────────────│
│ │
│────► rb_a_to_b ─────────────────────►│
│ │

Architecture du test

Thread Priorité CPU Période Message
Thread A 80 (SCHED_FIFO) 2 isolé 1 seconde “ping”
Thread B 60 (SCHED_FIFO) 3 isolé 3 secondes “pong”

Observation attendue : Thread A envoie 3 messages pour chaque message de B (ratio 1s/3s), sans jamais se bloquer.

Boucle principale du thread A

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while (running) {
// 1. Lire les messages de B (NON-BLOQUANT)
message_t msg;
while (ringbuf_read(&rb_b_to_a, &msg)) {
recv_counter++;
printf("A ← B { msg: \"pong\", num: %d, tot: %d }\n",
msg.counter, recv_counter);
}

// 2. Envoyer un message à B toutes les secondes
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);

if (now.tv_sec >= next_send.tv_sec) {
send_counter++;
message_t out = { .counter = send_counter };
snprintf(out.message, MSG_SIZE, "Ping depuis A");

if (ringbuf_write(&rb_a_to_b, &out)) {
printf("A → B { msg: \"ping\", num: %d, tot: %d }\n",
out.counter, send_counter);
}

next_send.tv_sec += 1;
}

// 3. Petite pause pour économiser le CPU
usleep(10000); // 10ms
}

Points importants :

  • ringbuf_read retourne immédiatement si pas de message (non-bloquant)
  • ringbuf_write retourne immédiatement si buffer plein (non-bloquant)
  • Aucun appel système de synchronisation
  • Le thread garde le contrôle total de son ordonnancement

Sortie du programme

Sortie du programme

La communication est fluide, sans blocage, avec un ratio 3:1 comme attendu.

Performance et garanties temps réel

Mesures de latence

Sur Raspberry Pi 4 avec kernel RT_PREEMPT :

Opération Latence Comparaison
ringbuf_write() ~15-20 ns Mutex: ~1-5 µs (100x plus lent)
ringbuf_read() ~15-20 ns Sémaphore: ~2-8 µs (150x plus lent)
Overhead total O(1) constant Mutex: imprévisible

Garanties temps réel

Jamais de blocage : Les opérations retournent immédiatement
Latence constante : O(1) indépendante de la charge
Pas d’inversion de priorité : Pas de verrou = pas d’inversion
Pas d’appel système : Tout en espace utilisateur
Déterminisme garanti : Comportement prévisible à 100%

Cas d’usage idéaux

Le ring buffer lock-free est parfait pour :

  • Master EtherCAT : Communication entre thread cycle PDO (1 ms) et thread réseau
  • Contrôle moteur : Commandes haute fréquence entre threads
  • Acquisition de données : Pipeline producteur/consommateur sans latence
  • Audio temps réel : Buffer de samples entre threads DSP
  • Robotique : Communication entre contrôleurs et supervision

Limites et précautions

Quand NE PAS utiliser un ring buffer lock-free

Multiple producteurs ou consommateurs : Nécessite des atomiques CAS (Compare-And-Swap), plus complexe
Besoin de notification immédiate : Le consommateur doit poller le buffer
Données de taille variable : Mieux vaut un allocateur lock-free
Priorité stricte FIFO entre >2 threads : Utiliser une queue lock-free multi-producteurs

Précautions d’implémentation

⚠️ Alignement cache line : Toujours aligner les indices atomiques sur 64 bytes
⚠️ Taille buffer = puissance de 2 : Simplifie le modulo avec un masque (% devient & 0x3F)
⚠️ Gestion buffer plein : Décider entre bloquer, abandonner, ou agrandir
⚠️ False sharing : Séparer les données read-only/write-only sur des cache lines différentes

Compilation et test du projet

Le code complet est disponible sur GitHub

Cross-compilation depuis votre PC

1
2
3
4
5
6
7
8
9
# Cloner le projet
git clone https://github.com/jeremydierx/ringbuf.git
cd ringbuf

# Cross-compiler pour Raspberry Pi 4 (aarch64)
make CROSS=1

# Déployer sur le Raspberry Pi
make CROSS=1 deploy RPI_HOST=ubuntu@10.0.0.1

Exécution sur Raspberry Pi

1
2
3
4
5
# Se connecter au Raspberry Pi
ssh ubuntu@10.0.0.1

# Exécuter le test (nécessite sudo pour SCHED_FIFO)
sudo ./exemple_ringbuf

Prérequis : Raspberry Pi 4 avec kernel RT_PREEMPT et CPUs 2-3 isolés (voir article précédent)

Conclusion

Les ring buffers lock-free sont la solution idéale pour la communication inter-threads en temps réel strict :

  1. Zéro blocage : Les threads ne s’attendent jamais
  2. Latence constante : O(1) indépendante de la charge (15-20 ns)
  3. Pas d’inversion de priorité : Pas de verrou = pas de problème
  4. Simplicité : Implémentation en ~50 lignes de C

Combinés avec un kernel RT_PREEMPT et des CPUs isolés, ils permettent d’atteindre un déterminisme total sur Raspberry Pi 4.

Le code complet, abondamment commenté, est disponible sur GitHub et peut servir de base pour vos propres projets temps réel.

Pour aller plus loin

Ressources techniques

Note : Cet article fait suite à Kernel Temps Réel sur Raspberry Pi 4 : Guide et Tutoriel C++. Si vous n’avez pas encore configuré votre Raspberry Pi avec RT_PREEMPT, commencez par là !


Jérémy @ Code Alchimie