Some buffering in Lua side RPC logic to speed things up.

Flush remaining unexpected data before sending new messages.
This commit is contained in:
Florian Nücke
2021-01-04 16:36:06 +01:00
parent b07b4a0fe1
commit df40fa24dc

View File

@@ -74,53 +74,90 @@ DeviceBus.__index = DeviceBus
local message_delimiter = string.char(0)
local function parseError(result)
if result.type == "error" then
local function parseError(result, reason)
if result and result.type == "error" then
return result.data
elseif result then
return "unexpected message type: " .. result.type
else
return "unexpected message type " .. result.type
return "unexpected error: " .. (reason or "unknown error")
end
end
local function readOne(fd)
local result, status, errnum = poll.rpoll(fd, 10)
if result == 1 then
return unistd.read(fd, 1)
else
local function skipInput(bus)
repeat
local result, status, errnum = poll.rpoll(bus.fd, 0)
if result == 1 then
unistd.read(bus.fd, 1024)
end
until result ~= 1
end
local function fillBuffer(bus)
local result, status, errnum = poll.rpoll(bus.fd, -1)
if result == nil then
return result, status, errnum
elseif result == 0 then
return nil, "timeout"
else
bus.buffer = unistd.read(bus.fd, 1024)
bus.bufferLen = string.len(bus.buffer)
bus.bufferPos = 1
return true
end
end
local function readMessage(device)
local function clearBuffer(bus)
bus.buffer = nil
end
local function readOne(bus)
if not bus.buffer then
local result, status = fillBuffer(bus)
if not result then
return result, status
end
end
local result = bus.buffer:byte(bus.bufferPos)
if bus.bufferPos >= bus.bufferLen then
bus.buffer = nil
else
bus.bufferPos = bus.bufferPos + 1
end
return result
end
local function readMessage(bus)
local value
local message = ""
while true do
value = readOne(device.fd)
if not value then
unistd.sleep(1)
value, reason = readOne(bus)
if value == nil then -- error
return value, reason
else
if value == message_delimiter or value == 0 then
if value == 0 then
if message:match("%S") ~= nil then
local ok, result = pcall(cjson.decode, message)
if ok then
return result
else
return nil, result
end
else
message = ""
end
else
message = message .. value
message = message .. string.char(value)
end
end
end
end
local function writeMessage(device, data)
local function writeMessage(bus, data)
local message = cjson.encode(data)
return unistd.write(device.fd,
message_delimiter ..
message ..
message_delimiter)
return unistd.write(bus.fd, message_delimiter .. message .. message_delimiter)
end
function DeviceBus:new(path)
@@ -138,13 +175,19 @@ function DeviceBus:close()
unistd.close(self.fd)
end
function DeviceBus:flush()
clearBuffer(self)
skipInput(self)
end
function DeviceBus:list()
self:flush()
writeMessage(self, { type = "list" })
local result = readMessage(self)
if result.type == "list" then
local result, reason = readMessage(self)
if result and result.type == "list" then
return result.data
else
return nil, parseError(result)
return error(parseError(result))
end
end
@@ -183,26 +226,28 @@ function DeviceBus:find(deviceTypeName)
end
function DeviceBus:methods(deviceId)
self:flush()
writeMessage(self, { type = "methods", data = deviceId })
local result = readMessage(self)
if result.type == "methods" then
local result, reason = readMessage(self)
if result and result.type == "methods" then
return result.data
else
error(parseError(result))
error(parseError(result, reason))
end
end
function DeviceBus:invoke(deviceId, methodName, ...)
self:flush()
writeMessage(self, { type = "invoke", data = {
deviceId = deviceId,
name = methodName,
parameters = { ... }
} })
local result = readMessage(self)
if result.type == "result" then
}})
local result, reason = readMessage(self)
if result and result.type == "result" then
return result.data
else
error(parseError(result))
error(parseError(result, reason))
end
end