This repository has been archived on 2023-06-05. You can view files and clone it, but cannot push or open issues or pull requests.
service/src/service.cr

115 lines
2.1 KiB
Crystal

require "version_from_shard"
abstract class Service
VersionFromShard.declare
# Performs the service's work for *unit*.
#
# Invoked by `Starter.handle` with a freshly created `Unit`, and again by
# `Unit::DEFAULT_HANLDER` whenever a unit using that handler is handled.
# The returned unit (if any) is what `Starter.handle` handles next;
# returning `nil` ends that loop.
abstract def run(unit : Unit) : Unit?
# Hands *unit* over to its own handler, passing this service along.
#
# Returns whatever the unit's handler returns, or `nil` when *unit* itself
# is `nil`.
def handle(unit : Unit?) : Unit?
  unit.try &.handle(self)
end
# A unit of work: a service paired with the proc that decides what happens
# when the unit is handled.
class Unit
  property service
  property handler_proc

  # Signature of a handler: it receives the unit being handled and the
  # service handling it, and returns the next unit to handle (or `nil`).
  alias Handler = (self, Service) -> Unit?

  # Default handler: runs the service again with the same unit.
  DEFAULT_HANDLER = ->(unit : self, service : Service) { service.run(unit).as(Unit?) }

  # Misspelled original name of `DEFAULT_HANDLER`, kept so that existing
  # references keep compiling. Prefer `DEFAULT_HANDLER` in new code.
  DEFAULT_HANLDER = DEFAULT_HANDLER

  # Handler that never produces a follow-up unit: it always returns `nil`.
  NO_RESTART_HANDLER = ->(_unit : self, _service : Service) { nil }

  # Creates a unit for *service*; *handler_proc* defaults to
  # `DEFAULT_HANDLER`.
  def initialize(@service : Service, @handler_proc : Handler = DEFAULT_HANDLER)
  end

  # Block form: the captured block becomes the handler proc.
  def initialize(service : Service, &handler_proc : Handler)
    initialize(service, handler_proc)
  end

  # Invokes the handler proc with this unit and *service*.
  #
  # Note that the *service* argument, not the stored `service` property, is
  # what gets passed to the handler.
  def handle(service : Service) : Unit?
    handler_proc.call(self, service)
  end
end
# Base class for objects that start a collection of services.
abstract class Starter
  # The services this starter operates on.
  abstract def logic : Array(Service)

  # Starts the services and returns the starter itself.
  abstract def start : self

  # Drives a single *service*: runs it once with a fresh `Unit`, then keeps
  # passing each returned unit to `Service#handle` until `nil` comes back.
  def self.handle(service : Service) : Nil
    current = service.run(Unit.new(service))
    while current
      current = service.handle(current)
    end
  end
end
# A `Starter` whose services are supplied as an array at construction time.
abstract class ArrayStarter < Starter
  getter services : Array(Service)

  # Stores *services* as the list this starter works on.
  def initialize(services : Array(Service))
    @services = services
  end

  # Returns the array given to the constructor.
  def logic : Array(Service)
    @services
  end
end
# Starts every service in its own fiber and waits until all of them have
# finished before returning.
class SynchronousStarter < ArrayStarter
  # Spawns one fiber per service, blocks until each fiber has signalled
  # completion, yields once to the scheduler, and returns `self`.
  def start : self
    # Take a single snapshot so the channel capacity, the number of spawned
    # fibers and the number of receives always agree, even if `logic` is
    # overridden to compute a new array on every call.
    targets = logic
    done = Channel(Nil).new(targets.size)

    targets.each do |service|
      spawn do
        begin
          Starter.handle(service)
        ensure
          # Signal completion even when the service raised; otherwise the
          # receive loop below would wait forever for this fiber. The
          # exception itself still propagates out of the fiber.
          done.send(nil)
        end
      end
    end

    targets.size.times { done.receive }
    Fiber.yield
    self
  end
end
# Handles the services one after another in the calling fiber.
#
# NOTE(review): despite the class name, nothing is spawned here; each
# service is driven to completion before the next one starts. Confirm the
# naming is intentional.
class AsynchronousStarter < ArrayStarter
  # Passes every service in `logic` to `Starter.handle`, in order, then
  # returns `self`.
  def start : self
    logic.each { |service| Starter.handle(service) }
    self
  end
end
# Runs a set of starters concurrently and waits for all of them.
abstract class Runner
  Log = Log.for(self)

  # The starters this runner launches.
  abstract def starters : Array(Starter)

  # Spawns one fiber per starter, blocks until each fiber has signalled
  # completion, yields once to the scheduler, and returns `self`.
  def run : self
    pending = starters
    done = Channel(Nil).new(pending.size)

    pending.each do |starter|
      spawn do
        begin
          starter.start
        ensure
          # Signal completion even when the starter raised; otherwise the
          # receive loop below would wait forever for this fiber. The
          # exception itself still propagates out of the fiber.
          done.send(nil)
        end
      end
    end

    pending.size.times { done.receive }
    Fiber.yield
    self
  end
end
end