From dd0435d5630549f1d77e575d3e99b8645116492d Mon Sep 17 00:00:00 2001 From: Dominic Grimm Date: Wed, 4 Jun 2025 19:08:10 +0200 Subject: [PATCH] Fix material naming --- README.md | 8 ++-- build.zig | 1 + src/dispatchers/BlockUpdate.zig | 84 ++++++++++++++++++++++++++------- src/models/BlockUpdateMsg.zig | 7 ++- src/msg_queue/root.zig | 16 ++++++- src/root.zig | 36 ++++++++++++-- src/web/models/Block.zig | 2 +- 7 files changed, 126 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 3ce7285..dec4bca 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ zig build -Doptimize=(ReleaseSafe|ReleaseFast) -Dmaterial_max_len=(, defaul ## CLI options -- `--unix-socket`: Path to Unix socket to send block updates to +- `--unix-socket`: Path to Unix socket to bind to and to send block updates to - `--queue`: Block update queue capacity (default: `1024`) - `--web-port`: Web server port - `--web-threads`: Web server thread count (default: `1`) @@ -31,7 +31,9 @@ TODO: Classic Pixelflut TCP This application relays and batches the received block updates and sends them as CSV data to an existing Unix socket. -Create a test socket with: `socat -u UNIX-LISTEN:,reuseaddr,fork -` +Connect to the socket with: `socat - UNIX:` + +TODO: Accept multiple clients ### Payload @@ -42,4 +44,4 @@ dimension,x,y,z,material ,,,, ``` -E.g.: `0,42,42,42,dirt` puts a dirt block in dimension `0` (Overworld) at coordinates `42,42,42`. +E.g.: `0,42,42,42,DIRT` puts a dirt block in dimension `0` (Overworld) at coordinates `42,42,42`. diff --git a/build.zig b/build.zig index 414a426..c06cb84 100644 --- a/build.zig +++ b/build.zig @@ -1,6 +1,7 @@ const std = @import("std"); const material_max_len_default: usize = 40; +const dimension_max_len_default: usize = 16; pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); diff --git a/src/dispatchers/BlockUpdate.zig b/src/dispatchers/BlockUpdate.zig index 38d71d3..14336b4 100644 --- a/src/dispatchers/BlockUpdate.zig +++ b/src/dispatchers/BlockUpdate.zig @@ -21,40 +21,92 @@ config: *const Config, /// Block update queue queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg), -/// Payload stream resource -stream: ?std.net.Stream = null, +/// Data server +server: *std.net.Server, pub fn init( allocator: std.mem.Allocator, config: *const Config, queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg), - stream: std.net.Stream, + server: *std.net.Server, ) Self { return .{ .allocator = allocator, .config = config, .queue = queue, - .stream = stream, + .server = server, }; } -/// Sends one block update payload to the stream, terminates it with a newline. -fn send(self: *Self, update: *const models.BlockUpdateMsg) !void { +/// Sends one block update payload to a stream, terminates it with a newline. +fn send(stream: std.net.Stream, update: *const models.BlockUpdateMsg) !void { var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined; - const stream = self.stream.?; const csv = try update.toCsv(&buf); - try stream.writeAll(csv); - try stream.writeAll(&.{'\n'}); + stream.writeAll(csv) catch return; + stream.writeAll(&.{'\n'}) catch return; } -/// Starts dispatcher. Receives new queue entries and sends them to the stream. -pub fn start(self: *Self) !void { - // buffer for `tryDequeueAll` with maximum queue capacity as size - const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity()); - defer self.allocator.free(buf); +fn streamListener(self: *Self, stream: std.net.Stream, running: *std.atomic.Value(bool)) void { + var buf: [512]u8 = undefined; + while (running.load(.monotonic)) { + const n = stream.read(&buf) catch 0; + std.log.info("n: {}", .{n}); + if (n < buf.len) break; + } + std.log.info("terminating stream listener", .{}); + + running.store(false, .release); + self.queue.cond.broadcast(); +} + +pub fn start(self: *Self) !void { while (true) { - const updates = try self.queue.tryDequeueAll(buf); - for (updates) |*update| try self.send(update); + const conn = try self.server.accept(); + log.info("Client connected", .{}); + defer conn.stream.close(); + + // buffer for `tryDequeueAll` with maximum queue capacity as size + const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity()); + defer self.allocator.free(buf); + + var running = std.atomic.Value(bool).init(true); + defer running.store(false, .release); + _ = try std.Thread.spawn(.{}, streamListener, .{ self, conn.stream, &running }); + + blk: { + while (true) { + const updates = try self.queue.dequeueAll(buf); + if (!running.load(.monotonic)) break :blk; + for (updates) |*update| { + if (running.load(.monotonic)) { + try send(conn.stream, update); + } else { + break :blk; + } + } + } + } } } + +// /// Sends one block update payload to the stream, terminates it with a newline. +// fn send(self: *Self, update: *const models.BlockUpdateMsg) !void { +// var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined; +// const stream = self.stream.?; +// const csv = try update.toCsv(&buf); +// try stream.writeAll(csv); +// try stream.writeAll(&.{'\n'}); +// } + +// /// Starts dispatcher. Receives new queue entries and sends them to the stream. +// pub fn start(self: *Self) !void { +// // buffer for `tryDequeueAll` with maximum queue capacity as size +// const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity()); +// defer self.allocator.free(buf); + +// while (true) { +// const updates = try self.queue.tryDequeueAll(buf); +// for (updates) |*update| try self.send(update); +// } +// } diff --git a/src/models/BlockUpdateMsg.zig b/src/models/BlockUpdateMsg.zig index 741f0fe..06e0b08 100644 --- a/src/models/BlockUpdateMsg.zig +++ b/src/models/BlockUpdateMsg.zig @@ -8,7 +8,7 @@ pub const Dimension = i32; pub const Coordinate = i32; /// Maximum length of a material name -pub const material_max_len = build_options.material_max_len; +pub const material_max_len: usize = build_options.material_max_len; /// Maximum CSV payload size for a block update pub const csv_max_len: usize = blk: { @@ -38,11 +38,14 @@ material_len: usize = 0, /// Converts block update message to CSV payload /// Format: *`,,,,`* (`,,,,`) pub fn toCsv(self: *const Self, buf: *[csv_max_len]u8) ![]u8 { + var material_upper_buf: [material_max_len]u8 = undefined; + const material_upper = std.ascii.upperString(&material_upper_buf, self.material[0..self.material_len]); + return try std.fmt.bufPrint(buf, "{d},{d},{d},{d},{s}", .{ self.dimension, self.x, self.y, self.z, - self.material[0..self.material_len], + material_upper, }); } diff --git a/src/msg_queue/root.zig b/src/msg_queue/root.zig index d55e09b..acc336b 100644 --- a/src/msg_queue/root.zig +++ b/src/msg_queue/root.zig @@ -23,12 +23,12 @@ pub fn MsgQueueUnmanaged(comptime T: type) type { pub fn tryEnqueue(self: *Self, value: T) !void { try self.queue.tryEnqueue(value); - self.cond.signal(); + self.cond.broadcast(); } pub fn enqueue(self: *Self, value: T) void { self.queue.enqueue(value); - self.cond.signal(); + self.cond.broadcast(); } pub fn tryDequeue(self: *Self) !T { @@ -49,6 +49,18 @@ pub fn MsgQueueUnmanaged(comptime T: type) type { pub fn tryDequeueAll(self: *Self, buf: []T) ![]T { return try self.queue.tryDequeueAll(buf); } + + pub fn dequeueAll(self: *Self, buf: []T) ![]T { + var m = std.Thread.Mutex{}; + m.lock(); + defer m.unlock(); + + while (true) { + const slice = try self.queue.tryDequeueAll(buf); + if (slice.len > 0) return slice; + self.cond.wait(&m); + } + } }; } diff --git a/src/root.zig b/src/root.zig index f0b7d4c..7324171 100644 --- a/src/root.zig +++ b/src/root.zig @@ -19,6 +19,30 @@ pub const Config = struct { web: web.Config, }; +fn openUnixSocketServer(socket_path: []const u8, max_clients: u31) !std.net.Server { + errdefer std.posix.unlink(socket_path) catch {}; + + const sockfd = try std.posix.socket( + std.posix.AF.UNIX, + std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC, + 0, + ); + const stream: std.net.Stream = .{ .handle = sockfd }; + errdefer stream.close(); + + const addr = try std.net.Address.initUnix(socket_path); + const sockaddr_ptr: *const std.posix.sockaddr = @ptrCast(&addr.un); + try std.posix.bind(sockfd, sockaddr_ptr, addr.getOsSockLen()); + try std.posix.listen(sockfd, max_clients); + + const server: std.net.Server = .{ + .listen_address = addr, + .stream = stream, + }; + + return server; +} + /// Block update dispatch wrapper. Panics if error occurs. fn receiver(dispatcher: *dispatchers.BlockUpdate) void { dispatcher.start() catch unreachable; @@ -28,9 +52,13 @@ fn receiver(dispatcher: *dispatchers.BlockUpdate) void { pub fn start(allocator: std.mem.Allocator, config: *const Config) !void { log.info("Starting craftflut gateway", .{}); - log.info("Opening Unix socket: {s}", .{config.unix_socket_path}); - var stream = try std.net.connectUnixSocket(config.unix_socket_path); - defer stream.close(); + log.info("Opening data Unix socket: {s}", .{config.unix_socket_path}); + std.posix.unlink(config.unix_socket_path) catch {}; + var server = try openUnixSocketServer(config.unix_socket_path, 1); + defer { + server.deinit(); + std.posix.unlink(config.unix_socket_path) catch {}; + } var queue = try msg_queue.MsgQueueManaged(models.BlockUpdateMsg) .init(allocator, config.dispatcher.capacity); @@ -40,7 +68,7 @@ pub fn start(allocator: std.mem.Allocator, config: *const Config) !void { allocator, &config.dispatcher, queue.unmanaged(), - stream, + &server, ); log.info("Starting dispatcher", .{}); _ = try std.Thread.spawn(.{}, receiver, .{&dispatcher}); diff --git a/src/web/models/Block.zig b/src/web/models/Block.zig index d9dbc16..4bc789d 100644 --- a/src/web/models/Block.zig +++ b/src/web/models/Block.zig @@ -11,7 +11,7 @@ material: []const u8, /// Checks if material name characters are valid. Doesn't check if maximum length is exceeded. pub fn materialIsValid(material: []const u8) bool { for (material) |ch| { - if (!std.ascii.isAlphabetic(ch)) + if (!std.ascii.isAlphabetic(ch) and ch != '_' and ch != '-') return false; }