This commit is contained in:
Dominic Grimm 2025-06-01 12:33:53 +02:00
commit e70a04ac16
No known key found for this signature in database
19 changed files with 576 additions and 0 deletions

View file

0
src/dispatchers/root.zig Normal file
View file

24
src/main.zig Normal file
View file

@ -0,0 +1,24 @@
const std = @import("std");
const builtin = @import("builtin");
const craftflut = @import("craftflut");
pub const std_options: std.Options = .{
.log_level = if (builtin.mode == .Debug) .debug else .info,
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{
.thread_safe = true,
}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
{
const config = craftflut.Config{
.web_port = 3000,
.web_threads = 16,
};
try craftflut.start(allocator, &config);
}
}

152
src/mpsc.zig Normal file
View file

@ -0,0 +1,152 @@
const std = @import("std");
pub fn Slot(comptime T: type) type {
return struct {
value: T,
version: std.atomic.Value(usize),
};
}
pub fn RingBuffer(comptime T: type) type {
return struct {
const Self = @This();
capacity: usize,
buffer: []Slot(T),
head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
tail: usize = 0,
pub fn init(buffer: []Slot(T)) Self {
// var buffer: [capacity]Slot(T) = undefined;
// {
// @setEvalBranchQuota(capacity * 4);
// for (0..capacity) |i| {
// buffer[i].version = std.atomic.Value(usize).init(i);
// }
// }
for (buffer, 0..) |*slot, i| {
slot.version = std.atomic.Value(usize).init(i);
}
return .{
.capacity = buffer.len,
.buffer = buffer,
};
}
pub fn enqueue(self: *Self, value: T) error{Overflow}!void {
while (true) {
const head = self.head.load(.acquire);
const index = head % self.capacity;
const slot = &self.buffer[index];
const expected_version = head;
if (slot.version.load(.acquire) != expected_version) {
return error.Overflow;
}
if (self.head.cmpxchgStrong(
head,
head + 1,
.seq_cst,
.seq_cst,
)) |_| {
std.atomic.spinLoopHint();
continue;
}
slot.value = value;
slot.version.store(expected_version + 1, .release);
return;
}
}
pub fn dequeue(self: *Self) error{Underflow}!T {
const tail = self.tail;
const index = tail % self.capacity;
const slot = &self.buffer[index];
const expected_version = tail + 1;
if (slot.version.load(.acquire) != expected_version) {
return error.Underflow;
}
const value = slot.value;
slot.version.store(tail +% self.capacity, .release);
self.tail +%= 1;
return value;
}
};
}
// pub fn RingBuffer(comptime T: type) type {
// return struct {
// const Self = @This();
// capacity: usize,
// buffer: []Slot(T),
// head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
// tail: usize = 0,
// pub fn init(buffer: []Slot(T)) Self {
// // var buffer: [capacity]Slot(T) = undefined;
// // {
// // @setEvalBranchQuota(capacity * 4);
// // for (0..capacity) |i| {
// // buffer[i].version = std.atomic.Value(usize).init(i);
// // }
// // }
// for (buffer, 0..) |*slot, i| {
// slot.value = std.atomic.Value(usize).init(i);
// }
// return .{
// .capacity = buffer.len,
// .buffer = buffer,
// };
// }
// pub fn enqueue(self: *Self, value: T) error{Overflow}!void {
// while (true) {
// const head = self.head.load(.acquire);
// const index = head % self.capacity;
// const slot = &self.buffer[index];
// const expected_version = head;
// if (slot.version.load(.acquire) != expected_version) {
// return error.Overflow;
// }
// if (self.head.cmpxchgStrong(
// head,
// head + 1,
// .seq_cst,
// .seq_cst,
// )) |_| {
// std.atomic.spinLoopHint();
// continue;
// }
// slot.value = value;
// slot.version.store(expected_version + 1, .release);
// return;
// }
// }
// pub fn dequeue(self: *Self) error{Underflow}!T {
// const tail = self.tail;
// const index = tail % self.capacity;
// const slot = &self.buffer[index];
// const expected_version = tail + 1;
// if (slot.version.load(.acquire) != expected_version) {
// return error.Underflow;
// }
// const value = slot.value;
// slot.version.store(tail +% self.capacity, .release);
// self.tail +%= 1;
// return value;
// }
// };
// }

View file

@ -0,0 +1,10 @@
pub const BlockUpdate = struct {
pub const material_len_max = 32;
dimension: u8,
x: i32,
y: i32,
z: i32,
material: [material_len_max]u8 = undefined,
material_len: usize = 0,
};

35
src/msg_queue/queue.zig Normal file
View file

@ -0,0 +1,35 @@
const std = @import("std");
const mpsc = @import("../mpsc.zig");
pub fn Queue(comptime T: type) type {
return struct {
const Self = @This();
ring_buffer: mpsc.RingBuffer(T),
pub fn init(buffer: []mpsc.Slot(T)) Self {
return .{
.ring_buffer = mpsc.RingBuffer(T).init(buffer),
};
}
pub fn blockingEnqueue(self: *Self, value: T) void {
while (true) {
self.ring_buffer.enqueue(value) catch |err| switch (err) {
error.Overflow => {
std.atomic.spinLoopHint();
continue;
},
};
return;
}
}
pub fn dequeue(self: *Self) ?T {
return self.ring_buffer.dequeue() catch |err| switch (err) {
error.Underflow => return null,
};
}
};
}

78
src/msg_queue/root.zig Normal file
View file

@ -0,0 +1,78 @@
const std = @import("std");
const mpsc = @import("../mpsc.zig");
const queue = @import("queue.zig");
pub const messages = @import("messages.zig");
pub fn MsgQueueUnmanaged(comptime T: type) type {
return struct {
const Self = @This();
queue: queue.Queue(T),
// consumer_mutex: std.Thread.Mutex = std.Thread.Mutex{},
cond: std.Thread.Condition = std.Thread.Condition{},
pub fn init(buffer: []mpsc.Slot(T)) Self {
return .{
.queue = queue.Queue(T).init(buffer),
};
}
pub fn enqueue(self: *Self, value: T) void {
self.queue.blockingEnqueue(value);
std.log.info("enqueue: SENDING signal", .{});
self.cond.signal();
}
pub fn dequeue(self: *Self) T {
var m = std.Thread.Mutex{};
m.lock();
defer m.unlock();
while (true) {
if (self.queue.dequeue()) |val| return val;
std.log.info("dequeue: STARTING consumer condition wait", .{});
self.cond.wait(&m);
std.log.info("dequeue: RECEIVED signal", .{});
}
}
};
}
pub fn MsgQueueManaged(comptime T: type) type {
return struct {
const Self = @This();
allocator: std.mem.Allocator,
buffer: []mpsc.Slot(T),
msg_queue: MsgQueueUnmanaged(T),
pub fn init(allocator: std.mem.Allocator, capacity: usize) !Self {
const buffer = try allocator.alloc(mpsc.Slot(T), capacity);
errdefer allocator.free(buffer);
return .{
.allocator = allocator,
.buffer = buffer,
.msg_queue = MsgQueueUnmanaged(T).init(buffer),
};
}
pub fn deinit(self: *Self) void {
self.allocator.free(self.buffer);
}
pub fn unmanaged(self: *Self) *MsgQueueUnmanaged(T) {
return &self.msg_queue;
}
pub fn enqueue(self: *Self, value: T) void {
self.msg_queue.enqueue(value);
}
pub fn dequeue(self: *Self) T {
return self.msg_queue.dequeue();
}
};
}

28
src/root.zig Normal file
View file

@ -0,0 +1,28 @@
const std = @import("std");
const msg_queue = @import("msg_queue/root.zig");
const dispatchers = @import("dispatchers/root.zig");
const web = @import("web/root.zig");
pub const Config = struct {
web_port: u16,
web_threads: i16,
};
fn receiver(queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate)) void {
while (true) {
const packet = queue.dequeue();
std.log.info("packet: {}", .{packet});
}
}
pub fn start(allocator: std.mem.Allocator, config: *const Config) !void {
// var update_queue = UpdateQueue.init();
const capacity: usize = 1024;
var queue = try msg_queue.MsgQueueManaged(msg_queue.messages.BlockUpdate).init(allocator, capacity);
defer queue.deinit();
_ = try std.Thread.spawn(.{}, receiver, .{queue.unmanaged()});
try web.start(allocator, config.web_threads, config.web_port, queue.unmanaged());
}

View file

