Fix ring buffer
This commit is contained in:
parent
e70a04ac16
commit
c9c2b45b2a
17 changed files with 330 additions and 135 deletions
45
README.md
Normal file
45
README.md
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
# craftflut
|
||||||
|
|
||||||
|
Alternative gateway implementation for https://codeberg.org/MoKe12g/Pixelflut3D
|
||||||
|
|
||||||
|
## Building
|
||||||
|
|
||||||
|
A minimum Zig version of 0.14.0 is required.
|
||||||
|
|
||||||
|
```
|
||||||
|
zig build -Doptimize=(ReleaseSafe|ReleaseFast) -Dmaterial_max_len=(<int>, default: 40)
|
||||||
|
```
|
||||||
|
|
||||||
|
## CLI options
|
||||||
|
|
||||||
|
- `--unix-socket`: Path to Unix socket to send block updates to
|
||||||
|
- `--queue`: Block update queue capacity (default: `1024`)
|
||||||
|
- `--web-port`: Web server port
|
||||||
|
- `--web-threads`: Web server thread count (default: `1`)
|
||||||
|
|
||||||
|
## API
|
||||||
|
|
||||||
|
TODO: Classic Pixelflut TCP
|
||||||
|
|
||||||
|
### HTTP
|
||||||
|
|
||||||
|
| Path | Method | Headers | Body | Response | Description |
|
||||||
|
| ------------ | ------ | ------- | -------------------------------------------------------------------- | ------------- | --------------------------- |
|
||||||
|
| `/api/block` | `PUT` | | `{"dimension":<i32>,"x":<i32>,"y":<i32>,"z":<i32>,"material":<str>}` | `201 Created` | Enqueues given block update |
|
||||||
|
|
||||||
|
## Gateway
|
||||||
|
|
||||||
|
This application relays and batches the received block updates and sends them as CSV data to an existing Unix socket.
|
||||||
|
|
||||||
|
Create a test socket with: `socat -u UNIX-LISTEN:<path>,reuseaddr,fork -`
|
||||||
|
|
||||||
|
### Payload
|
||||||
|
|
||||||
|
Each packet is seperated by a newline (`\n`).
|
||||||
|
|
||||||
|
```
|
||||||
|
dimension,x,y,z,material
|
||||||
|
<int32>,<int32>,<int32>,<int32>,<str>
|
||||||
|
```
|
||||||
|
|
||||||
|
E.g.: `0,42,42,42,dirt` puts a dirt block in dimension `0` (Overworld) at coordinates `42,42,42`.
|
15
build.zig
15
build.zig
|
@ -1,14 +1,26 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
|
const material_max_len_default: usize = 40;
|
||||||
|
|
||||||
pub fn build(b: *std.Build) void {
|
pub fn build(b: *std.Build) void {
|
||||||
const target = b.standardTargetOptions(.{});
|
const target = b.standardTargetOptions(.{});
|
||||||
const optimize = b.standardOptimizeOption(.{});
|
const optimize = b.standardOptimizeOption(.{});
|
||||||
|
|
||||||
const zap_dep = b.dependency("zap", .{});
|
const material_max_len = b.option(
|
||||||
|
usize,
|
||||||
|
"material-max-len",
|
||||||
|
"Maximum length a material name can have",
|
||||||
|
) orelse material_max_len_default;
|
||||||
|
const lib_options = b.addOptions();
|
||||||
|
lib_options.addOption(usize, "material_max_len", material_max_len);
|
||||||
|
|
||||||
|
const zap_dep = b.dependency("zap", .{ .openssl = false });
|
||||||
|
const clap_dep = b.dependency("clap", .{});
|
||||||
|
|
||||||
const lib_mod = b.addModule("craftflut", .{
|
const lib_mod = b.addModule("craftflut", .{
|
||||||
.root_source_file = b.path("src/root.zig"),
|
.root_source_file = b.path("src/root.zig"),
|
||||||
});
|
});
|
||||||
|
lib_mod.addOptions("build_options", lib_options);
|
||||||
lib_mod.addImport("zap", zap_dep.module("zap"));
|
lib_mod.addImport("zap", zap_dep.module("zap"));
|
||||||
|
|
||||||
const exe_mod = b.createModule(.{
|
const exe_mod = b.createModule(.{
|
||||||
|
@ -17,6 +29,7 @@ pub fn build(b: *std.Build) void {
|
||||||
.optimize = optimize,
|
.optimize = optimize,
|
||||||
});
|
});
|
||||||
exe_mod.addImport("craftflut", lib_mod);
|
exe_mod.addImport("craftflut", lib_mod);
|
||||||
|
exe_mod.addImport("clap", clap_dep.module("clap"));
|
||||||
|
|
||||||
const exe = b.addExecutable(.{
|
const exe = b.addExecutable(.{
|
||||||
.name = "craftflut",
|
.name = "craftflut",
|
||||||
|
|
|
@ -9,6 +9,10 @@
|
||||||
.url = "git+https://github.com/zigzap/zap.git#ec7cac6f6ab8e1892fe6fc499fd37cd93f7b2256",
|
.url = "git+https://github.com/zigzap/zap.git#ec7cac6f6ab8e1892fe6fc499fd37cd93f7b2256",
|
||||||
.hash = "zap-0.9.1-GoeB85JTJAADY1vAnA4lTuU66t6JJiuhGos5ex6CpifA",
|
.hash = "zap-0.9.1-GoeB85JTJAADY1vAnA4lTuU66t6JJiuhGos5ex6CpifA",
|
||||||
},
|
},
|
||||||
|
.clap = .{
|
||||||
|
.url = "git+https://github.com/Hejsil/zig-clap?ref=0.10.0#e47028deaefc2fb396d3d9e9f7bd776ae0b2a43a",
|
||||||
|
.hash = "clap-0.10.0-oBajB434AQBDh-Ei3YtoKIRxZacVPF1iSwp3IX_ZB8f0",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
.paths = .{
|
.paths = .{
|
||||||
"build.zig",
|
"build.zig",
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
const std = @import("std");
|
||||||
|
|
||||||
|
const msg_queue = @import("../msg_queue/root.zig");
|
||||||
|
const models = @import("../models/root.zig");
|
||||||
|
|
||||||
|
const Self = @This();
|
||||||
|
|
||||||
|
const log = std.log.scoped(.block_update_dispatcher);
|
||||||
|
|
||||||
|
pub const Config = struct {
|
||||||
|
capacity: usize,
|
||||||
|
};
|
||||||
|
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
|
||||||
|
config: *const Config,
|
||||||
|
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
||||||
|
stream: ?std.net.Stream = null,
|
||||||
|
|
||||||
|
pub fn init(
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
config: *const Config,
|
||||||
|
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
||||||
|
stream: std.net.Stream,
|
||||||
|
) Self {
|
||||||
|
return .{
|
||||||
|
.allocator = allocator,
|
||||||
|
.config = config,
|
||||||
|
.queue = queue,
|
||||||
|
.stream = stream,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send(self: *Self, update: *const models.BlockUpdateMsg) !void {
|
||||||
|
var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined;
|
||||||
|
const stream = self.stream.?;
|
||||||
|
const csv = try update.toCsv(&buf);
|
||||||
|
try stream.writeAll(csv);
|
||||||
|
try stream.writeAll(&.{'\n'});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(self: *Self) !void {
|
||||||
|
const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity());
|
||||||
|
defer self.allocator.free(buf);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const updates = try self.queue.tryDequeueAll(buf);
|
||||||
|
for (updates) |*update| try self.send(update);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
pub const BlockUpdate = @import("BlockUpdate.zig");
|
38
src/main.zig
38
src/main.zig
|
@ -1,5 +1,6 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const builtin = @import("builtin");
|
const builtin = @import("builtin");
|
||||||
|
const clap = @import("clap");
|
||||||
|
|
||||||
const craftflut = @import("craftflut");
|
const craftflut = @import("craftflut");
|
||||||
|
|
||||||
|
@ -15,9 +16,42 @@ pub fn main() !void {
|
||||||
const allocator = gpa.allocator();
|
const allocator = gpa.allocator();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
const params = comptime clap.parseParamsComptime(
|
||||||
|
\\-h, --help Display this help and exit.
|
||||||
|
\\--unix-socket <str> Path to Unix socket to send block updates to.
|
||||||
|
\\--queue <usize> Block update queue capacity.
|
||||||
|
\\--web-port <u16> Web server port.
|
||||||
|
\\--web-threads <u8> Web server thread count.
|
||||||
|
);
|
||||||
|
|
||||||
|
var diag = clap.Diagnostic{};
|
||||||
|
var res = clap.parse(clap.Help, ¶ms, clap.parsers.default, .{
|
||||||
|
.diagnostic = &diag,
|
||||||
|
.allocator = allocator,
|
||||||
|
}) catch |err| {
|
||||||
|
diag.report(std.io.getStdErr().writer(), err) catch {};
|
||||||
|
return err;
|
||||||
|
};
|
||||||
|
defer res.deinit();
|
||||||
|
|
||||||
|
if (res.args.help != 0 or
|
||||||
|
res.args.@"unix-socket" == null or
|
||||||
|
res.args.@"web-port" == null)
|
||||||
|
{
|
||||||
|
const writer = std.io.getStdErr().writer();
|
||||||
|
try writer.writeAll("craftflut\n");
|
||||||
|
return clap.help(writer, clap.Help, ¶ms, .{});
|
||||||
|
}
|
||||||
|
|
||||||
const config = craftflut.Config{
|
const config = craftflut.Config{
|
||||||
.web_port = 3000,
|
.unix_socket_path = res.args.@"unix-socket".?,
|
||||||
.web_threads = 16,
|
.dispatcher = .{
|
||||||
|
.capacity = res.args.queue orelse 1024,
|
||||||
|
},
|
||||||
|
.web = .{
|
||||||
|
.port = res.args.@"web-port".?,
|
||||||
|
.threads = res.args.@"web-threads" orelse 1,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
try craftflut.start(allocator, &config);
|
try craftflut.start(allocator, &config);
|
||||||
}
|
}
|
||||||
|
|
39
src/models/BlockUpdateMsg.zig
Normal file
39
src/models/BlockUpdateMsg.zig
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
const std = @import("std");
|
||||||
|
|
||||||
|
const build_options = @import("build_options");
|
||||||
|
|
||||||
|
const Self = @This();
|
||||||
|
|
||||||
|
pub const Dimension = i32;
|
||||||
|
pub const Coordinate = i32;
|
||||||
|
|
||||||
|
pub const material_max_len = build_options.material_max_len;
|
||||||
|
pub const csv_max_len: usize = blk: {
|
||||||
|
const str = std.fmt.comptimePrint(
|
||||||
|
"-{d},-{d},-{d},-{d},",
|
||||||
|
.{
|
||||||
|
std.math.maxInt(Dimension),
|
||||||
|
std.math.maxInt(Coordinate),
|
||||||
|
std.math.maxInt(Coordinate),
|
||||||
|
std.math.maxInt(Coordinate),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
break :blk str.len + material_max_len;
|
||||||
|
};
|
||||||
|
|
||||||
|
dimension: Dimension,
|
||||||
|
x: Coordinate,
|
||||||
|
y: Coordinate,
|
||||||
|
z: Coordinate,
|
||||||
|
material: [material_max_len]u8 = undefined,
|
||||||
|
material_len: usize = 0,
|
||||||
|
|
||||||
|
pub fn toCsv(self: *const Self, buf: *[csv_max_len]u8) ![]u8 {
|
||||||
|
return try std.fmt.bufPrint(buf, "{d},{d},{d},{d},{s}", .{
|
||||||
|
self.dimension,
|
||||||
|
self.x,
|
||||||
|
self.y,
|
||||||
|
self.z,
|
||||||
|
self.material[0..self.material_len],
|
||||||
|
});
|
||||||
|
}
|
1
src/models/root.zig
Normal file
1
src/models/root.zig
Normal file
|
@ -0,0 +1 @@
|
||||||
|
pub const BlockUpdateMsg = @import("BlockUpdateMsg.zig");
|
101
src/mpsc.zig
101
src/mpsc.zig
|
@ -16,14 +16,11 @@ pub fn RingBuffer(comptime T: type) type {
|
||||||
head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
|
head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
|
||||||
tail: usize = 0,
|
tail: usize = 0,
|
||||||
|
|
||||||
pub fn init(buffer: []Slot(T)) Self {
|
pub fn init(buffer: []Slot(T)) error{CapacityTooSmall}!Self {
|
||||||
// var buffer: [capacity]Slot(T) = undefined;
|
if (buffer.len <= 1) {
|
||||||
// {
|
return error.CapacityTooSmall;
|
||||||
// @setEvalBranchQuota(capacity * 4);
|
}
|
||||||
// for (0..capacity) |i| {
|
|
||||||
// buffer[i].version = std.atomic.Value(usize).init(i);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
for (buffer, 0..) |*slot, i| {
|
for (buffer, 0..) |*slot, i| {
|
||||||
slot.version = std.atomic.Value(usize).init(i);
|
slot.version = std.atomic.Value(usize).init(i);
|
||||||
}
|
}
|
||||||
|
@ -76,77 +73,25 @@ pub fn RingBuffer(comptime T: type) type {
|
||||||
self.tail +%= 1;
|
self.tail +%= 1;
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
pub fn dequeueAll(self: *Self, buf: []T) ![]T {
|
||||||
|
if (buf.len < self.capacity) {
|
||||||
|
return error.BufferTooSmall;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn RingBuffer(comptime T: type) type {
|
const head = self.head.load(.acquire) % self.capacity;
|
||||||
// return struct {
|
var tail = self.tail % self.capacity;
|
||||||
// const Self = @This();
|
var i: usize = 0;
|
||||||
|
|
||||||
// capacity: usize,
|
while (true) {
|
||||||
// buffer: []Slot(T),
|
const val = self.dequeue() catch break;
|
||||||
// head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
|
buf[i] = val;
|
||||||
// tail: usize = 0,
|
i += 1;
|
||||||
|
tail = (tail + 1) % self.capacity;
|
||||||
|
if (tail == head) break;
|
||||||
|
}
|
||||||
|
|
||||||
// pub fn init(buffer: []Slot(T)) Self {
|
return buf[0..i];
|
||||||
// // var buffer: [capacity]Slot(T) = undefined;
|
}
|
||||||
// // {
|
};
|
||||||
// // @setEvalBranchQuota(capacity * 4);
|
}
|
||||||
// // for (0..capacity) |i| {
|
|
||||||
// // buffer[i].version = std.atomic.Value(usize).init(i);
|
|
||||||
// // }
|
|
||||||
// // }
|
|
||||||
// for (buffer, 0..) |*slot, i| {
|
|
||||||
// slot.value = std.atomic.Value(usize).init(i);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return .{
|
|
||||||
// .capacity = buffer.len,
|
|
||||||
// .buffer = buffer,
|
|
||||||
// };
|
|
||||||
// }
|
|
||||||
|
|
||||||
// pub fn enqueue(self: *Self, value: T) error{Overflow}!void {
|
|
||||||
// while (true) {
|
|
||||||
// const head = self.head.load(.acquire);
|
|
||||||
// const index = head % self.capacity;
|
|
||||||
// const slot = &self.buffer[index];
|
|
||||||
|
|
||||||
// const expected_version = head;
|
|
||||||
// if (slot.version.load(.acquire) != expected_version) {
|
|
||||||
// return error.Overflow;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (self.head.cmpxchgStrong(
|
|
||||||
// head,
|
|
||||||
// head + 1,
|
|
||||||
// .seq_cst,
|
|
||||||
// .seq_cst,
|
|
||||||
// )) |_| {
|
|
||||||
// std.atomic.spinLoopHint();
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// slot.value = value;
|
|
||||||
// slot.version.store(expected_version + 1, .release);
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// pub fn dequeue(self: *Self) error{Underflow}!T {
|
|
||||||
// const tail = self.tail;
|
|
||||||
// const index = tail % self.capacity;
|
|
||||||
// const slot = &self.buffer[index];
|
|
||||||
|
|
||||||
// const expected_version = tail + 1;
|
|
||||||
// if (slot.version.load(.acquire) != expected_version) {
|
|
||||||
// return error.Underflow;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// const value = slot.value;
|
|
||||||
// slot.version.store(tail +% self.capacity, .release);
|
|
||||||
// self.tail +%= 1;
|
|
||||||
// return value;
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
// }
|
|
||||||
|
|
|
@ -1,10 +0,0 @@
|
||||||
pub const BlockUpdate = struct {
|
|
||||||
pub const material_len_max = 32;
|
|
||||||
|
|
||||||
dimension: u8,
|
|
||||||
x: i32,
|
|
||||||
y: i32,
|
|
||||||
z: i32,
|
|
||||||
material: [material_len_max]u8 = undefined,
|
|
||||||
material_len: usize = 0,
|
|
||||||
};
|
|
|
@ -8,13 +8,21 @@ pub fn Queue(comptime T: type) type {
|
||||||
|
|
||||||
ring_buffer: mpsc.RingBuffer(T),
|
ring_buffer: mpsc.RingBuffer(T),
|
||||||
|
|
||||||
pub fn init(buffer: []mpsc.Slot(T)) Self {
|
pub fn init(buffer: []mpsc.Slot(T)) !Self {
|
||||||
return .{
|
return .{
|
||||||
.ring_buffer = mpsc.RingBuffer(T).init(buffer),
|
.ring_buffer = try mpsc.RingBuffer(T).init(buffer),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn blockingEnqueue(self: *Self, value: T) void {
|
pub fn capacity(self: *Self) usize {
|
||||||
|
return self.ring_buffer.capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tryEnqueue(self: *Self, value: T) !void {
|
||||||
|
try self.ring_buffer.enqueue(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn enqueue(self: *Self, value: T) void {
|
||||||
while (true) {
|
while (true) {
|
||||||
self.ring_buffer.enqueue(value) catch |err| switch (err) {
|
self.ring_buffer.enqueue(value) catch |err| switch (err) {
|
||||||
error.Overflow => {
|
error.Overflow => {
|
||||||
|
@ -26,10 +34,18 @@ pub fn Queue(comptime T: type) type {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tryDequeue(self: *Self) !T {
|
||||||
|
return try self.ring_buffer.dequeue();
|
||||||
|
}
|
||||||
|
|
||||||
pub fn dequeue(self: *Self) ?T {
|
pub fn dequeue(self: *Self) ?T {
|
||||||
return self.ring_buffer.dequeue() catch |err| switch (err) {
|
return self.ring_buffer.dequeue() catch |err| switch (err) {
|
||||||
error.Underflow => return null,
|
error.Underflow => return null,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tryDequeueAll(self: *Self, buf: []T) ![]T {
|
||||||
|
return try self.ring_buffer.dequeueAll(buf);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,6 @@ const std = @import("std");
|
||||||
const mpsc = @import("../mpsc.zig");
|
const mpsc = @import("../mpsc.zig");
|
||||||
const queue = @import("queue.zig");
|
const queue = @import("queue.zig");
|
||||||
|
|
||||||
pub const messages = @import("messages.zig");
|
|
||||||
|
|
||||||
pub fn MsgQueueUnmanaged(comptime T: type) type {
|
pub fn MsgQueueUnmanaged(comptime T: type) type {
|
||||||
return struct {
|
return struct {
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
@ -13,18 +11,30 @@ pub fn MsgQueueUnmanaged(comptime T: type) type {
|
||||||
// consumer_mutex: std.Thread.Mutex = std.Thread.Mutex{},
|
// consumer_mutex: std.Thread.Mutex = std.Thread.Mutex{},
|
||||||
cond: std.Thread.Condition = std.Thread.Condition{},
|
cond: std.Thread.Condition = std.Thread.Condition{},
|
||||||
|
|
||||||
pub fn init(buffer: []mpsc.Slot(T)) Self {
|
pub fn init(buffer: []mpsc.Slot(T)) !Self {
|
||||||
return .{
|
return .{
|
||||||
.queue = queue.Queue(T).init(buffer),
|
.queue = try queue.Queue(T).init(buffer),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enqueue(self: *Self, value: T) void {
|
pub fn capacity(self: *Self) usize {
|
||||||
self.queue.blockingEnqueue(value);
|
return self.queue.capacity();
|
||||||
std.log.info("enqueue: SENDING signal", .{});
|
}
|
||||||
|
|
||||||
|
pub fn tryEnqueue(self: *Self, value: T) !void {
|
||||||
|
try self.queue.tryEnqueue(value);
|
||||||
self.cond.signal();
|
self.cond.signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn enqueue(self: *Self, value: T) void {
|
||||||
|
self.queue.enqueue(value);
|
||||||
|
self.cond.signal();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tryDequeue(self: *Self) !T {
|
||||||
|
return try self.queue.tryDequeue();
|
||||||
|
}
|
||||||
|
|
||||||
pub fn dequeue(self: *Self) T {
|
pub fn dequeue(self: *Self) T {
|
||||||
var m = std.Thread.Mutex{};
|
var m = std.Thread.Mutex{};
|
||||||
m.lock();
|
m.lock();
|
||||||
|
@ -32,11 +42,13 @@ pub fn MsgQueueUnmanaged(comptime T: type) type {
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (self.queue.dequeue()) |val| return val;
|
if (self.queue.dequeue()) |val| return val;
|
||||||
std.log.info("dequeue: STARTING consumer condition wait", .{});
|
|
||||||
self.cond.wait(&m);
|
self.cond.wait(&m);
|
||||||
std.log.info("dequeue: RECEIVED signal", .{});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tryDequeueAll(self: *Self, buf: []T) ![]T {
|
||||||
|
return try self.queue.tryDequeueAll(buf);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,14 +60,14 @@ pub fn MsgQueueManaged(comptime T: type) type {
|
||||||
buffer: []mpsc.Slot(T),
|
buffer: []mpsc.Slot(T),
|
||||||
msg_queue: MsgQueueUnmanaged(T),
|
msg_queue: MsgQueueUnmanaged(T),
|
||||||
|
|
||||||
pub fn init(allocator: std.mem.Allocator, capacity: usize) !Self {
|
pub fn init(allocator: std.mem.Allocator, cap: usize) !Self {
|
||||||
const buffer = try allocator.alloc(mpsc.Slot(T), capacity);
|
const buffer = try allocator.alloc(mpsc.Slot(T), cap);
|
||||||
errdefer allocator.free(buffer);
|
errdefer allocator.free(buffer);
|
||||||
|
|
||||||
return .{
|
return .{
|
||||||
.allocator = allocator,
|
.allocator = allocator,
|
||||||
.buffer = buffer,
|
.buffer = buffer,
|
||||||
.msg_queue = MsgQueueUnmanaged(T).init(buffer),
|
.msg_queue = try MsgQueueUnmanaged(T).init(buffer),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,12 +79,28 @@ pub fn MsgQueueManaged(comptime T: type) type {
|
||||||
return &self.msg_queue;
|
return &self.msg_queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn capacity(self: *Self) usize {
|
||||||
|
return self.msg_queue.capacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tryEnqueue(self: *Self, value: T) !void {
|
||||||
|
try self.msg_queue.tryEnqueue(value);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn enqueue(self: *Self, value: T) void {
|
pub fn enqueue(self: *Self, value: T) void {
|
||||||
self.msg_queue.enqueue(value);
|
self.msg_queue.enqueue(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tryDequeue(self: *Self) !T {
|
||||||
|
return try self.msg_queue.tryDequeue();
|
||||||
|
}
|
||||||
|
|
||||||
pub fn dequeue(self: *Self) T {
|
pub fn dequeue(self: *Self) T {
|
||||||
return self.msg_queue.dequeue();
|
return self.msg_queue.dequeue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tryDequeueAll(self: *Self, buf: []T) ![]T {
|
||||||
|
return try self.msg_queue.tryDequeueAll(buf);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
37
src/root.zig
37
src/root.zig
|
@ -1,28 +1,41 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
|
const models = @import("./models/root.zig");
|
||||||
const msg_queue = @import("msg_queue/root.zig");
|
const msg_queue = @import("msg_queue/root.zig");
|
||||||
const dispatchers = @import("dispatchers/root.zig");
|
const dispatchers = @import("dispatchers/root.zig");
|
||||||
const web = @import("web/root.zig");
|
const web = @import("web/root.zig");
|
||||||
|
|
||||||
|
const log = std.log.scoped(.craftflut);
|
||||||
|
|
||||||
pub const Config = struct {
|
pub const Config = struct {
|
||||||
web_port: u16,
|
unix_socket_path: []const u8,
|
||||||
web_threads: i16,
|
dispatcher: dispatchers.BlockUpdate.Config,
|
||||||
|
web: web.Config,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn receiver(queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate)) void {
|
fn receiver(dispatcher: *dispatchers.BlockUpdate) void {
|
||||||
while (true) {
|
dispatcher.start() catch unreachable;
|
||||||
const packet = queue.dequeue();
|
|
||||||
std.log.info("packet: {}", .{packet});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(allocator: std.mem.Allocator, config: *const Config) !void {
|
pub fn start(allocator: std.mem.Allocator, config: *const Config) !void {
|
||||||
// var update_queue = UpdateQueue.init();
|
log.info("Starting craftflut gateway", .{});
|
||||||
const capacity: usize = 1024;
|
|
||||||
var queue = try msg_queue.MsgQueueManaged(msg_queue.messages.BlockUpdate).init(allocator, capacity);
|
log.info("Opening Unix socket: {s}", .{config.unix_socket_path});
|
||||||
|
var stream = try std.net.connectUnixSocket(config.unix_socket_path);
|
||||||
|
defer stream.close();
|
||||||
|
|
||||||
|
var queue = try msg_queue.MsgQueueManaged(models.BlockUpdateMsg)
|
||||||
|
.init(allocator, config.dispatcher.capacity);
|
||||||
defer queue.deinit();
|
defer queue.deinit();
|
||||||
|
|
||||||
_ = try std.Thread.spawn(.{}, receiver, .{queue.unmanaged()});
|
var dispatcher = dispatchers.BlockUpdate.init(
|
||||||
|
allocator,
|
||||||
|
&config.dispatcher,
|
||||||
|
queue.unmanaged(),
|
||||||
|
stream,
|
||||||
|
);
|
||||||
|
log.info("Starting dispatcher", .{});
|
||||||
|
_ = try std.Thread.spawn(.{}, receiver, .{&dispatcher});
|
||||||
|
|
||||||
try web.start(allocator, config.web_threads, config.web_port, queue.unmanaged());
|
try web.start(allocator, &config.web, queue.unmanaged());
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,15 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const zap = @import("zap");
|
const zap = @import("zap");
|
||||||
|
|
||||||
const models = @import("../models/root.zig");
|
const web_models = @import("../models/root.zig");
|
||||||
const stores = @import("../stores/root.zig");
|
const stores = @import("../stores/root.zig");
|
||||||
|
const models = @import("../../models/root.zig");
|
||||||
const msg_queue = @import("../../msg_queue/root.zig");
|
const msg_queue = @import("../../msg_queue/root.zig");
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
|
const log = std.log.scoped(.block_endpoint);
|
||||||
|
|
||||||
pub const default_path = "block";
|
pub const default_path = "block";
|
||||||
|
|
||||||
allocator: std.mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
|
@ -36,16 +39,15 @@ pub fn post(_: *Self, r: zap.Request) !void {
|
||||||
pub fn put(self: *Self, r: zap.Request) !void {
|
pub fn put(self: *Self, r: zap.Request) !void {
|
||||||
blk: {
|
blk: {
|
||||||
if (r.body) |body| {
|
if (r.body) |body| {
|
||||||
const maybe_block: ?std.json.Parsed(models.Block) = std.json.parseFromSlice(models.Block, self.allocator, body, .{}) catch null;
|
const maybe_block: ?std.json.Parsed(web_models.Block) = std.json.parseFromSlice(web_models.Block, self.allocator, body, .{}) catch null;
|
||||||
if (maybe_block) |parsed| {
|
if (maybe_block) |parsed| {
|
||||||
defer parsed.deinit();
|
defer parsed.deinit();
|
||||||
const block = parsed.value;
|
const block = parsed.value;
|
||||||
if (block.material.len > msg_queue.messages.BlockUpdate.material_len_max or !models.Block.materialIsValid(block.material)) {
|
if (block.material.len > models.BlockUpdateMsg.material_max_len or !web_models.Block.materialIsValid(block.material)) {
|
||||||
break :blk;
|
break :blk;
|
||||||
}
|
}
|
||||||
std.log.info("block: {}", .{block});
|
|
||||||
|
|
||||||
var msg = msg_queue.messages.BlockUpdate{
|
var msg = models.BlockUpdateMsg{
|
||||||
.dimension = block.dimension,
|
.dimension = block.dimension,
|
||||||
.x = block.x,
|
.x = block.x,
|
||||||
.y = block.y,
|
.y = block.y,
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
dimension: u8,
|
const models = @import("../../models/root.zig");
|
||||||
x: i32,
|
|
||||||
y: i32,
|
dimension: models.BlockUpdateMsg.Dimension,
|
||||||
z: i32,
|
x: models.BlockUpdateMsg.Coordinate,
|
||||||
|
y: models.BlockUpdateMsg.Coordinate,
|
||||||
|
z: models.BlockUpdateMsg.Coordinate,
|
||||||
material: []const u8,
|
material: []const u8,
|
||||||
|
|
||||||
pub fn materialIsValid(material: []const u8) bool {
|
pub fn materialIsValid(material: []const u8) bool {
|
||||||
|
|
|
@ -2,18 +2,29 @@ const std = @import("std");
|
||||||
const zap = @import("zap");
|
const zap = @import("zap");
|
||||||
|
|
||||||
const msg_queue = @import("../msg_queue/root.zig");
|
const msg_queue = @import("../msg_queue/root.zig");
|
||||||
|
const models = @import("../models/root.zig");
|
||||||
|
|
||||||
const models = @import("models/root.zig");
|
|
||||||
const endpoints = @import("endpoints/root.zig");
|
const endpoints = @import("endpoints/root.zig");
|
||||||
const stores = @import("stores/root.zig");
|
const stores = @import("stores/root.zig");
|
||||||
|
|
||||||
|
const log = std.log.scoped(.web);
|
||||||
|
|
||||||
|
pub const Config = struct {
|
||||||
|
port: u16,
|
||||||
|
threads: u8,
|
||||||
|
};
|
||||||
|
|
||||||
fn onRequest(r: zap.Request) !void {
|
fn onRequest(r: zap.Request) !void {
|
||||||
_ = r;
|
_ = r;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(allocator: std.mem.Allocator, threads: i16, port: u16, queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate)) !void {
|
pub fn start(
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
config: *const Config,
|
||||||
|
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
||||||
|
) !void {
|
||||||
var listener = zap.Endpoint.Listener.init(allocator, .{
|
var listener = zap.Endpoint.Listener.init(allocator, .{
|
||||||
.port = port,
|
.port = config.port,
|
||||||
.on_request = onRequest,
|
.on_request = onRequest,
|
||||||
.log = true,
|
.log = true,
|
||||||
});
|
});
|
||||||
|
@ -32,10 +43,10 @@ pub fn start(allocator: std.mem.Allocator, threads: i16, port: u16, queue: *msg_
|
||||||
try listener.register(&block_endpoint);
|
try listener.register(&block_endpoint);
|
||||||
|
|
||||||
try listener.listen();
|
try listener.listen();
|
||||||
std.log.info("Listening on 0.0.0.0:{d}", .{port});
|
log.info("Listening on 0.0.0.0:{d}", .{config.port});
|
||||||
|
|
||||||
zap.start(.{
|
zap.start(.{
|
||||||
.threads = threads,
|
.threads = @intCast(config.threads),
|
||||||
.workers = 1,
|
.workers = 1,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
const msg_queue = @import("../../msg_queue/root.zig");
|
const msg_queue = @import("../../msg_queue/root.zig");
|
||||||
|
const models = @import("../../models/root.zig");
|
||||||
|
|
||||||
queue: *msg_queue.MsgQueueUnmanaged(msg_queue.messages.BlockUpdate),
|
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue