diff --git a/.busted b/.busted new file mode 100644 index 0000000..040c956 --- /dev/null +++ b/.busted @@ -0,0 +1,8 @@ +return { + _all = { + coverage = true + }, + default = { + verbose = true + } +} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6fd0a37..a0b8fb2 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ luac.out *.x86_64 *.hex +**/luacov-html/* \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d81c895 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "external/du-serializer"] + path = external/du-serializer + url = git@github.com:PerMalmberg/du-serializer.git +[submodule "external/du-unit-testing"] + path = external/du-unit-testing + url = git@github.com:PerMalmberg/du-unit-testing.git diff --git a/.luacov b/.luacov new file mode 100644 index 0000000..1ee2544 --- /dev/null +++ b/.luacov @@ -0,0 +1,5 @@ +reporter = "html" +include = { + "src$", + "src%/.+$" +} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ade26cb --- /dev/null +++ b/Makefile @@ -0,0 +1,37 @@ +.PHONY: clean test dev release +CLEAN_COV=if [ -e luacov.report.out ]; then rm luacov.report.out; fi; if [ -e luacov.stats.out ]; then rm luacov.stats.out; fi +PWD=$(shell pwd) + +LUA_PATH := ./src/?.lua +LUA_PATH := $(LUA_PATH);$(PWD)/external/du-serializer/?.lua +LUA_PATH := $(LUA_PATH);$(PWD)/external/du-unit-testing/src/?.lua +LUA_PATH := $(LUA_PATH);$(PWD)/external/du-unit-testing/external/du-lua-examples/?.lua +LUA_PATH := $(LUA_PATH);$(PWD)/external/du-unit-testing/external/du-lua-examples/api-mockup/?.lua +LUA_PATH := $(LUA_PATH);$(PWD)/external/du-unit-testing/external/du-luac/lua/?.lua + + +all: release + +lua_path: + @echo "$(LUA_PATH)" + +clean_cov: + @$(CLEAN_COV) + +clean_report: + @if [ -d ./luacov-html ]; then rm -rf ./luacov-html; fi + +clean: clean_cov clean_report + @rm -rf out + +test: clean + @echo Runnings unit tests on du-render + @LUA_PATH="$(LUA_PATH)" busted . --exclude-pattern=".*serializer.*" + @luacov + @$(CLEAN_COV) + +dev: test + @LUA_PATH="$(LUA_PATH)" du-lua build --copy=development/main + @# Modify file inline. Actual regex is '/^\s*---.*$/d' but $ must be doubled in make file + @sed -i '/^\s*---.*$$/d' "./out/development/example/stream/screen.lua" + @sed -i '/^\s*---.*$$/d' "./out/development/example/render/main.lua" diff --git a/README.md b/README.md index c4217c8..ba16ad7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # du-stream -A data stream library for Dual Universe screens and emitter/receivers + +A stream library for Dual Universe + +This file was automatically generated by [DU-LuaC](https://github.com/wolfe-labs/DU-LuaC)'s interactive CLI. \ No newline at end of file diff --git a/du-stream.code-workspace b/du-stream.code-workspace new file mode 100644 index 0000000..0de7bd0 --- /dev/null +++ b/du-stream.code-workspace @@ -0,0 +1,21 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": { + "runOnSave.commands": [ + { + "match": ".*\\.lua", + "command": "make test", + "runIn": "terminal" + } + ], + "Lua.workspace.library": [ + "external/du-libs/", + "${3rd}/luassert/library", + "${3rd}/busted/library" + ] + } +} \ No newline at end of file diff --git a/external/du-serializer b/external/du-serializer new file mode 160000 index 0000000..3372263 --- /dev/null +++ b/external/du-serializer @@ -0,0 +1 @@ +Subproject commit 3372263e49854c86a2226de80ac49167955e9c43 diff --git a/external/du-unit-testing b/external/du-unit-testing new file mode 160000 index 0000000..6df3fe7 --- /dev/null +++ b/external/du-unit-testing @@ -0,0 +1 @@ +Subproject commit 6df3fe7a1cb711be52c1af110bb43293f4f6f2ca diff --git a/project.json b/project.json new file mode 100644 index 0000000..6a9b3aa --- /dev/null +++ b/project.json @@ -0,0 +1,30 @@ +{ + "cli": { + "fmtVersion": 3 + }, + "name": "du-stream", + "description": "A stream library for Dual Universe", + "sourcePath": "src", + "outputPath": "out", + "libs": [], + "builds": { + "main": { + "name": "main", + "type": "control", + "slots": {} + } + }, + "targets": { + "development": { + "name": "development", + "minify": false, + "handleErrors": false + } + }, + "internalPaths": [ + "autoconf/", + "cpml/", + "pl/", + "utils/event" + ] +} \ No newline at end of file diff --git a/src/CommQueue.lua b/src/CommQueue.lua new file mode 100644 index 0000000..e69de29 diff --git a/src/Stream.lua b/src/Stream.lua new file mode 100644 index 0000000..f353932 --- /dev/null +++ b/src/Stream.lua @@ -0,0 +1,241 @@ +require("serializer") --QQQ from du-libs??? +local serializer = { + Serialize = serialize, + Deserialize = deserialize +} +---@module "interface.Device" + +---@alias CommQueue { queue:string[], waitingForReply:boolean, seq:integer } +---@alias ScreenLink {setScriptInput:fun(string), clearScriptOutput:fun(), getScriptOutput:fun():string} +---@alias Renderer {setOutput:fun(string), getInput:fun():string} +---@alias StreamData {output:CommQueue, input:CommQueue, lastReceived:number} + +---@class Stream +---@field New fun(device:Device, parent:DataReceiver, timeout:number):Stream +---@field Tick fun() +---@field Write fun(data:table|string) +---@field WaitingToSend fun():boolean + +--[[ + Data format: + #remaining_chucks|seq|cmd|payload + + Where: + - remaining_chunks is a 2-digit integer indicating how many chunks remains to complete the message. 0 means the last chuck. + - seq is a single digit seqence number, used to ensure we don't read the same data twice. It wraps around at 9. + - cmd is a two digit integer indicating what to do with the data + - payload is the actual payload, if any +]] +local headerSize = 1 -- # + + 2 -- remaining_chucks + + 1 -- | + + 1 -- seq + + 1 -- | + + 2 -- cmd + + 1 -- | + +---@enum StreamCommand +local Command = { + Reset = 0, + Poll = 1, + Ack = 2, + Data = 3, +} + +---Represents a stream between two entities. +local Stream = {} +Stream.__index = Stream + +---Create a new Stream +---@param device Device +---@param parent DataReceiver +---@param timeout number The amount of time to wait for a reply before considering the connection broken. +---@return Stream +function Stream.New(device, parent, timeout) -- QQQ Block size as argument or enum + local s = {} + local blockSize = 1024 - headerSize -- Game allows max 1024 bytes in buffers + + ---@diagnostic disable-next-line: undefined-global + local getTime = getTime or system.getUtcTime + + device.Clear() + + ---@type StreamData + local streamData = { + input = { queue = {}, waitingForReply = false, seq = 0 }, + output = { queue = {}, waitingForReply = false, seq = 0 }, + lastReceived = getTime() + } + + local input = streamData.input + local output = streamData.output + + ---Assembles the package + ---@param payload string + local function assemblePackage(payload) + local queue = input.queue + + if #queue == 0 then + table.insert(queue, "") + end + queue[#queue] = queue[#queue] .. payload + end + + ---Completes a transmission + ---@param count number + local function completeTransmission(count) + if count == 0 then + local queue = input.queue + + local deserialized = serializer.Deserialize(queue[#queue]) + + parent.OnData(deserialized) + -- Last part, begin new data + queue[1] = "" + end + end + + local function sameInput(commQueue, seq) + if seq == commQueue.seq then + return true + end + + commQueue.seq = seq + return false + end + + ---Creates a block + ---@param blockCount integer + ---@param commQueue CommQueue + ---@param cmd StreamCommand + ---@param payload string? + ---@return string + local function createBlock(blockCount, commQueue, cmd, payload) + commQueue.seq = (commQueue.seq + 1) + if commQueue.seq > 9 then + commQueue.seq = 0 + end + + payload = payload or "" + local b = string.format("#%0.2d|%0.1d|%0.2d|%s", blockCount, commQueue.seq, cmd, payload) + return b + end + + ---Reads incoming data + ---@return StreamCommand|nil #Command + ---@return number #Packet count + ---@return string #Payload + local function readData() + local r = device.Read() + + local count, seq, cmd, payload = r:match("^#(%d+)|(%d)|(%d+)|(.*)$") + + payload = payload or "" + local validPacket = count and cmd + if validPacket then + cmd = tonumber(cmd) + count = tonumber(count) or 0 + validPacket = validPacket and cmd and count + end + + if not validPacket then + return nil, 0, "" + end + + -- Since we can't clear the input when running in RenderScript, we have to rely on the sequence number to prevent duplicate data. + if sameInput(input, seq) then + return nil, 0, "" + end + + return cmd, count, payload + end + + ---Call this function in OnUpdate + function s.Tick() + local cmd, count, payload = readData() + + -- Did we get any input? + if cmd then + parent.OnTimeout(false, s) + streamData.lastReceived = getTime() + + if device.IsController() then + if cmd == Command.Data then + assemblePackage(payload) + completeTransmission(count) + end + -- No need to handle ACK, it's just a trigger to move on. + output.waitingForReply = false + else + local sendAck = false + + if cmd == Command.Poll or cmd == Command.Data then + if cmd == Command.Data then + assemblePackage(payload) + completeTransmission(count) + end + + -- Send either ACK or actual data as a reply + if #output.queue > 0 then + device.Send(table.remove(output.queue, 1)) + else + sendAck = true + end + elseif cmd == Command.Reset then + output.queue = {} + output.waitingForReply = false + input.queue = {} + input.waitingForReply = false + sendAck = true + end + + if sendAck then + device.Send(createBlock(0, output, Command.Ack)) + end + end + end + + if getTime() - streamData.lastReceived >= timeout then + parent.OnTimeout(true, s) + streamData.lastReceived = getTime() -- Reset to trigger again + output.queue = {} + output.waitingForReply = false + end + + if device.IsController() and not output.waitingForReply then + if #output.queue == 0 then + device.Send(createBlock(0, output, Command.Poll)) + output.waitingForReply = true + else + device.Send(table.remove(output.queue, 1)) + output.waitingForReply = true + end + end + end + + ---Write the data to the stream + ---@param dataToSend table|string + function s.Write(dataToSend) + local data = serializer.Serialize(dataToSend) + local blockCount = math.ceil(data:len() / blockSize) - 1 + + while data:len() > blockSize - headerSize do + local part = data:sub(1, blockSize) + data = data:sub(blockSize + 1) + table.insert(output.queue, createBlock(blockCount, output, Command.Data, part)) + blockCount = blockCount - 1 + end + + if data:len() > 0 then + table.insert(output.queue, createBlock(blockCount, output, Command.Data, data)) + end + end + + ---Returns true if there is data waiting to be sent. Good for holding off additional write. + ---@return boolean + function s.WaitingToSend() return #output.queue > 0 end + + return setmetatable(s, Stream) +end + +return Stream diff --git a/src/Stream_spec.lua b/src/Stream_spec.lua new file mode 100644 index 0000000..b7f7d63 --- /dev/null +++ b/src/Stream_spec.lua @@ -0,0 +1,113 @@ +local env = require("environment") +env.Prepare() + +local Stream = require("Stream") +local ScreenDevice = require("device/ScreenDevice") + +local toScreen = "" +local toController = "" + +local FakeScreen = {} +FakeScreen.__index = FakeScreen + +function FakeScreen.New() + local s = {} + + function s.setScriptInput(inp) + toScreen = inp + end + + function s.getScriptOutput() + return toController + end + + function s.clearScriptOutput() + toController = "" + end + + return setmetatable(s, FakeScreen) +end + +local FakeRenderScriptDevice = {} +FakeRenderScriptDevice.__index = FakeRenderScriptDevice + +---Creates a new screen device +---@return Device +function FakeRenderScriptDevice.New() + local s = {} + + function s.Send(data) + toController = data + end + + ---@return string + function s.Read() + return toScreen + end + + function s.Clear() + + end + + ---@return boolean + function s.IsController() + return false + end + + return setmetatable(s, FakeRenderScriptDevice) +end + +local DummyReceiver = {} +DummyReceiver.__index = DummyReceiver + +function DummyReceiver.New() + local s = { + isTimedOut = false, + data = nil + } + + function s.OnData(data) + s.data = data + end + + function s.OnTimeout(isTimedOut, stream) + s.isTimedOut = isTimedOut + end + + function s.IsTimedOut() + return s.isTimedOut + end + + ---@return any + function s.Data() + return s.data + end + + return setmetatable(s, DummyReceiver) +end + +describe("Stream", function() + it("Can send data to screen", function() + -- Controller side + local fakeScreen = FakeScreen.New() + local screenDevice = ScreenDevice.New(fakeScreen) + + local controller = DummyReceiver.New() + local controllerStream = Stream.New(screenDevice, controller, 1) + + -- Screen side + local worker = DummyReceiver.New() + local screenStream = Stream.New(FakeRenderScriptDevice.New(), worker, 1) + + + controllerStream.Write("1234567890") + for i = 1, 5, 1 do + controllerStream.Tick() + screenStream.Tick() + end + + assert.are_equal("1234567890", worker.Data()) + assert.is_false(controller.IsTimedOut()) + assert.is_false(worker.IsTimedOut()) + end) +end) diff --git a/src/device/Device.lua b/src/device/Device.lua new file mode 100644 index 0000000..3f1d41f --- /dev/null +++ b/src/device/Device.lua @@ -0,0 +1,4 @@ +---@alias voidf fun() +---@alias stringf fun():string +---@alias boolf fun():boolean +---@alias Device {Send:fun(data:string), Read:stringf, Clear:voidf, IsController:boolf} diff --git a/src/device/RenderScriptDevice.lua b/src/device/RenderScriptDevice.lua new file mode 100644 index 0000000..a50e66c --- /dev/null +++ b/src/device/RenderScriptDevice.lua @@ -0,0 +1,39 @@ +---@class RenderScriptDevice +---@field Send fun(data:string) +---@field Read fun():string +---@field Clear fun() +---@field IsController fun():boolean +---@field New fun(screenLink:table):Device + +local RenderScriptDevice = {} +RenderScriptDevice.__index = RenderScriptDevice + +---Creates a new screen device +---@return Device +function RenderScriptDevice.New() + local s = {} + + function s.Send(data) + ---@diagnostic disable-next-line: undefined-global + setOutput(data) + end + + ---@return string + function s.Read() + ---@diagnostic disable-next-line: undefined-global + return getInput() + end + + function s.Clear() + + end + + ---@return boolean + function s.IsController() + return false + end + + return setmetatable(s, RenderScriptDevice) +end + +return RenderScriptDevice diff --git a/src/device/ScreenDevice.lua b/src/device/ScreenDevice.lua new file mode 100644 index 0000000..e4253a3 --- /dev/null +++ b/src/device/ScreenDevice.lua @@ -0,0 +1,44 @@ +---@module "Device" + +---@class ScreenDevice +---@field Send fun(data:string) +---@field Read fun():string +---@field Clear fun() +---@field IsController fun():boolean +---@field New fun(screenLink:table):Device + +local ScreenDevice = {} +ScreenDevice.__index = ScreenDevice + +---Creates a new screen interface +---@param screenLink {getScriptOutput:stringf, clearScriptOutput:voidf, setScriptInput:fun(string)} +---@return Device +function ScreenDevice.New(screenLink) + local s = {} + + ---@param data string + function s.Send(data) + screenLink.setScriptInput(data) + end + + ---@return string + function s.Read() + local data = screenLink.getScriptOutput() + screenLink.clearScriptOutput() + return data + end + + function s.Clear() + screenLink.clearScriptOutput() + end + + ---@return boolean + function s.IsController() + -- We're running on the controller when we have a link to a screen + return true + end + + return setmetatable(s, ScreenDevice) +end + +return ScreenDevice diff --git a/src/interface/DataReceiver.lua b/src/interface/DataReceiver.lua new file mode 100644 index 0000000..85e780f --- /dev/null +++ b/src/interface/DataReceiver.lua @@ -0,0 +1 @@ +---@alias DataReceiver {OnData:fun(table), OnTimeout:fun(isTimedOut:boolean, stream:Stream)} diff --git a/src/interface/RxTx.lua b/src/interface/RxTx.lua new file mode 100644 index 0000000..0a6496f --- /dev/null +++ b/src/interface/RxTx.lua @@ -0,0 +1,59 @@ +---@module "interface.DeviceInterface" +---@alias onReceiveFunc fun(_:table, channel:string, message:string) + +---@alias EmitterLink {send:fun(channel:string, message:string), setChannelList:fun(table)} +---@alias ReceiverLink {setChannelList:fun(channels:channelList), onEvent:fun(_:table, event:string, f:onReceiveFunc)} + +---@class RxTx +---@field Send fun(data:string) +---@field Read fun():string +---@field Clear fun() +---@field SetChannel fun(string) +---@field New fun(emitter, receiver, channel:string, isController:boolean):Device + +---@alias channelList string[] + +local RxTx = {} +RxTx.__index = RxTx + +---Create a transmit/receive interface +---@param emitter EmitterLink The emitter link +---@param receiver ReceiverLink The receiver link +---@param channel string The channel to communicate on +---@param isController boolean If true, this device is considered the controller. +---@return Device +function RxTx.New(emitter, receiver, channel, isController) + local s = {} + + local inQueue = {} ---@type string[] + + receiver.setChannelList({ channel }) + + ---@diagnostic disable-next-line: undefined-field + receiver:onEvent("onReceive", function(_, chan, message) + inQueue[#inQueue + 1] = message + end) + + ---@param data string + function s.Send(data) + emitter.send(channel, data) + end + + ---@return string + function s.Read() + return table.remove(inQueue, 1) + end + + function s.Clear() + -- NOP + end + + ---@return boolean + function s.IsController() + return isController + end + + return setmetatable(s, RxTx) +end + +return RxTx