@ -0,0 +1,89 @@
const std = @import("std");
const zap = @import("zap");
const models = @import("../models/root.zig");
const stores = @import("../stores/root.zig");
const msg_queue = @import("../../msg_queue/root.zig");
const Self = @This();
pub const default_path = "block";
allocator: std.mem.Allocator,
block_update_queue: *const stores.BlockUpdateQueue,
path: []const u8,
error_strategy: zap.Endpoint.ErrorStrategy = .log_to_console,
pub fn init(allocator: std.mem.Allocator, block_update_queue: *const stores.BlockUpdateQueue, path: []const u8) Self {
return .{
.allocator = allocator,
.block_update_queue = block_update_queue,
.path = path,
};
}
pub fn get(_: *Self, r: zap.Request) !void {
r.setStatus(.method_not_allowed);
r.markAsFinished(true);
}
pub fn post(_: *Self, r: zap.Request) !void {
r.setStatus(.method_not_allowed);
r.markAsFinished(true);
}
pub fn put(self: *Self, r: zap.Request) !void {
blk: {
if (r.body) |body| {
const maybe_block: ?std.json.Parsed(models.Block) = std.json.parseFromSlice(models.Block, self.allocator, body, .{}) catch null;
if (maybe_block) |parsed| {
defer parsed.deinit();
const block = parsed.value;
if (block.material.len > msg_queue.messages.BlockUpdate.material_len_max or !models.Block.materialIsValid(block.material)) {
break :blk;
}
std.log.info("block: {}", .{block});
var msg = msg_queue.messages.BlockUpdate{
.dimension = block.dimension,
.x = block.x,
.y = block.y,
.z = block.z,
};
std.mem.copyForwards(u8, &msg.material, block.material);
msg.material_len = block.material.len;
self.block_update_queue.queue.enqueue(msg);
r.setStatus(.created);
r.markAsFinished(true);
return;
}
}
}
r.setStatus(.bad_request);
r.markAsFinished(true);
}
pub fn delete(_: *Self, r: zap.Request) !void {
r.setStatus(.method_not_allowed);
r.markAsFinished(true);
}
pub fn patch(_: *Self, r: zap.Request) !void {
r.setStatus(.method_not_allowed);
r.markAsFinished(true);
}
pub fn options(_: *Self, r: zap.Request) !void {
try r.setHeader("Access-Control-Allow-Origin", "*");
try r.setHeader("Access-Control-Allow-Methods", "PUT, OPTIONS, HEAD");
r.setStatus(zap.http.StatusCode.no_content);
r.markAsFinished(true);
}
pub fn head(_: *Self, r: zap.Request) !void {
r.setStatus(zap.http.StatusCode.no_content);
r.markAsFinished(true);
}

View file

@ -0,0 +1 @@
pub const Block = @import("Block.zig");

16
src/web/models/Block.zig Normal file
View file

@ -0,0 +1,16 @@
const std = @import("std");
dimension: u8,
x: i32,
y: i32,
z: i32,
material: []const u8,
pub fn materialIsValid(material: []const u8) bool {
for (material) |ch| {
if (!std.ascii.isAlphabetic(ch))
return false;
}
return true;
}

1
src/web/models/root.zig Normal file
View file

@ -0,0 +1 @@
pub const Block = @import("Block.zig");

41
src/web/root.zig Normal file
View file

@ -0,0 +1,41 @@
const std = @import("std");
const zap = @import("zap");
const msg_queue = @import("../msg_queue/root.zig");
const models = @import("models/root.zig");
const endpoints = @import("endpoints/root.zig");
const stores = @import("stores/root.zig");
fn onRequest(r: zap.Request) !void {
_ = r;
}
pub fn start(allocator: std.mem.Allocator, threads: i16, port: u16, queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate)) !void {
var listener = zap.Endpoint.Listener.init(allocator, .{
.port = port,
.on_request = onRequest,
.log = true,
});
defer listener.deinit();
const block_update_queue_store = stores.BlockUpdateQueue{
.queue = queue,
};
var block_endpoint = endpoints.Block.init(
allocator,
&block_update_queue_store,
std.fmt.comptimePrint("/api/{s}", .{endpoints.Block.default_path}),
);
try listener.register(&block_endpoint);
try listener.listen();
std.log.info("Listening on 0.0.0.0:{d}", .{port});
zap.start(.{
.threads = threads,
.workers = 1,
});
}

View file

@ -0,0 +1,3 @@
const msg_queue = @import("../../msg_queue/root.zig");
queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate),

1
src/web/stores/root.zig Normal file
View file

@ -0,0 +1 @@
pub const BlockUpdateQueue = @import("BlockUpdateQueue.zig");