eink-feed/src/eink_feed_server/mpsc.zig
2025-06-19 16:55:59 +02:00

120 lines
3.8 KiB
Zig

const std = @import("std");
/// Single ring-buffer cell holding a payload of type `T` together with an
/// atomic version stamp.
///
/// `RingBuffer` in this file initializes the stamp to the slot's index and
/// rewrites it on every enqueue and dequeue; comparing it against a queue
/// position tells whether the cell may be written or read.
pub fn Slot(comptime T: type) type {
    const Version = std.atomic.Value(usize);

    return struct {
        /// Payload stored in the cell.
        value: T,
        /// Version stamp of the cell.
        version: Version,
    };
}
/// Bounded ring buffer with value type `T` on a caller-provided slice of slots.
///
/// Every slot carries a version stamp. For a queue position `p` (a value of
/// `head` or `tail`) the slot at index `p % capacity` is
///   * free for writing while its version equals `p`,
///   * holding a published value while its version equals `p + 1`.
/// After a read the version is set to `p + capacity`, i.e. the position that
/// will use the slot on the next lap.
///
/// `enqueue` claims positions through an atomic compare-and-swap on `head`,
/// so several producers may call it concurrently. `dequeue` and `dequeueAll`
/// update the plain `tail` field without synchronization and therefore must
/// only be called from one consumer at a time.
///
/// Positions use wrapping arithmetic. Slot indices stay continuous across a
/// wrap of `usize` only when the capacity is a power of two.
pub fn RingBuffer(comptime T: type) type {
    return struct {
        const Self = @This();

        /// Ring buffer capacity (number of slots, equal to `buffer.len`).
        capacity: usize,
        /// Buffer slice
        buffer: []Slot(T),
        /// Atomic head position of buffer (write position)
        head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
        /// Tail position of buffer (read position)
        tail: usize = 0,

        /// Initializes the ring buffer on top of `buffer` and stamps every
        /// slot with its own index as initial version.
        ///
        /// Returns `error.CapacityTooSmall` if `buffer` has one slot or less.
        pub fn init(buffer: []Slot(T)) error{CapacityTooSmall}!Self {
            if (buffer.len <= 1) {
                return error.CapacityTooSmall;
            }
            for (buffer, 0..) |*slot, i| {
                slot.version = std.atomic.Value(usize).init(i);
            }
            return .{
                .capacity = buffer.len,
                .buffer = buffer,
            };
        }

        /// Enqueues `value`.
        ///
        /// Returns `error.Overflow` only if the slot at the current head has
        /// not been read yet, i.e. the buffer is full. Losing a race against
        /// another producer leads to a retry instead of an error.
        pub fn enqueue(self: *Self, value: T) error{Overflow}!void {
            while (true) {
                const head = self.head.load(.acquire);
                const slot = &self.buffer[head % self.capacity];
                const version = slot.version.load(.acquire);

                // Signed distance between the slot version and the position we
                // want to claim; the wrapping subtraction keeps this correct
                // across counter wrap-around.
                const lag: isize = @bitCast(version -% head);

                if (lag < 0) {
                    // Slot still holds an unread value from the previous lap.
                    return error.Overflow;
                }

                if (lag == 0) {
                    // Slot is free: try to claim the position.
                    if (self.head.cmpxchgStrong(
                        head,
                        head +% 1,
                        .seq_cst,
                        .seq_cst,
                    ) == null) {
                        slot.value = value;
                        // Publish: version `head + 1` marks the slot readable.
                        slot.version.store(head +% 1, .release);
                        return;
                    }
                }

                // Either the compare-and-swap failed or `head` was stale
                // (lag > 0: another producer already used this position).
                // Reload `head` and try again.
                std.atomic.spinLoopHint();
            }
        }

        /// Dequeues the oldest element.
        ///
        /// Returns `error.Underflow` if the slot at the tail has not been
        /// published, i.e. the buffer is empty or a producer has claimed the
        /// slot but not finished writing it.
        pub fn dequeue(self: *Self) error{Underflow}!T {
            const tail = self.tail;
            const slot = &self.buffer[tail % self.capacity];

            // A published slot carries version `tail + 1`.
            if (slot.version.load(.acquire) != tail +% 1) {
                return error.Underflow;
            }

            const value = slot.value;
            // Hand the slot back to producers for the next lap.
            slot.version.store(tail +% self.capacity, .release);
            self.tail = tail +% 1;
            return value;
        }

        /// Dequeues stored elements into `buf` and returns the filled prefix
        /// of `buf`.
        ///
        /// Stops when `dequeue` reports an empty buffer or when the read index
        /// reaches the head index sampled on entry, so at most `capacity`
        /// elements are returned.
        ///
        /// Returns `error.BufferTooSmall` if `buf.len` is less than the ring
        /// buffer capacity.
        pub fn dequeueAll(self: *Self, buf: []T) error{BufferTooSmall}![]T {
            if (buf.len < self.capacity) {
                return error.BufferTooSmall;
            }
            const head = self.head.load(.acquire) % self.capacity;
            var tail = self.tail % self.capacity;
            var i: usize = 0;
            while (true) {
                // `error.Underflow` is the only possible error and simply
                // means there is nothing more to read.
                const val = self.dequeue() catch break;
                buf[i] = val;
                i += 1;
                tail = (tail + 1) % self.capacity;
                if (tail == head) break;
            }
            return buf[0..i];
        }
    };
}