Bounded Ring Buffer (Robotics)
Every ROS topic, every UART driver, every IMU sample stream: a fixed-capacity FIFO where producers can't be allowed to block consumers.
Problem
Implement a fixed-capacity ring buffer for a single producer / single consumer scenario. The buffer holds at most capacity items. Operations:
- push(x): try to enqueue. Returns true on success, false if full (the caller decides whether to drop, block, or overwrite).
- pop(): try to dequeue and return the oldest item. Returns the item, or a "not present" sentinel if empty.
- len(): current item count.
All operations must be O(1) with no allocations after construction. No shifting allowed.
Examples
new RingBuffer(capacity=3)
push(1) → true buffer: [1] len=1
push(2) → true buffer: [1, 2] len=2
push(3) → true buffer: [1, 2, 3] len=3
push(4) → false (full) buffer: [1, 2, 3] len=3
pop() → 1 buffer: [2, 3] len=2
push(4) → true (wraps!) buffer: [2, 3, 4] len=3
pop() → 2 buffer: [3, 4] len=2
Why this is the canonical robotics-flavored queue problem
Every sensor driver, every networking layer, every audio/video subsystem on every robot you'll work on contains a ring buffer. The pattern: producers (interrupt handlers, camera frames, IMU ticks) drop new data into the back; consumers (control loops, logging, network stacks) pull from the front. Producer and consumer often run at very different rates, on different cores, sometimes in interrupt context where blocking is forbidden.
The ring buffer hits the spec with three properties that a general-purpose queue rarely gives you all at once:
- Bounded memory. Allocated once, never grows. Critical for real-time / embedded.
- O(1) per op with zero allocation. push and pop are pointer arithmetic plus one modular index.
- Wait-free in single-producer-single-consumer mode. No locks needed (with atomic indices): the producer only writes tail, the consumer only writes head, and neither blocks the other.
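The whole structure is just an array of size capacity plus two integers (head, tail) that advance modulo capacity. When tail == head the buffer is empty; when (tail + 1) % capacity == head it's full. With only these two indices, one slot always stays unused, so the array holds at most capacity - 1 items. The solution below tracks a count instead, so all capacity slots are usable; Hint 2 covers the trade-off.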
Hints
Hint 1 — array + two indices
Allocate array[capacity]. Keep head (oldest item position) and tail (next free slot). On push, write to array[tail] and advance tail. On pop, read array[head] and advance head. Both advance modulo capacity.
Hint 2 — distinguishing empty from full
If you only track head and tail, you can't tell empty (head == tail) from full (head == tail after the buffer wrapped completely). Two common fixes:
- Sacrifice one slot. Define full as (tail + 1) % capacity == head. Usable capacity becomes capacity - 1, but no extra state is needed and each index still has exactly one writer.
- Track length separately. Maintain count; empty iff count == 0, full iff count == capacity. One extra integer.
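The first fix can be sketched as follows. This is a minimal illustration, not the reference solution: the class name OneSlotRing is made up here, and allocating capacity + 1 slots (so that the caller still gets capacity usable items) is an assumption of this sketch rather than something the problem statement requires.

```python
class OneSlotRing:
    """Two-index ring buffer that keeps one slot empty to tell full from empty."""

    def __init__(self, capacity: int):
        # Assumption of this sketch: allocate one extra slot so that
        # `capacity` items are actually usable.
        self.size = capacity + 1
        self.buf = [None] * self.size
        self.head = 0  # index of the oldest item (only pop writes it)
        self.tail = 0  # next free slot (only push writes it)

    def is_empty(self) -> bool:
        return self.head == self.tail

    def is_full(self) -> bool:
        # Full when advancing tail would make it collide with head.
        return (self.tail + 1) % self.size == self.head

    def push(self, x) -> bool:
        if self.is_full():
            return False
        self.buf[self.tail] = x
        self.tail = (self.tail + 1) % self.size
        return True

    def pop(self):
        if self.is_empty():
            return None
        x = self.buf[self.head]
        self.head = (self.head + 1) % self.size
        return x

    def __len__(self) -> int:
        # Derived from the indices; there is no counter to keep in sync.
        return (self.tail - self.head) % self.size


rb = OneSlotRing(3)
print([rb.push(v) for v in (1, 2, 3, 4)])  # [True, True, True, False]
print(rb.pop(), len(rb))                   # 1 2
```

Because the length is computed from head and tail, nothing is written by both sides, which is the property the lock-free variant relies on.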
Hint 3 — wrap with modulo
After advancing an index: idx = (idx + 1) % capacity. If capacity is a power of 2, use idx = (idx + 1) & (capacity - 1): same result, faster, no division. (This is why kernel ring buffers such as Linux's kfifo use power-of-2 capacities.)
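A small sketch of the mask trick, assuming the capacity is first rounded up to a power of 2. The helper names next_pow2 and advance are hypothetical, chosen for this illustration only.

```python
def next_pow2(n: int) -> int:
    """Smallest power of two >= n, for n >= 1."""
    return 1 << (n - 1).bit_length()


def advance(idx: int, mask: int) -> int:
    """Advance an index by one slot, wrapping with a bitmask instead of %."""
    return (idx + 1) & mask


cap = next_pow2(5)  # a requested capacity of 5 is rounded up to 8
mask = cap - 1      # 0b111

idx = 0
seen = []
for _ in range(10):
    seen.append(idx)
    idx = advance(idx, mask)

print(cap, seen)  # 8 [0, 1, 2, 3, 4, 5, 6, 7, 0, 1]

# For a power-of-2 capacity the mask and the modulo agree on every index.
print(all(((i + 1) & mask) == ((i + 1) % cap) for i in range(cap)))  # True
```

The mask only works when cap is a power of 2; for any other capacity, stay with the modulo.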
Solution
class RingBuffer:
    def __init__(self, capacity: int):
        self.buf = [None] * capacity  # allocated once, never resized
        self.cap = capacity
        self.head = 0   # index of oldest item
        self.tail = 0   # next write position
        self.count = 0  # number of stored items

    def push(self, x: int) -> bool:
        if self.count == self.cap:
            return False  # full: the caller decides what to do
        self.buf[self.tail] = x
        self.tail = (self.tail + 1) % self.cap
        self.count += 1
        return True

    def pop(self) -> int | None:
        if self.count == 0:
            return None  # empty
        x = self.buf[self.head]
        self.head = (self.head + 1) % self.cap
        self.count -= 1
        return x

    def __len__(self) -> int:
        return self.count


# Trace through the example
rb = RingBuffer(3)
for v in [1, 2, 3]:
    ok = rb.push(v)
    print(f"push {v} → {ok}, len={len(rb)}")
print(f"push 4 → {rb.push(4)} (expected False: full)")
print(f"pop → {rb.pop()}")
print(f"push 4 (wraps!) → {rb.push(4)}")
print(f"pop → {rb.pop()}")

class RingBuffer {
    constructor(capacity) {
        this.buf = new Array(capacity).fill(undefined);
        this.cap = capacity;
        this.head = 0;   // index of oldest item
        this.tail = 0;   // next write position
        this.count = 0;
    }

    push(x) {
        if (this.count === this.cap) return false;
        this.buf[this.tail] = x;
        this.tail = (this.tail + 1) % this.cap;
        this.count++;
        return true;
    }

    pop() {
        if (this.count === 0) return null;
        const x = this.buf[this.head];
        this.head = (this.head + 1) % this.cap;
        this.count--;
        return x;
    }

    get length() { return this.count; }
}

const rb = new RingBuffer(3);
for (const v of [1, 2, 3]) console.log(`push ${v} → ${rb.push(v)}, len=${rb.length}`);
console.log(`push 4 → ${rb.push(4)} (expected false: full)`);
console.log(`pop → ${rb.pop()}`);
console.log(`push 4 (wraps!) → ${rb.push(4)}`);
console.log(`pop → ${rb.pop()}`);

pub struct RingBuffer<T: Copy + Default> {
    buf: Vec<T>,
    cap: usize,
    head: usize,  // index of oldest item
    tail: usize,  // next write position
    count: usize,
}

impl<T: Copy + Default> RingBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            buf: vec![T::default(); capacity],
            cap: capacity,
            head: 0,
            tail: 0,
            count: 0,
        }
    }

    pub fn push(&mut self, x: T) -> bool {
        if self.count == self.cap { return false; }
        self.buf[self.tail] = x;
        self.tail = (self.tail + 1) % self.cap;
        self.count += 1;
        true
    }

    pub fn pop(&mut self) -> Option<T> {
        if self.count == 0 { return None; }
        let x = self.buf[self.head];
        self.head = (self.head + 1) % self.cap;
        self.count -= 1;
        Some(x)
    }

    pub fn len(&self) -> usize { self.count }
}

fn main() {
    let mut rb: RingBuffer<i32> = RingBuffer::new(3);
    for v in [1, 2, 3] { println!("push {} → {}, len={}", v, rb.push(v), rb.len()); }
    println!("push 4 → {} (expected false: full)", rb.push(4));
    println!("pop → {:?}", rb.pop());
    println!("push 4 (wraps!) → {}", rb.push(4));
    println!("pop → {:?}", rb.pop());
}

#include <iostream>
#include <optional>
#include <string>
#include <vector>

template <typename T>
class RingBuffer {
    std::vector<T> buf_;
    size_t cap_;
    size_t head_ = 0;   // index of oldest item
    size_t tail_ = 0;   // next write position
    size_t count_ = 0;

public:
    explicit RingBuffer(size_t capacity)
        : buf_(capacity), cap_(capacity) {}

    bool push(T x) {
        if (count_ == cap_) return false;
        buf_[tail_] = x;
        tail_ = (tail_ + 1) % cap_;
        ++count_;
        return true;
    }

    std::optional<T> pop() {
        if (count_ == 0) return std::nullopt;
        T x = buf_[head_];
        head_ = (head_ + 1) % cap_;
        --count_;
        return x;
    }

    size_t len() const { return count_; }
};

// Format a popped value for printing.
static std::string show(const std::optional<int>& v) {
    return v ? std::to_string(*v) : std::string("nullopt");
}

int main() {
    RingBuffer<int> rb(3);
    for (int v : {1, 2, 3}) {
        bool ok = rb.push(v);  // push first so len() reflects the new item
        std::cout << "push " << v << " → " << (ok ? "true" : "false") << ", len=" << rb.len() << "\n";
    }
    std::cout << "push 4 → " << (rb.push(4) ? "true" : "false") << " (expected false: full)\n";
    // Pop exactly once per printed line: calling pop() twice in one
    // expression would consume two items.
    std::cout << "pop → " << show(rb.pop()) << "\n";
    std::cout << "push 4 (wraps!) → " << (rb.push(4) ? "true" : "false") << "\n";
    std::cout << "pop → " << show(rb.pop()) << "\n";
}

// The C version is what ends up in actual firmware. Static storage,
// no malloc after init, no implicit allocations. Replace int with
// your sample type (an IMU struct, an audio frame, etc.).
#include <stdio.h>
#include <stdbool.h>

#define CAP 3

static int buf[CAP];
static int head = 0;   // index of oldest item
static int tail = 0;   // next write position
static int count = 0;

static bool push(int x) {
    if (count == CAP) return false;
    buf[tail] = x;
    tail = (tail + 1) % CAP;
    count++;
    return true;
}

// Out-param sentinel: pop returns true if a value was available.
static bool pop_(int* out) {
    if (count == 0) return false;
    *out = buf[head];
    head = (head + 1) % CAP;
    count--;
    return true;
}

int main(void) {
    for (int v = 1; v <= 3; v++) {
        // Call push before printf: C leaves argument evaluation order
        // unspecified, so count could otherwise be read before the push.
        bool ok = push(v);
        printf("push %d → %s, len=%d\n", v, ok ? "true" : "false", count);
    }
    printf("push 4 → %s (expected false: full)\n", push(4) ? "true" : "false");
    int got;
    if (pop_(&got)) printf("pop → %d\n", got);
    printf("push 4 (wraps!) → %s\n", push(4) ? "true" : "false");
    if (pop_(&got)) printf("pop → %d\n", got);
    return 0;
}

The C version is closest to what actually runs on a robot: static storage, no malloc after boot, no implicit allocations. Replace int with your sample struct (a 32-byte IMU packet, a pointer to a camera frame) and you have the same basic pattern that kernel FIFOs such as Linux's kfifo are built on.
Complexity
- Time: O(1) for push, pop, len. Pure pointer arithmetic plus one modular index.
- Space: O(capacity), allocated once at construction; never grows.
- Why no shifts: An array-backed FIFO that didn't wrap would have to shift every remaining element on every pop, which is O(n) per op. The wrap-around is what makes the ring buffer worth the bookkeeping.
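To make the cost of shifting concrete, here is a rough sketch that counts element moves for a non-wrapping array FIFO. The function names shifting_pop and drain_with_shifts are hypothetical and exist only for this illustration.

```python
def shifting_pop(items: list, length: int):
    """Pop the front of a FIFO stored in items[0:length], shifting the rest left.

    Returns (value, number of element moves performed).
    """
    x = items[0]
    for i in range(1, length):
        items[i - 1] = items[i]
    return x, length - 1


def drain_with_shifts(n: int) -> int:
    """Fill a FIFO with n items, pop them all, and count the total moves."""
    items = list(range(n))
    moves = 0
    length = n
    while length > 0:
        _, m = shifting_pop(items, length)
        moves += m
        length -= 1
    return moves


print(drain_with_shifts(1000))  # 499500
```

Draining n items this way costs n(n - 1) / 2 moves in total, while a ring buffer moves nothing: each pop only advances head.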
In the wild
- ROS topic queues. rospy.Publisher(queue_size=N) bounds the outgoing message queue at N entries. If consumers can't keep up, the oldest message is dropped.
- Linux kernel kfifo (include/linux/kfifo.h), used by USB drivers, the audio subsystem, and debug tracers. Power-of-2 capacity for the & (cap-1) trick.
- TCP receive buffer. A bounded buffer of arrival data, consumed by recv() calls.
- UART / SPI / I2C peripheral buffers. DMA writes new bytes into the buffer; a foreground task reads them out.
- Audio APIs. PortAudio, JACK, ALSA all expose ring buffers as the producer/consumer handoff between hardware-interrupt and application threads.
Variations
- Lock-free SPSC (single-producer-single-consumer). Use atomic head and tail with acquire/release memory ordering: the producer only writes tail, the consumer only writes head, so no locking is needed. A shared count field would be written by both sides, so this variant uses the sacrifice-one-slot check instead. Used in real-time audio and many kernel paths.
- Multi-producer / multi-consumer. Needs a lock, or a more sophisticated CAS-based design (LMAX Disruptor, DPDK's rte_ring). Trickier.
- Overwrite-on-full mode. Instead of returning false, the new push overwrites the oldest item. Useful for "always have the most recent N samples" telemetry.
- Bipartite (two-region) buffer. Used when consumers want a contiguous slice: provides a read_slice() whose result never straddles the wrap point.
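The overwrite-on-full variation might look like the sketch below. The class name OverwritingRing, the snapshot() helper, and the choice to have push report whether it overwrote something are all assumptions made for this illustration.

```python
class OverwritingRing:
    """Ring buffer that always keeps the most recent `capacity` items."""

    def __init__(self, capacity: int):
        self.buf = [None] * capacity
        self.cap = capacity
        self.head = 0   # index of oldest item
        self.tail = 0   # next write position
        self.count = 0

    def push(self, x) -> bool:
        """Always stores x. Returns True if the oldest item was overwritten."""
        overwrote = self.count == self.cap
        if overwrote:
            # Drop the oldest item by advancing head past it.
            self.head = (self.head + 1) % self.cap
            self.count -= 1
        self.buf[self.tail] = x
        self.tail = (self.tail + 1) % self.cap
        self.count += 1
        return overwrote

    def pop(self):
        if self.count == 0:
            return None
        x = self.buf[self.head]
        self.head = (self.head + 1) % self.cap
        self.count -= 1
        return x

    def snapshot(self) -> list:
        """Items from oldest to newest, without consuming them."""
        return [self.buf[(self.head + i) % self.cap] for i in range(self.count)]


rb = OverwritingRing(3)
for sample in range(1, 6):
    rb.push(sample)
print(rb.snapshot())  # [3, 4, 5]
```

Note that push now moves head as well as tail, so the single-writer-per-index property behind the lock-free SPSC variant no longer holds; a concurrent version of this mode needs extra care.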