practice · medium · from Queues

Bounded Ring Buffer (Robotics)

time: O(1)
space: O(capacity)
tags: queues · ring-buffer · robotics

In the wild: every ROS topic, every UART driver, every IMU sample stream. Each is a fixed-capacity FIFO where producers can't be allowed to block consumers.

Problem

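Implement a fixed-capacity ring buffer for a single producer / single consumer scenario. The buffer holds at most capacity items. Operations:

  • push(x): store x at the back and return true; return false (and drop x) if the buffer is full.
  • pop(): remove and return the oldest item; return an empty value (None / null / nullopt) if the buffer is empty.
  • len(): return the number of items currently stored.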

All operations must be O(1) with no allocations after construction. No shifting allowed.

Examples

new RingBuffer(capacity=3)
push(1) → true                 buffer: [1]            len=1
push(2) → true                 buffer: [1, 2]          len=2
push(3) → true                 buffer: [1, 2, 3]       len=3
push(4) → false (full)         buffer: [1, 2, 3]       len=3
pop()   → 1                    buffer: [2, 3]          len=2
push(4) → true   (wraps!)      buffer: [2, 3, 4]       len=3
pop()   → 2                    buffer: [3, 4]          len=2

Why this is the canonical robotics-flavored queue problem

Every sensor driver, every networking layer, every audio/video subsystem on every robot you'll work on contains a ring buffer. The pattern: producers (interrupt handlers, camera frames, IMU ticks) drop new data into the back; consumers (control loops, logging, network stacks) pull from the front. Producer and consumer often run at very different rates, on different cores, sometimes in interrupt context where blocking is forbidden.

The ring buffer hits the spec with three properties no other queue gives you:

  1. Bounded memory. Allocated once, never grows. Critical for real-time / embedded.
  2. O(1) per op with zero allocation. push and pop are pointer arithmetic plus one modular index.
  3. Wait-free in single-producer-single-consumer mode. No locks needed (with atomic indices) — the producer only writes tail, consumer only writes head, neither blocks the other.

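The whole structure is just an array of size capacity plus two integers (head, tail) that advance modulo capacity. When tail == head the buffer is empty; when (tail + 1) % capacity == head it's full. Under this two-index convention one slot always stays unused, so the array holds at most capacity - 1 items. The reference solution below adds a count field instead, which keeps all capacity slots usable and matches the example trace above.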
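A minimal Python sketch of the two-integer layout, assuming the convention that one slot is always left empty. The class name TwoIndexRing and the slots parameter are hypothetical and not part of the reference solution; slots is the raw array size, so the buffer stores at most slots - 1 items.

```python
class TwoIndexRing:
    """Ring buffer that tracks only head and tail (hypothetical name).

    One slot always stays empty, so `slots` array cells hold slots - 1 items.
    """

    def __init__(self, slots: int):
        if slots < 2:
            raise ValueError("need at least 2 slots to store 1 item")
        self.buf = [None] * slots
        self.slots = slots
        self.head = 0  # index of the oldest item; written only by pop
        self.tail = 0  # next free slot; written only by push

    def is_empty(self) -> bool:
        return self.head == self.tail

    def is_full(self) -> bool:
        return (self.tail + 1) % self.slots == self.head

    def push(self, x) -> bool:
        if self.is_full():
            return False
        self.buf[self.tail] = x
        self.tail = (self.tail + 1) % self.slots
        return True

    def pop(self):
        if self.is_empty():
            return None
        x = self.buf[self.head]
        self.head = (self.head + 1) % self.slots
        return x


ring = TwoIndexRing(slots=4)                  # 4 cells, 3 usable
results = [ring.push(v) for v in (1, 2, 3, 4)]
print(results)                                # [True, True, True, False]
print(ring.pop(), ring.push(4), ring.pop())   # 1 True 2
```

Note that push never touches head and pop never touches tail. That separation is what the single-producer / single-consumer argument relies on; this Python sketch shows only the index logic, not the atomic loads and stores a real concurrent version would need.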

Hints

Hint 1 — array + two indices

Allocate array[capacity]. Keep head (oldest item position) and tail (next free slot). On push, write to array[tail] and advance tail. On pop, read array[head] and advance head. Both advance modulo capacity.

Hint 2 — distinguishing empty from full

If you only track head and tail, you can't tell empty (head == tail) from full (head == tail after the buffer wrapped completely). Two common fixes:

  • Sacrifice one slot. Define full as (tail + 1) % capacity == head. Usable capacity becomes capacity - 1, but no shared counter is needed: the producer writes only tail and the consumer writes only head.
  • Track length separately. Maintain count; empty iff count == 0, full iff count == capacity. One extra integer.
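The ambiguity is easy to reproduce with bare index arithmetic. In this sketch (variable names are illustrative only), tail is advanced capacity times with no pops and lands in the same head == tail state as a freshly created buffer; the separate count is what tells the two states apart.

```python
capacity = 3
head, tail, count = 0, 0, 0

empty_state = (head, tail)        # (0, 0) before any push

for _ in range(capacity):         # push three items, pop none
    tail = (tail + 1) % capacity
    count += 1

full_state = (head, tail)         # tail has wrapped all the way around

print(empty_state == full_state)  # True: head == tail in both cases
print(count == capacity)          # True: the counter disambiguates
```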

Hint 3 — wrap with modulo

After advancing an index: idx = (idx + 1) % capacity. If capacity is a power of 2, use idx = (idx + 1) & (capacity - 1) instead: same result, no division. (This is why many kernel and driver ring buffers round their capacity up to a power of 2.)
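A small check of the masking trick, as a sketch. The helper names next_pow2, advance_mod and advance_mask are hypothetical. Python will not show the speed difference; the point is only that the two forms agree when capacity is a power of two and disagree otherwise.

```python
def next_pow2(n: int) -> int:
    """Smallest power of two >= n (for n >= 1)."""
    return 1 << (n - 1).bit_length()


def advance_mod(idx: int, capacity: int) -> int:
    return (idx + 1) % capacity


def advance_mask(idx: int, capacity: int) -> int:
    # Valid only when capacity is a power of two.
    return (idx + 1) & (capacity - 1)


capacity = next_pow2(6)   # rounds 6 up to 8
assert all(advance_mod(i, capacity) == advance_mask(i, capacity)
           for i in range(capacity))

# Counter-example: with capacity 6 (not a power of two) the mask is wrong.
print(advance_mod(2, 6), advance_mask(2, 6))   # 3 1
```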

Solution

Python

class RingBuffer:
    def __init__(self, capacity: int):
        self.buf      = [None] * capacity
        self.cap      = capacity
        self.head     = 0           # index of oldest item
        self.tail     = 0           # next write position
        self.count    = 0

    def push(self, x: int) -> bool:
        if self.count == self.cap:
            return False
        self.buf[self.tail] = x
        self.tail = (self.tail + 1) % self.cap
        self.count += 1
        return True

    def pop(self) -> int | None:
        if self.count == 0:
            return None
        x = self.buf[self.head]
        self.head = (self.head + 1) % self.cap
        self.count -= 1
        return x

    def __len__(self) -> int:
        return self.count

# Trace through the example
rb = RingBuffer(3)
for v in [1, 2, 3]:
    ok = rb.push(v)
    print(f"push {v} → {ok}, len={len(rb)}")
print(f"push 4 → {rb.push(4)} (expected False — full)")
print(f"pop   → {rb.pop()}")
print(f"push 4 (wraps!) → {rb.push(4)}")
print(f"pop   → {rb.pop()}")

JavaScript

class RingBuffer {
  constructor(capacity) {
    this.buf   = new Array(capacity).fill(undefined);
    this.cap   = capacity;
    this.head  = 0;
    this.tail  = 0;
    this.count = 0;
  }
  push(x) {
    if (this.count === this.cap) return false;
    this.buf[this.tail] = x;
    this.tail = (this.tail + 1) % this.cap;
    this.count++;
    return true;
  }
  pop() {
    if (this.count === 0) return null;
    const x = this.buf[this.head];
    this.head = (this.head + 1) % this.cap;
    this.count--;
    return x;
  }
  get length() { return this.count; }
}

const rb = new RingBuffer(3);
for (const v of [1, 2, 3]) console.log(`push ${v} → ${rb.push(v)}, len=${rb.length}`);
console.log(`push 4 → ${rb.push(4)} (expected false — full)`);
console.log(`pop   → ${rb.pop()}`);
console.log(`push 4 (wraps!) → ${rb.push(4)}`);
console.log(`pop   → ${rb.pop()}`);

Rust

pub struct RingBuffer<T: Copy + Default> {
    buf:   Vec<T>,
    cap:   usize,
    head:  usize,
    tail:  usize,
    count: usize,
}

impl<T: Copy + Default> RingBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            buf:   vec![T::default(); capacity],
            cap:   capacity,
            head:  0, tail: 0, count: 0,
        }
    }

    pub fn push(&mut self, x: T) -> bool {
        if self.count == self.cap { return false; }
        self.buf[self.tail] = x;
        self.tail = (self.tail + 1) % self.cap;
        self.count += 1;
        true
    }

    pub fn pop(&mut self) -> Option<T> {
        if self.count == 0 { return None; }
        let x = self.buf[self.head];
        self.head = (self.head + 1) % self.cap;
        self.count -= 1;
        Some(x)
    }

    pub fn len(&self) -> usize { self.count }
}

fn main() {
    let mut rb: RingBuffer<i32> = RingBuffer::new(3);
    for v in [1, 2, 3] { println!("push {} → {}, len={}", v, rb.push(v), rb.len()); }
    println!("push 4 → {} (expected false — full)", rb.push(4));
    println!("pop   → {:?}", rb.pop());
    println!("push 4 (wraps!) → {}", rb.push(4));
    println!("pop   → {:?}", rb.pop());
}

C++

#include <iostream>
#include <optional>
#include <string>
#include <vector>

template <typename T>
class RingBuffer {
    std::vector<T> buf_;
    size_t         cap_;
    size_t         head_  = 0;
    size_t         tail_  = 0;
    size_t         count_ = 0;
public:
    explicit RingBuffer(size_t capacity)
        : buf_(capacity), cap_(capacity) {}

    bool push(T x) {
        if (count_ == cap_) return false;
        buf_[tail_] = x;
        tail_ = (tail_ + 1) % cap_;
        ++count_;
        return true;
    }
    std::optional<T> pop() {
        if (count_ == 0) return std::nullopt;
        T x = buf_[head_];
        head_ = (head_ + 1) % cap_;
        --count_;
        return x;
    }
    size_t len() const { return count_; }
};

int main() {
    RingBuffer<int> rb(3);
    for (int v : {1, 2, 3})
        std::cout << "push " << v << " → " << (rb.push(v) ? "true" : "false") << ", len=" << rb.len() << "\n";
    std::cout << "push 4 → " << (rb.push(4) ? "true" : "false") << " (expected false — full)\n";
    // Pop once per line and keep the result: calling pop() in both the test
    // and the dereference would consume two items.
    auto first = rb.pop();
    std::cout << "pop   → " << (first ? std::to_string(*first) : std::string("nullopt")) << "\n";
    std::cout << "push 4 (wraps!) → " << (rb.push(4) ? "true" : "false") << "\n";
    auto second = rb.pop();
    std::cout << "pop   → " << (second ? std::to_string(*second) : std::string("nullopt")) << "\n";
}

C

// The C version is what ends up in actual firmware. Static storage,
// no malloc after init, no implicit allocations. Replace int with
// your sample type (an IMU struct, an audio frame, etc.).
#include <stdio.h>
#include <stdbool.h>

#define CAP 3
static int    buf[CAP];
static int    head  = 0;
static int    tail  = 0;
static int    count = 0;

static bool push(int x) {
    if (count == CAP) return false;
    buf[tail] = x;
    tail = (tail + 1) % CAP;
    count++;
    return true;
}

// Out-param sentinel: pop returns true if a value was available.
static bool pop_(int* out) {
    if (count == 0) return false;
    *out = buf[head];
    head = (head + 1) % CAP;
    count--;
    return true;
}

int main(void) {
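    for (int v = 1; v <= 3; v++) {
        // Call push first: C leaves the evaluation order of arguments unspecified,
        // so reading count in the same printf call could see the old value.
        bool ok = push(v);
        printf("push %d → %s, len=%d\n", v, ok ? "true" : "false", count);
    }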
    printf("push 4 → %s (expected false — full)\n", push(4) ? "true" : "false");
    int got;
    pop_(&got); printf("pop   → %d\n", got);
    printf("push 4 (wraps!) → %s\n", push(4) ? "true" : "false");
    pop_(&got); printf("pop   → %d\n", got);
    return 0;
}

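The C version is closest to what actually runs on a robot: static storage, no malloc after boot, no implicit allocations. Replace int with your sample struct (a 32-byte IMU packet, a pointer to a camera frame) and you have the same pattern that many kernel and driver buffers use. One caveat: count is modified by both push and pop, so this version is only safe when both run in the same context. For an interrupt-driven producer, use the head/tail-only layout from Hint 2, where each side writes only its own index.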

Complexity

Time
O(1) for push, pop, len — pure pointer arithmetic plus one modular index.
Space
O(capacity) — allocated once at construction; never grows.
Why no shifts
An array-backed FIFO that didn't wrap would have to shift every remaining element on every pop — O(n) per op. The wrap-around is what makes the ring buffer worth the bookkeeping.
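To make the cost difference concrete, this sketch counts element moves for one pop in each design. The function names shifting_pop and ring_pop are hypothetical helpers written for this comparison, not part of the solution above.

```python
def shifting_pop(items: list, n_live: int):
    """Pop from a non-wrapping array FIFO: item 0 leaves, the rest slide down.

    Returns (value, number_of_element_moves).
    """
    value = items[0]
    moves = 0
    for i in range(1, n_live):
        items[i - 1] = items[i]   # every survivor moves one slot toward the front
        moves += 1
    return value, moves


def ring_pop(items: list, head: int):
    """Pop from a ring: read at head, advance head, move nothing.

    Returns (value, new_head, number_of_element_moves).
    """
    return items[head], (head + 1) % len(items), 0


data = list(range(8))
print(shifting_pop(data[:], 8))   # (0, 7)
print(ring_pop(data[:], 0))       # (0, 1, 0)
```

With 8 live items the shifting version moves 7 elements per pop, and that number grows with the queue length; the ring version moves none regardless of size.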

In the wild

Variations