Fix material naming
This commit is contained in:
parent
8967477f39
commit
dd0435d563
7 changed files with 126 additions and 28 deletions
|
@ -12,7 +12,7 @@ zig build -Doptimize=(ReleaseSafe|ReleaseFast) -Dmaterial_max_len=(<int>, defaul
|
||||||
|
|
||||||
## CLI options
|
## CLI options
|
||||||
|
|
||||||
- `--unix-socket`: Path to Unix socket to send block updates to
|
- `--unix-socket`: Path to Unix socket to bind to and to send block updates to
|
||||||
- `--queue`: Block update queue capacity (default: `1024`)
|
- `--queue`: Block update queue capacity (default: `1024`)
|
||||||
- `--web-port`: Web server port
|
- `--web-port`: Web server port
|
||||||
- `--web-threads`: Web server thread count (default: `1`)
|
- `--web-threads`: Web server thread count (default: `1`)
|
||||||
|
@ -31,7 +31,9 @@ TODO: Classic Pixelflut TCP
|
||||||
|
|
||||||
This application relays and batches the received block updates and sends them as CSV data to an existing Unix socket.
|
This application relays and batches the received block updates and sends them as CSV data to an existing Unix socket.
|
||||||
|
|
||||||
Create a test socket with: `socat -u UNIX-LISTEN:<path>,reuseaddr,fork -`
|
Connect to the socket with: `socat - UNIX:<path>`
|
||||||
|
|
||||||
|
TODO: Accept multiple clients
|
||||||
|
|
||||||
### Payload
|
### Payload
|
||||||
|
|
||||||
|
@ -42,4 +44,4 @@ dimension,x,y,z,material
|
||||||
<int32>,<int32>,<int32>,<int32>,<str>
|
<int32>,<int32>,<int32>,<int32>,<str>
|
||||||
```
|
```
|
||||||
|
|
||||||
E.g.: `0,42,42,42,dirt` puts a dirt block in dimension `0` (Overworld) at coordinates `42,42,42`.
|
E.g.: `0,42,42,42,DIRT` puts a dirt block in dimension `0` (Overworld) at coordinates `42,42,42`.
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
const material_max_len_default: usize = 40;
|
const material_max_len_default: usize = 40;
|
||||||
|
const dimension_max_len_default: usize = 16;
|
||||||
|
|
||||||
pub fn build(b: *std.Build) void {
|
pub fn build(b: *std.Build) void {
|
||||||
const target = b.standardTargetOptions(.{});
|
const target = b.standardTargetOptions(.{});
|
||||||
|
|
|
@ -21,40 +21,92 @@ config: *const Config,
|
||||||
/// Block update queue
|
/// Block update queue
|
||||||
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
||||||
|
|
||||||
/// Payload stream resource
|
/// Data server
|
||||||
stream: ?std.net.Stream = null,
|
server: *std.net.Server,
|
||||||
|
|
||||||
pub fn init(
|
pub fn init(
|
||||||
allocator: std.mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
config: *const Config,
|
config: *const Config,
|
||||||
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
queue: *msg_queue.MsgQueueUnmanaged(models.BlockUpdateMsg),
|
||||||
stream: std.net.Stream,
|
server: *std.net.Server,
|
||||||
) Self {
|
) Self {
|
||||||
return .{
|
return .{
|
||||||
.allocator = allocator,
|
.allocator = allocator,
|
||||||
.config = config,
|
.config = config,
|
||||||
.queue = queue,
|
.queue = queue,
|
||||||
.stream = stream,
|
.server = server,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends one block update payload to the stream, terminates it with a newline.
|
/// Sends one block update payload to a stream, terminates it with a newline.
|
||||||
fn send(self: *Self, update: *const models.BlockUpdateMsg) !void {
|
fn send(stream: std.net.Stream, update: *const models.BlockUpdateMsg) !void {
|
||||||
var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined;
|
var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined;
|
||||||
const stream = self.stream.?;
|
|
||||||
const csv = try update.toCsv(&buf);
|
const csv = try update.toCsv(&buf);
|
||||||
try stream.writeAll(csv);
|
stream.writeAll(csv) catch return;
|
||||||
try stream.writeAll(&.{'\n'});
|
stream.writeAll(&.{'\n'}) catch return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn streamListener(self: *Self, stream: std.net.Stream, running: *std.atomic.Value(bool)) void {
|
||||||
|
var buf: [512]u8 = undefined;
|
||||||
|
while (running.load(.monotonic)) {
|
||||||
|
const n = stream.read(&buf) catch 0;
|
||||||
|
std.log.info("n: {}", .{n});
|
||||||
|
if (n < buf.len) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
std.log.info("terminating stream listener", .{});
|
||||||
|
|
||||||
|
running.store(false, .release);
|
||||||
|
self.queue.cond.broadcast();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Starts dispatcher. Receives new queue entries and sends them to the stream.
|
|
||||||
pub fn start(self: *Self) !void {
|
pub fn start(self: *Self) !void {
|
||||||
|
while (true) {
|
||||||
|
const conn = try self.server.accept();
|
||||||
|
log.info("Client connected", .{});
|
||||||
|
defer conn.stream.close();
|
||||||
|
|
||||||
// buffer for `tryDequeueAll` with maximum queue capacity as size
|
// buffer for `tryDequeueAll` with maximum queue capacity as size
|
||||||
const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity());
|
const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity());
|
||||||
defer self.allocator.free(buf);
|
defer self.allocator.free(buf);
|
||||||
|
|
||||||
|
var running = std.atomic.Value(bool).init(true);
|
||||||
|
defer running.store(false, .release);
|
||||||
|
_ = try std.Thread.spawn(.{}, streamListener, .{ self, conn.stream, &running });
|
||||||
|
|
||||||
|
blk: {
|
||||||
while (true) {
|
while (true) {
|
||||||
const updates = try self.queue.tryDequeueAll(buf);
|
const updates = try self.queue.dequeueAll(buf);
|
||||||
for (updates) |*update| try self.send(update);
|
if (!running.load(.monotonic)) break :blk;
|
||||||
|
for (updates) |*update| {
|
||||||
|
if (running.load(.monotonic)) {
|
||||||
|
try send(conn.stream, update);
|
||||||
|
} else {
|
||||||
|
break :blk;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// /// Sends one block update payload to the stream, terminates it with a newline.
|
||||||
|
// fn send(self: *Self, update: *const models.BlockUpdateMsg) !void {
|
||||||
|
// var buf: [models.BlockUpdateMsg.csv_max_len]u8 = undefined;
|
||||||
|
// const stream = self.stream.?;
|
||||||
|
// const csv = try update.toCsv(&buf);
|
||||||
|
// try stream.writeAll(csv);
|
||||||
|
// try stream.writeAll(&.{'\n'});
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Starts dispatcher. Receives new queue entries and sends them to the stream.
|
||||||
|
// pub fn start(self: *Self) !void {
|
||||||
|
// // buffer for `tryDequeueAll` with maximum queue capacity as size
|
||||||
|
// const buf = try self.allocator.alloc(models.BlockUpdateMsg, self.queue.capacity());
|
||||||
|
// defer self.allocator.free(buf);
|
||||||
|
|
||||||
|
// while (true) {
|
||||||
|
// const updates = try self.queue.tryDequeueAll(buf);
|
||||||
|
// for (updates) |*update| try self.send(update);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
|
@ -8,7 +8,7 @@ pub const Dimension = i32;
|
||||||
pub const Coordinate = i32;
|
pub const Coordinate = i32;
|
||||||
|
|
||||||
/// Maximum length of a material name
|
/// Maximum length of a material name
|
||||||
pub const material_max_len = build_options.material_max_len;
|
pub const material_max_len: usize = build_options.material_max_len;
|
||||||
|
|
||||||
/// Maximum CSV payload size for a block update
|
/// Maximum CSV payload size for a block update
|
||||||
pub const csv_max_len: usize = blk: {
|
pub const csv_max_len: usize = blk: {
|
||||||
|
@ -38,11 +38,14 @@ material_len: usize = 0,
|
||||||
/// Converts block update message to CSV payload
|
/// Converts block update message to CSV payload
|
||||||
/// Format: *`<dimension>,<x>,<y>,<z>,<material>`* (`<int32>,<int32>,<int32>,<int32>,<str>`)
|
/// Format: *`<dimension>,<x>,<y>,<z>,<material>`* (`<int32>,<int32>,<int32>,<int32>,<str>`)
|
||||||
pub fn toCsv(self: *const Self, buf: *[csv_max_len]u8) ![]u8 {
|
pub fn toCsv(self: *const Self, buf: *[csv_max_len]u8) ![]u8 {
|
||||||
|
var material_upper_buf: [material_max_len]u8 = undefined;
|
||||||
|
const material_upper = std.ascii.upperString(&material_upper_buf, self.material[0..self.material_len]);
|
||||||
|
|
||||||
return try std.fmt.bufPrint(buf, "{d},{d},{d},{d},{s}", .{
|
return try std.fmt.bufPrint(buf, "{d},{d},{d},{d},{s}", .{
|
||||||
self.dimension,
|
self.dimension,
|
||||||
self.x,
|
self.x,
|
||||||
self.y,
|
self.y,
|
||||||
self.z,
|
self.z,
|
||||||
self.material[0..self.material_len],
|
material_upper,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,12 +23,12 @@ pub fn MsgQueueUnmanaged(comptime T: type) type {
|
||||||
|
|
||||||
pub fn tryEnqueue(self: *Self, value: T) !void {
|
pub fn tryEnqueue(self: *Self, value: T) !void {
|
||||||
try self.queue.tryEnqueue(value);
|
try self.queue.tryEnqueue(value);
|
||||||
self.cond.signal();
|
self.cond.broadcast();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enqueue(self: *Self, value: T) void {
|
pub fn enqueue(self: *Self, value: T) void {
|
||||||
self.queue.enqueue(value);
|
self.queue.enqueue(value);
|
||||||
self.cond.signal();
|
self.cond.broadcast();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tryDequeue(self: *Self) !T {
|
pub fn tryDequeue(self: *Self) !T {
|
||||||
|
@ -49,6 +49,18 @@ pub fn MsgQueueUnmanaged(comptime T: type) type {
|
||||||
pub fn tryDequeueAll(self: *Self, buf: []T) ![]T {
|
pub fn tryDequeueAll(self: *Self, buf: []T) ![]T {
|
||||||
return try self.queue.tryDequeueAll(buf);
|
return try self.queue.tryDequeueAll(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn dequeueAll(self: *Self, buf: []T) ![]T {
|
||||||
|
var m = std.Thread.Mutex{};
|
||||||
|
m.lock();
|
||||||
|
defer m.unlock();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const slice = try self.queue.tryDequeueAll(buf);
|
||||||
|
if (slice.len > 0) return slice;
|
||||||
|
self.cond.wait(&m);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
36
src/root.zig
36
src/root.zig
|
@ -19,6 +19,30 @@ pub const Config = struct {
|
||||||
web: web.Config,
|
web: web.Config,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
fn openUnixSocketServer(socket_path: []const u8, max_clients: u31) !std.net.Server {
|
||||||
|
errdefer std.posix.unlink(socket_path) catch {};
|
||||||
|
|
||||||
|
const sockfd = try std.posix.socket(
|
||||||
|
std.posix.AF.UNIX,
|
||||||
|
std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
const stream: std.net.Stream = .{ .handle = sockfd };
|
||||||
|
errdefer stream.close();
|
||||||
|
|
||||||
|
const addr = try std.net.Address.initUnix(socket_path);
|
||||||
|
const sockaddr_ptr: *const std.posix.sockaddr = @ptrCast(&addr.un);
|
||||||
|
try std.posix.bind(sockfd, sockaddr_ptr, addr.getOsSockLen());
|
||||||
|
try std.posix.listen(sockfd, max_clients);
|
||||||
|
|
||||||
|
const server: std.net.Server = .{
|
||||||
|
.listen_address = addr,
|
||||||
|
.stream = stream,
|
||||||
|
};
|
||||||
|
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
/// Block update dispatch wrapper. Panics if error occurs.
|
/// Block update dispatch wrapper. Panics if error occurs.
|
||||||
fn receiver(dispatcher: *dispatchers.BlockUpdate) void {
|
fn receiver(dispatcher: *dispatchers.BlockUpdate) void {
|
||||||
dispatcher.start() catch unreachable;
|
dispatcher.start() catch unreachable;
|
||||||
|
@ -28,9 +52,13 @@ fn receiver(dispatcher: *dispatchers.BlockUpdate) void {
|
||||||
pub fn start(allocator: std.mem.Allocator, config: *const Config) !void {
|
pub fn start(allocator: std.mem.Allocator, config: *const Config) !void {
|
||||||
log.info("Starting craftflut gateway", .{});
|
log.info("Starting craftflut gateway", .{});
|
||||||
|
|
||||||
log.info("Opening Unix socket: {s}", .{config.unix_socket_path});
|
log.info("Opening data Unix socket: {s}", .{config.unix_socket_path});
|
||||||
var stream = try std.net.connectUnixSocket(config.unix_socket_path);
|
std.posix.unlink(config.unix_socket_path) catch {};
|
||||||
defer stream.close();
|
var server = try openUnixSocketServer(config.unix_socket_path, 1);
|
||||||
|
defer {
|
||||||
|
server.deinit();
|
||||||
|
std.posix.unlink(config.unix_socket_path) catch {};
|
||||||
|
}
|
||||||
|
|
||||||
var queue = try msg_queue.MsgQueueManaged(models.BlockUpdateMsg)
|
var queue = try msg_queue.MsgQueueManaged(models.BlockUpdateMsg)
|
||||||
.init(allocator, config.dispatcher.capacity);
|
.init(allocator, config.dispatcher.capacity);
|
||||||
|
@ -40,7 +68,7 @@ pub fn start(allocator: std.mem.Allocator, config: *const Config) !void {
|
||||||
allocator,
|
allocator,
|
||||||
&config.dispatcher,
|
&config.dispatcher,
|
||||||
queue.unmanaged(),
|
queue.unmanaged(),
|
||||||
stream,
|
&server,
|
||||||
);
|
);
|
||||||
log.info("Starting dispatcher", .{});
|
log.info("Starting dispatcher", .{});
|
||||||
_ = try std.Thread.spawn(.{}, receiver, .{&dispatcher});
|
_ = try std.Thread.spawn(.{}, receiver, .{&dispatcher});
|
||||||
|
|
|
@ -11,7 +11,7 @@ material: []const u8,
|
||||||
/// Checks if material name characters are valid. Doesn't check if maximum length is exceeded.
|
/// Checks if material name characters are valid. Doesn't check if maximum length is exceeded.
|
||||||
pub fn materialIsValid(material: []const u8) bool {
|
pub fn materialIsValid(material: []const u8) bool {
|
||||||
for (material) |ch| {
|
for (material) |ch| {
|
||||||
if (!std.ascii.isAlphabetic(ch))
|
if (!std.ascii.isAlphabetic(ch) and ch != '_' and ch != '-')
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue