diff --git a/README.md b/README.md new file mode 100644 index 0000000..3ce7285 --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +# craftflut + +Alternative gateway implementation for https://codeberg.org/MoKe12g/Pixelflut3D + +## Building + +A minimum Zig version of 0.14.0 is required. + +``` +zig build -Doptimize=(ReleaseSafe|ReleaseFast) -Dmaterial_max_len=(, default: 40) +``` + +## CLI options + +- `--unix-socket`: Path to Unix socket to send block updates to +- `--queue`: Block update queue capacity (default: `1024`) +- `--web-port`: Web server port +- `--web-threads`: Web server thread count (default: `1`) + +## API + +TODO: Classic Pixelflut TCP + +### HTTP + +| Path | Method | Headers | Body | Response | Description | +| ------------ | ------ | ------- | -------------------------------------------------------------------- | ------------- | --------------------------- | +| `/api/block` | `PUT` | | `{"dimension":,"x":,"y":,"z":,"material":}` | `201 Created` | Enqueues given block update | + +## Gateway + +This application relays and batches the received block updates and sends them as CSV data to an existing Unix socket. + +Create a test socket with: `socat -u UNIX-LISTEN:,reuseaddr,fork -` + +### Payload + +Each packet is seperated by a newline (`\n`). + +``` +dimension,x,y,z,material +,,,, +``` + +E.g.: `0,42,42,42,dirt` puts a dirt block in dimension `0` (Overworld) at coordinates `42,42,42`. diff --git a/build.zig b/build.zig index ad56e19..b8780dd 100644 --- a/build.zig +++ b/build.zig @@ -1,14 +1,26 @@ const std = @import("std"); +const material_max_len_default: usize = 40; + pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); - const zap_dep = b.dependency("zap", .{}); + const material_max_len = b.option( + usize, + "material-max-len", + "Maximum length a material name can have", + ) orelse material_max_len_default; + const lib_options = b.addOptions(); + lib_options.addOption(usize, "material_max_len", material_max_len); + + const zap_dep = b.dependency("zap", .{ .openssl = false }); + const clap_dep = b.dependency("clap", .{}); const lib_mod = b.addModule("craftflut", .{ .root_source_file = b.path("src/root.zig"), }); + lib_mod.addOptions("build_options", lib_options); lib_mod.addImport("zap", zap_dep.module("zap")); const exe_mod = b.createModule(.{ @@ -17,6 +29,7 @@ pub fn build(b: *std.Build) void { .optimize = optimize, }); exe_mod.addImport("craftflut", lib_mod); + exe_mod.addImport("clap", clap_dep.module("clap")); const exe = b.addExecutable(.{ .name = "craftflut", diff --git a/build.zig.zon b/build.zig.zon index d4b5369..1a6592b 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -9,6 +9,10 @@ .url = "git+https://github.com/zigzap/zap.git#ec7cac6f6ab8e1892fe6fc499fd37cd93f7b2256", .hash = "zap-0.9.1-GoeB85JTJAADY1vAnA4lTuU66t6JJiuhGos5ex6CpifA", }, + .clap = .{ + .url = "git+https://github.com/Hejsil/zig-clap?ref=0.10.0#e47028deaefc2fb396d3d9e9f7bd776ae0b2a43a", + .hash = "clap-0.10.0-oBajB434AQBDh-Ei3YtoKIRxZacVPF1iSwp3IX_ZB8f0", + }, }, .paths = .{ "build.zig", diff --git a/src/dispatchers/BlockUpdate.zig b/src/dispatchers/BlockUpdate.zig index e69de29..8cef5fa 100644 --- a/src/dispatchers/BlockUpdate.zig +++ b/src/dispatchers/BlockUpdate.zig @@ -0,0 +1,50 @@ +const std = @import("std"); + +const msg_queue = @import("../msg_queue/root.zig"); +const models = @import("../models/root.zig"); + +const Self = @This(); + +const log = std.log.scoped(.block_update_dispatcher); + +pub const Config = struct { + capacity: usize, +}; + +allocator: std.mem.Allocator, + +config: *const Config, +queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg), +stream: ?std.net.Stream = null, + +pub fn init( + allocator: std.mem.Allocator, + config: *const Config, + queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg), + stream: std.net.Stream, +) Self { + return .{ + .allocator = allocator, + .config = config, + .queue = queue, + .stream = stream, + }; +} + +fn send(self: *Self, update: *const models.BlockUpdateMsg) !void { + var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined; + const stream = self.stream.?; + const csv = try update.toCsv(&buf); + try stream.writeAll(csv); + try stream.writeAll(&.{'\n'}); +} + +pub fn start(self: *Self) !void { + const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity()); + defer self.allocator.free(buf); + + while (true) { + const updates = try self.queue.tryDequeueAll(buf); + for (updates) |*update| try self.send(update); + } +} diff --git a/src/dispatchers/root.zig b/src/dispatchers/root.zig index e69de29..ccbedae 100644 --- a/src/dispatchers/root.zig +++ b/src/dispatchers/root.zig @@ -0,0 +1 @@ +pub const BlockUpdate = @import("BlockUpdate.zig"); diff --git a/src/main.zig b/src/main.zig index b77b2d0..0fd1af7 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,5 +1,6 @@ const std = @import("std"); const builtin = @import("builtin"); +const clap = @import("clap"); const craftflut = @import("craftflut"); @@ -15,9 +16,42 @@ pub fn main() !void { const allocator = gpa.allocator(); { + const params = comptime clap.parseParamsComptime( + \\-h, --help Display this help and exit. + \\--unix-socket Path to Unix socket to send block updates to. + \\--queue Block update queue capacity. + \\--web-port Web server port. + \\--web-threads Web server thread count. + ); + + var diag = clap.Diagnostic{}; + var res = clap.parse(clap.Help, ¶ms, clap.parsers.default, .{ + .diagnostic = &diag, + .allocator = allocator, + }) catch |err| { + diag.report(std.io.getStdErr().writer(), err) catch {}; + return err; + }; + defer res.deinit(); + + if (res.args.help != 0 or + res.args.@"unix-socket" == null or + res.args.@"web-port" == null) + { + const writer = std.io.getStdErr().writer(); + try writer.writeAll("craftflut\n"); + return clap.help(writer, clap.Help, ¶ms, .{}); + } + const config = craftflut.Config{ - .web_port = 3000, - .web_threads = 16, + .unix_socket_path = res.args.@"unix-socket".?, + .dispatcher = .{ + .capacity = res.args.queue orelse 1024, + }, + .web = .{ + .port = res.args.@"web-port".?, + .threads = res.args.@"web-threads" orelse 1, + }, }; try craftflut.start(allocator, &config); } diff --git a/src/models/BlockUpdateMsg.zig b/src/models/BlockUpdateMsg.zig new file mode 100644 index 0000000..9fbbdb6 --- /dev/null +++ b/src/models/BlockUpdateMsg.zig @@ -0,0 +1,39 @@ +const std = @import("std"); + +const build_options = @import("build_options"); + +const Self = @This(); + +pub const Dimension = i32; +pub const Coordinate = i32; + +pub const material_max_len = build_options.material_max_len; +pub const csv_max_len: usize = blk: { + const str = std.fmt.comptimePrint( + "-{d},-{d},-{d},-{d},", + .{ + std.math.maxInt(Dimension), + std.math.maxInt(Coordinate), + std.math.maxInt(Coordinate), + std.math.maxInt(Coordinate), + }, + ); + break :blk str.len + material_max_len; +}; + +dimension: Dimension, +x: Coordinate, +y: Coordinate, +z: Coordinate, +material: [material_max_len]u8 = undefined, +material_len: usize = 0, + +pub fn toCsv(self: *const Self, buf: *[csv_max_len]u8) ![]u8 { + return try std.fmt.bufPrint(buf, "{d},{d},{d},{d},{s}", .{ + self.dimension, + self.x, + self.y, + self.z, + self.material[0..self.material_len], + }); +} diff --git a/src/models/root.zig b/src/models/root.zig new file mode 100644 index 0000000..5bf37d4 --- /dev/null +++ b/src/models/root.zig @@ -0,0 +1 @@ +pub const BlockUpdateMsg = @import("BlockUpdateMsg.zig"); diff --git a/src/mpsc.zig b/src/mpsc.zig index 42d692c..5b5439c 100644 --- a/src/mpsc.zig +++ b/src/mpsc.zig @@ -16,14 +16,11 @@ pub fn RingBuffer(comptime T: type) type { head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), tail: usize = 0, - pub fn init(buffer: []Slot(T)) Self { - // var buffer: [capacity]Slot(T) = undefined; - // { - // @setEvalBranchQuota(capacity * 4); - // for (0..capacity) |i| { - // buffer[i].version = std.atomic.Value(usize).init(i); - // } - // } + pub fn init(buffer: []Slot(T)) error{CapacityTooSmall}!Self { + if (buffer.len <= 1) { + return error.CapacityTooSmall; + } + for (buffer, 0..) |*slot, i| { slot.version = std.atomic.Value(usize).init(i); } @@ -76,77 +73,25 @@ pub fn RingBuffer(comptime T: type) type { self.tail +%= 1; return value; } + + pub fn dequeueAll(self: *Self, buf: []T) ![]T { + if (buf.len < self.capacity) { + return error.BufferTooSmall; + } + + const head = self.head.load(.acquire) % self.capacity; + var tail = self.tail % self.capacity; + var i: usize = 0; + + while (true) { + const val = self.dequeue() catch break; + buf[i] = val; + i += 1; + tail = (tail + 1) % self.capacity; + if (tail == head) break; + } + + return buf[0..i]; + } }; } - -// pub fn RingBuffer(comptime T: type) type { -// return struct { -// const Self = @This(); - -// capacity: usize, -// buffer: []Slot(T), -// head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), -// tail: usize = 0, - -// pub fn init(buffer: []Slot(T)) Self { -// // var buffer: [capacity]Slot(T) = undefined; -// // { -// // @setEvalBranchQuota(capacity * 4); -// // for (0..capacity) |i| { -// // buffer[i].version = std.atomic.Value(usize).init(i); -// // } -// // } -// for (buffer, 0..) |*slot, i| { -// slot.value = std.atomic.Value(usize).init(i); -// } - -// return .{ -// .capacity = buffer.len, -// .buffer = buffer, -// }; -// } - -// pub fn enqueue(self: *Self, value: T) error{Overflow}!void { -// while (true) { -// const head = self.head.load(.acquire); -// const index = head % self.capacity; -// const slot = &self.buffer[index]; - -// const expected_version = head; -// if (slot.version.load(.acquire) != expected_version) { -// return error.Overflow; -// } - -// if (self.head.cmpxchgStrong( -// head, -// head + 1, -// .seq_cst, -// .seq_cst, -// )) |_| { -// std.atomic.spinLoopHint(); -// continue; -// } - -// slot.value = value; -// slot.version.store(expected_version + 1, .release); -// return; -// } -// } - -// pub fn dequeue(self: *Self) error{Underflow}!T { -// const tail = self.tail; -// const index = tail % self.capacity; -// const slot = &self.buffer[index]; - -// const expected_version = tail + 1; -// if (slot.version.load(.acquire) != expected_version) { -// return error.Underflow; -// } - -// const value = slot.value; -// slot.version.store(tail +% self.capacity, .release); -// self.tail +%= 1; -// return value; -// } -// }; -// } diff --git a/src/msg_queue/messages.zig b/src/msg_queue/messages.zig deleted file mode 100644 index 282f62e..0000000 --- a/src/msg_queue/messages.zig +++ /dev/null @@ -1,10 +0,0 @@ -pub const BlockUpdate = struct { - pub const material_len_max = 32; - - dimension: u8, - x: i32, - y: i32, - z: i32, - material: [material_len_max]u8 = undefined, - material_len: usize = 0, -}; diff --git a/src/msg_queue/queue.zig b/src/msg_queue/queue.zig index b56798e..6da2369 100644 --- a/src/msg_queue/queue.zig +++ b/src/msg_queue/queue.zig @@ -8,13 +8,21 @@ pub fn Queue(comptime T: type) type { ring_buffer: mpsc.RingBuffer(T), - pub fn init(buffer: []mpsc.Slot(T)) Self { + pub fn init(buffer: []mpsc.Slot(T)) !Self { return .{ - .ring_buffer = mpsc.RingBuffer(T).init(buffer), + .ring_buffer = try mpsc.RingBuffer(T).init(buffer), }; } - pub fn blockingEnqueue(self: *Self, value: T) void { + pub fn capacity(self: *Self) usize { + return self.ring_buffer.capacity; + } + + pub fn tryEnqueue(self: *Self, value: T) !void { + try self.ring_buffer.enqueue(value); + } + + pub fn enqueue(self: *Self, value: T) void { while (true) { self.ring_buffer.enqueue(value) catch |err| switch (err) { error.Overflow => { @@ -26,10 +34,18 @@ pub fn Queue(comptime T: type) type { } } + pub fn tryDequeue(self: *Self) !T { + return try self.ring_buffer.dequeue(); + } + pub fn dequeue(self: *Self) ?T { return self.ring_buffer.dequeue() catch |err| switch (err) { error.Underflow => return null, }; } + + pub fn tryDequeueAll(self: *Self, buf: []T) ![]T { + return try self.ring_buffer.dequeueAll(buf); + } }; } diff --git a/src/msg_queue/root.zig b/src/msg_queue/root.zig index 473a7a1..d55e09b 100644 --- a/src/msg_queue/root.zig +++ b/src/msg_queue/root.zig @@ -3,8 +3,6 @@ const std = @import("std"); const mpsc = @import("../mpsc.zig"); const queue = @import("queue.zig"); -pub const messages = @import("messages.zig"); - pub fn MsgQueueUnmanaged(comptime T: type) type { return struct { const Self = @This(); @@ -13,18 +11,30 @@ pub fn MsgQueueUnmanaged(comptime T: type) type { // consumer_mutex: std.Thread.Mutex = std.Thread.Mutex{}, cond: std.Thread.Condition = std.Thread.Condition{}, - pub fn init(buffer: []mpsc.Slot(T)) Self { + pub fn init(buffer: []mpsc.Slot(T)) !Self { return .{ - .queue = queue.Queue(T).init(buffer), + .queue = try queue.Queue(T).init(buffer), }; } - pub fn enqueue(self: *Self, value: T) void { - self.queue.blockingEnqueue(value); - std.log.info("enqueue: SENDING signal", .{}); + pub fn capacity(self: *Self) usize { + return self.queue.capacity(); + } + + pub fn tryEnqueue(self: *Self, value: T) !void { + try self.queue.tryEnqueue(value); self.cond.signal(); } + pub fn enqueue(self: *Self, value: T) void { + self.queue.enqueue(value); + self.cond.signal(); + } + + pub fn tryDequeue(self: *Self) !T { + return try self.queue.tryDequeue(); + } + pub fn dequeue(self: *Self) T { var m = std.Thread.Mutex{}; m.lock(); @@ -32,11 +42,13 @@ pub fn MsgQueueUnmanaged(comptime T: type) type { while (true) { if (self.queue.dequeue()) |val| return val; - std.log.info("dequeue: STARTING consumer condition wait", .{}); self.cond.wait(&m); - std.log.info("dequeue: RECEIVED signal", .{}); } } + + pub fn tryDequeueAll(self: *Self, buf: []T) ![]T { + return try self.queue.tryDequeueAll(buf); + } }; } @@ -48,14 +60,14 @@ pub fn MsgQueueManaged(comptime T: type) type { buffer: []mpsc.Slot(T), msg_queue: MsgQueueUnmanaged(T), - pub fn init(allocator: std.mem.Allocator, capacity: usize) !Self { - const buffer = try allocator.alloc(mpsc.Slot(T), capacity); + pub fn init(allocator: std.mem.Allocator, cap: usize) !Self { + const buffer = try allocator.alloc(mpsc.Slot(T), cap); errdefer allocator.free(buffer); return .{ .allocator = allocator, .buffer = buffer, - .msg_queue = MsgQueueUnmanaged(T).init(buffer), + .msg_queue = try MsgQueueUnmanaged(T).init(buffer), }; } @@ -67,12 +79,28 @@ pub fn MsgQueueManaged(comptime T: type) type { return &self.msg_queue; } + pub fn capacity(self: *Self) usize { + return self.msg_queue.capacity(); + } + + pub fn tryEnqueue(self: *Self, value: T) !void { + try self.msg_queue.tryEnqueue(value); + } + pub fn enqueue(self: *Self, value: T) void { self.msg_queue.enqueue(value); } + pub fn tryDequeue(self: *Self) !T { + return try self.msg_queue.tryDequeue(); + } + pub fn dequeue(self: *Self) T { return self.msg_queue.dequeue(); } + + pub fn tryDequeueAll(self: *Self, buf: []T) ![]T { + return try self.msg_queue.tryDequeueAll(buf); + } }; } diff --git a/src/root.zig b/src/root.zig index 583610a..c91ef84 100644 --- a/src/root.zig +++ b/src/root.zig @@ -1,28 +1,41 @@ const std = @import("std"); +const models = @import("./models/root.zig"); const msg_queue = @import("msg_queue/root.zig"); const dispatchers = @import("dispatchers/root.zig"); const web = @import("web/root.zig"); +const log = std.log.scoped(.craftflut); + pub const Config = struct { - web_port: u16, - web_threads: i16, + unix_socket_path: []const u8, + dispatcher: dispatchers.BlockUpdate.Config, + web: web.Config, }; -fn receiver(queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate)) void { - while (true) { - const packet = queue.dequeue(); - std.log.info("packet: {}", .{packet}); - } +fn receiver(dispatcher: *dispatchers.BlockUpdate) void { + dispatcher.start() catch unreachable; } pub fn start(allocator: std.mem.Allocator, config: *const Config) !void { - // var update_queue = UpdateQueue.init(); - const capacity: usize = 1024; - var queue = try msg_queue.MsgQueueManaged(msg_queue.messages.BlockUpdate).init(allocator, capacity); + log.info("Starting craftflut gateway", .{}); + + log.info("Opening Unix socket: {s}", .{config.unix_socket_path}); + var stream = try std.net.connectUnixSocket(config.unix_socket_path); + defer stream.close(); + + var queue = try msg_queue.MsgQueueManaged(models.BlockUpdateMsg) + .init(allocator, config.dispatcher.capacity); defer queue.deinit(); - _ = try std.Thread.spawn(.{}, receiver, .{queue.unmanaged()}); + var dispatcher = dispatchers.BlockUpdate.init( + allocator, + &config.dispatcher, + queue.unmanaged(), + stream, + ); + log.info("Starting dispatcher", .{}); + _ = try std.Thread.spawn(.{}, receiver, .{&dispatcher}); - try web.start(allocator, config.web_threads, config.web_port, queue.unmanaged()); + try web.start(allocator, &config.web, queue.unmanaged()); } diff --git a/src/web/endpoints/Block.zig b/src/web/endpoints/Block.zig index 7a3de08..1701bb7 100644 --- a/src/web/endpoints/Block.zig +++ b/src/web/endpoints/Block.zig @@ -1,12 +1,15 @@ const std = @import("std"); const zap = @import("zap"); -const models = @import("../models/root.zig"); +const web_models = @import("../models/root.zig"); const stores = @import("../stores/root.zig"); +const models = @import("../../models/root.zig"); const msg_queue = @import("../../msg_queue/root.zig"); const Self = @This(); +const log = std.log.scoped(.block_endpoint); + pub const default_path = "block"; allocator: std.mem.Allocator, @@ -36,16 +39,15 @@ pub fn post(_: *Self, r: zap.Request) !void { pub fn put(self: *Self, r: zap.Request) !void { blk: { if (r.body) |body| { - const maybe_block: ?std.json.Parsed(models.Block) = std.json.parseFromSlice(models.Block, self.allocator, body, .{}) catch null; + const maybe_block: ?std.json.Parsed(web_models.Block) = std.json.parseFromSlice(web_models.Block, self.allocator, body, .{}) catch null; if (maybe_block) |parsed| { defer parsed.deinit(); const block = parsed.value; - if (block.material.len > msg_queue.messages.BlockUpdate.material_len_max or !models.Block.materialIsValid(block.material)) { + if (block.material.len > models.BlockUpdateMsg.material_max_len or !web_models.Block.materialIsValid(block.material)) { break :blk; } - std.log.info("block: {}", .{block}); - var msg = msg_queue.messages.BlockUpdate{ + var msg = models.BlockUpdateMsg{ .dimension = block.dimension, .x = block.x, .y = block.y, diff --git a/src/web/models/Block.zig b/src/web/models/Block.zig index db1f6e2..2df4710 100644 --- a/src/web/models/Block.zig +++ b/src/web/models/Block.zig @@ -1,9 +1,11 @@ const std = @import("std"); -dimension: u8, -x: i32, -y: i32, -z: i32, +const models = @import("../../models/root.zig"); + +dimension: models.BlockUpdateMsg.Dimension, +x: models.BlockUpdateMsg.Coordinate, +y: models.BlockUpdateMsg.Coordinate, +z: models.BlockUpdateMsg.Coordinate, material: []const u8, pub fn materialIsValid(material: []const u8) bool { diff --git a/src/web/root.zig b/src/web/root.zig index df2cb4d..f6cb7f6 100644 --- a/src/web/root.zig +++ b/src/web/root.zig @@ -2,18 +2,29 @@ const std = @import("std"); const zap = @import("zap"); const msg_queue = @import("../msg_queue/root.zig"); +const models = @import("../models/root.zig"); -const models = @import("models/root.zig"); const endpoints = @import("endpoints/root.zig"); const stores = @import("stores/root.zig"); +const log = std.log.scoped(.web); + +pub const Config = struct { + port: u16, + threads: u8, +}; + fn onRequest(r: zap.Request) !void { _ = r; } -pub fn start(allocator: std.mem.Allocator, threads: i16, port: u16, queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate)) !void { +pub fn start( + allocator: std.mem.Allocator, + config: *const Config, + queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg), +) !void { var listener = zap.Endpoint.Listener.init(allocator, .{ - .port = port, + .port = config.port, .on_request = onRequest, .log = true, }); @@ -32,10 +43,10 @@ pub fn start(allocator: std.mem.Allocator, threads: i16, port: u16, queue: *msg_ try listener.register(&block_endpoint); try listener.listen(); - std.log.info("Listening on 0.0.0.0:{d}", .{port}); + log.info("Listening on 0.0.0.0:{d}", .{config.port}); zap.start(.{ - .threads = threads, + .threads = @intCast(config.threads), .workers = 1, }); } diff --git a/src/web/stores/BlockUpdateQueue.zig b/src/web/stores/BlockUpdateQueue.zig index 8c287be..390ad35 100644 --- a/src/web/stores/BlockUpdateQueue.zig +++ b/src/web/stores/BlockUpdateQueue.zig @@ -1,3 +1,4 @@ const msg_queue = @import("../../msg_queue/root.zig"); +const models = @import("../../models/root.zig"); -queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate), +queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),