diff --git a/Cargo.toml b/Cargo.toml index c17ac79..7d8bed3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,9 +3,15 @@ name = "oc2r-rust" version = "0.1.0" edition = "2024" +[workspace.lints.clippy] +enum_glob_use = "deny" +pedantic = { level = "warn", priority = -1 } +nursery = { level = "warn", priority = -1 } +unwrap_used = "deny" + [profile.release] strip = true -opt-level = "z" +opt-level = "s" lto = true codegen-units = 1 diff --git a/README.md b/README.md new file mode 100644 index 0000000..f6a6681 --- /dev/null +++ b/README.md @@ -0,0 +1,70 @@ +# oc2r-rust + +Thin Rust bindings for the OC2R HLAPI controller. The crate mirrors the +behaviour of the Lua `devices.lua` helper: it speaks JSON over the first +VirtIO console (`/dev/hvc0`) and exposes ergonomic helpers for enumerating +devices, inspecting their methods, and invoking them. + +## Status + +- ✅ mirrors `devices.lua`: `list`, `get`, `find`, `find_all`, `methods`, `invoke` +- ✅ `Device` wrapper with lazy method cache +- ✅ `JsonValue` alias you can use to build invocation parameters +- ⏳ event subscription (`subscribe` / `unsubscribe`) not exposed yet + +The low-level framing logic follows the Java implementation in +`RPCDeviceBusAdapter`: messages are framed with leading/trailing NUL bytes and +the console is switched into raw mode before use. + +## Using the library + +Add the crate to your workspace and interact with it directly: + +```rust +use oc2r_rust::{DeviceBus, JsonValue, Result, DEFAULT_DEVICE_PATH}; + +fn main() -> Result<()> { + let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?; + + for device in bus.list()? { + println!("deviceId: {}", device.device_id); + println!(" types: {}", device.type_names.join(", ")); + } + + // Invoke a method on the first device as an example. + if let Some(mut device) = bus.device("some-device-id")? { + let response = device.invoke("help", &[JsonValue::Null])?; + println!("help(): {response:?}"); + } + + Ok(()) +} +``` + +### Running the examples + +Two runnable examples live in `examples/`: + +- `list-devices` – enumerate devices and pretty-print their metadata +- `invoke-method` – list available methods for the first device and invoke one + +Run them with: + +```bash +cargo run --example list-devices +cargo run --example invoke-method +``` + +Both examples expect to run inside the OC2R VM or a compatible environment +where `/dev/hvc0` is exposed. + +## Architecture overview + +- `bus.rs` – high-level `DeviceBus`/`Device` helpers +- `transport.rs` – POSIX plumbing (termios, poll, read/write) +- `rpc.rs` – miniserde data structures for OC2R messages +- `error.rs` – shared error type and `Result` alias + +`DeviceBus` flushes the descriptor before sending each request, matching the +behaviour of both Lua and Java implementations and preventing stale replies +from leaking into new invocations. diff --git a/examples/invoke-method.rs b/examples/invoke-method.rs new file mode 100644 index 0000000..0cd3b08 --- /dev/null +++ b/examples/invoke-method.rs @@ -0,0 +1,41 @@ +use oc2r_rust::{DEFAULT_DEVICE_PATH, DeviceBus, Result}; + +fn main() -> Result<()> { + let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?; + let devices = bus.list()?; + + let Some(first) = devices.first() else { + println!("No OC2R devices detected."); + return Ok(()); + }; + + println!("Inspecting {}", first.device_id); + + let mut device = bus + .device(&first.device_id)? + .expect("device disappeared after list()"); + + let methods = device.methods()?.to_vec(); + for method in &methods { + println!("{}()", method.name); + } + + if let Some(method) = methods.iter().find(|m| m.parameters.is_empty()) { + println!("Invoking {}()", method.name); + let result = device.invoke(&method.name, &[])?; + println!("Result: {result:?}"); + } else { + println!("No zero-argument methods available to invoke."); + } + + // Example for passing parameters: + // device.invoke( + // "setOutput", + // &[ + // oc2r_rust::JsonValue::Number(1.into()), + // oc2r_rust::JsonValue::Bool(true), + // ], + // )?; + + Ok(()) +} diff --git a/examples/list-devices.rs b/examples/list-devices.rs new file mode 100644 index 0000000..adaebdf --- /dev/null +++ b/examples/list-devices.rs @@ -0,0 +1,20 @@ +use oc2r_rust::{DEFAULT_DEVICE_PATH, DeviceBus, Result}; + +fn main() -> Result<()> { + let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?; + let drained = bus.flush()?; + println!("Drained {drained} stale byte(s)."); + + for device in bus.list()? { + println!("deviceId: {}", device.device_id); + if !device.type_names.is_empty() { + println!(" typeNames: {}", device.type_names.join(", ")); + } + if let Some(desc) = &device.description { + println!(" description: {desc}"); + } + println!("--------"); + } + + Ok(()) +} diff --git a/src/bus.rs b/src/bus.rs new file mode 100644 index 0000000..6a49d80 --- /dev/null +++ b/src/bus.rs @@ -0,0 +1,148 @@ +use crate::error::{Error, Result}; +use crate::rpc::{DeviceEntry, JsonValue, MethodEntry, RpcRequest, RpcResponse}; +use crate::transport::{flush_fd, read_message, set_raw, write_message}; +use std::fs::OpenOptions; +use std::os::fd::{AsRawFd, RawFd}; +use std::path::Path; + +/// Connection handle to the OC2R RPC controller. +/// +/// This is the Rust counterpart to Lua's `DeviceBus`. It serialises access to +/// `/dev/hvc0`, drains stale responses before each request and decodes the +/// JSON reply frames. +pub struct DeviceBus { + file: std::fs::File, +} + +impl DeviceBus { + /// Open the console device (defaults to [`DEFAULT_DEVICE_PATH`](crate::DEFAULT_DEVICE_PATH)) + /// and switch it to raw mode. + pub fn connect(path: impl AsRef) -> Result { + let file = OpenOptions::new() + .read(true) + .write(true) + .open(path.as_ref())?; + let fd = file.as_raw_fd(); + set_raw(fd)?; + Ok(Self { file }) + } + + fn fd(&self) -> RawFd { + self.file.as_raw_fd() + } + + pub fn flush(&mut self) -> Result { + flush_fd(self.fd()) + } + + /// Enumerate all devices currently exposed by the controller. + pub fn list(&mut self) -> Result> { + self.flush()?; + write_message(self.fd(), &RpcRequest::List)?; + match read_message(self.fd())? { + RpcResponse::List { data } => Ok(data), + RpcResponse::Error { data } => Err(Error::Protocol(data)), + other => Err(Error::Protocol(format!( + "unexpected response to list: {other:?}" + ))), + } + } + + pub fn methods(&mut self, device_id: &str) -> Result> { + self.flush()?; + write_message(self.fd(), &RpcRequest::Methods { data: device_id })?; + match read_message(self.fd())? { + RpcResponse::Methods { data } => Ok(data), + RpcResponse::Error { data } => Err(Error::Protocol(data)), + other => Err(Error::Protocol(format!( + "unexpected response to methods: {other:?}" + ))), + } + } + + pub fn invoke( + &mut self, + device_id: &str, + name: &str, + parameters: &[JsonValue], + ) -> Result { + self.flush()?; + let request = RpcRequest::Invoke { + payload: crate::rpc::InvokePayload { + device_id, + name, + parameters, + }, + }; + write_message(self.fd(), &request)?; + match read_message(self.fd())? { + RpcResponse::Result { data } => Ok(data), + RpcResponse::Error { data } => Err(Error::Protocol(data)), + other => Err(Error::Protocol(format!( + "unexpected response to invoke: {other:?}" + ))), + } + } + + pub fn get(&mut self, device_id: &str) -> Result> { + let devices = self.list()?; + Ok(devices.into_iter().find(|d| d.device_id == device_id)) + } + + /// Look up the first device that advertises the provided type name. + pub fn find(&mut self, type_name: &str) -> Result> { + let devices = self.list()?; + Ok(devices + .into_iter() + .find(|d| d.type_names.iter().any(|t| t == type_name))) + } + + /// Retrieve all devices that advertise the provided type name. + pub fn find_all(&mut self, type_name: &str) -> Result> { + let devices = self.list()?; + Ok(devices + .into_iter() + .filter(|d| d.type_names.iter().any(|t| t == type_name)) + .collect()) + } + + /// Borrow a `Device` helper for the supplied identifier. + pub fn device<'a>(&'a mut self, device_id: &str) -> Result>> { + let entry = match self.get(device_id)? { + Some(entry) => entry, + None => return Ok(None), + }; + Ok(Some(Device { + bus: self, + entry, + cached_methods: None, + })) + } +} + +/// High level device wrapper mirroring Lua's `Device` helper. +pub struct Device<'bus> { + bus: &'bus mut DeviceBus, + entry: DeviceEntry, + cached_methods: Option>, +} + +impl<'bus> Device<'bus> { + /// Cached metadata returned by [`DeviceBus::list`]. + pub fn info(&self) -> &DeviceEntry { + &self.entry + } + + /// Fetch and cache the available method signatures for this device. + pub fn methods(&mut self) -> Result<&[MethodEntry]> { + if self.cached_methods.is_none() { + self.cached_methods = Some(self.bus.methods(&self.entry.device_id)?); + } + Ok(self.cached_methods.as_deref().unwrap_or(&[])) + } + + /// Invoke `name` with JSON encoded parameters, returning the raw JSON result. + pub fn invoke(&mut self, name: &str, parameters: &[JsonValue]) -> Result { + self.bus.invoke(&self.entry.device_id, name, parameters) + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..3c50e54 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,38 @@ +//! Error types shared across the OC2R RPC client. + +use std::io; + +#[derive(Debug)] +pub enum Error { + Io(io::Error), + Json(miniserde::Error), + Framing, + Protocol(String), +} + +pub type Result = std::result::Result; + +impl From for Error { + fn from(e: io::Error) -> Self { + Error::Io(e) + } +} + +impl From for Error { + fn from(e: miniserde::Error) -> Self { + Error::Json(e) + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Error::Io(e) => write!(f, "io error: {e}"), + Error::Json(e) => write!(f, "json error: {e}"), + Error::Framing => f.write_str("invalid message framing"), + Error::Protocol(msg) => write!(f, "protocol error: {msg}"), + } + } +} + +impl std::error::Error for Error {} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..16015e0 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,34 @@ +//! Minimal OC2R HLAPI client for Rust. +//! +//! This crate mirrors the behaviour of the bundled Lua `devices.lua` +//! helper. It communicates with the OC2R RPC controller over the first +//! VirtIO console device (`/dev/hvc0`) and exposes helpers for listing +//! devices, inspecting available methods and invoking them. +//! +//! # Quick start +//! ```no_run +//! use oc2r_rust::{DeviceBus, JsonValue, Result, DEFAULT_DEVICE_PATH}; +//! +//! fn main() -> Result<()> { +//! let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?; +//! let devices = bus.list()?; +//! println!("Found {} device(s)", devices.len()); +//! +//! if let Some(device) = bus.device(&devices[0].device_id)? { +//! let response = device.invoke("help", &[JsonValue::Null])?; +//! println!("help(): {response:?}"); +//! } +//! Ok(()) +//! } +//! ``` + +pub mod bus; +pub mod error; +pub mod rpc; +mod transport; + +pub use bus::{Device, DeviceBus}; +pub use error::{Error, Result}; +pub use rpc::{DeviceEntry, JsonValue, MethodEntry, MethodParameter}; + +pub const DEFAULT_DEVICE_PATH: &str = "/dev/hvc0"; diff --git a/src/main.rs b/src/main.rs index 1f92653..212b79d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,303 +1,28 @@ -use miniserde::{Deserialize, Serialize, json}; -use miniserde_enum::{Deserialize_enum, Serialize_enum}; -use std::fmt; -use std::fs::OpenOptions; -use std::io; -use std::os::fd::{AsRawFd, RawFd}; -use std::result; - -#[derive(Serialize_enum, Debug)] -#[serde(tag = "type")] -pub enum RpcRequest<'a> { - #[serde(rename = "list")] - List, - #[serde(rename = "methods")] - Methods { data: &'a str }, - #[serde(rename = "invoke")] - Invoke { - #[serde(rename = "data")] - payload: InvokePayload<'a>, - }, -} - -#[derive(Serialize, Debug)] -pub struct InvokePayload<'a> { - #[serde(rename = "deviceId")] - pub device_id: &'a str, - pub name: &'a str, - pub parameters: &'a [json::Value], -} - -#[derive(Deserialize_enum, Debug)] -#[serde(tag = "type")] -pub enum RpcResponse { - #[serde(rename = "list")] - List { - data: Vec, - }, - #[serde(rename = "methods")] - Methods { - data: Vec, - }, - #[serde(rename = "result")] - Result { - data: json::Value, - }, - Error { - data: String, - }, -} - -#[derive(Deserialize, Debug)] -pub struct DeviceEntry { - #[serde(rename = "deviceId")] - pub device_id: String, - #[serde(rename = "typeNames")] - pub type_names: Vec, - pub description: Option, -} - -impl fmt::Display for DeviceEntry { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!(f, "deviceId: {}", self.device_id)?; - if !self.type_names.is_empty() { - writeln!(f, " type_names: {}", self.type_names.join(",\n "))?; - } - if let Some(desc) = &self.description { - writeln!(f, " description: {}", desc)?; - } - Ok(()) - } -} - -#[derive(Deserialize, Debug)] -pub struct MethodEntry { - pub name: String, - pub description: Option, - pub parameters: Vec, - pub return_type: Option, -} - -#[derive(Deserialize, Debug)] -pub struct MethodParameter { - pub name: Option, - pub description: Option, - pub r#type: Option, -} - -type Result = result::Result; - -#[derive(Debug)] -enum Error { - Io(io::Error), - Json(miniserde::Error), - Framing, - Protocol(&'static str), -} -impl From for Error { - fn from(e: io::Error) -> Self { - Error::Io(e) - } -} -impl From for Error { - fn from(e: miniserde::Error) -> Self { - Error::Json(e) - } -} - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Error::Io(e) => write!(f, "io error: {e}"), - Error::Json(e) => write!(f, "json error: {e}"), - Error::Framing => f.write_str("invalid message framing"), - Error::Protocol(msg) => write!(f, "protocol error: {msg}"), - } - } -} - -impl std::error::Error for Error {} - -const DELIM: u8 = 0; - -fn set_raw(fd: RawFd) -> Result<()> { - unsafe { - let mut term = std::mem::zeroed::(); - if libc::tcgetattr(fd, &mut term) != 0 { - return Err(Error::Io(io::Error::last_os_error())); - } - libc::cfmakeraw(&mut term); - term.c_cc[libc::VMIN] = 1; - term.c_cc[libc::VTIME] = 0; - if libc::tcsetattr(fd, libc::TCSANOW, &term) != 0 { - return Err(Error::Io(io::Error::last_os_error())); - } - } - Ok(()) -} - -fn flush(fd: RawFd) -> Result<()> { - println!("Flushing pending input…"); - let mut buf = [0u8; 1024]; - let mut total = 0usize; - loop { - let ready = loop { - let mut pfd = libc::pollfd { - fd, - events: libc::POLLIN, - revents: 0, - }; - let res = unsafe { libc::poll(&mut pfd, 1, 0) }; - if res < 0 { - let err = io::Error::last_os_error(); - if err.kind() == io::ErrorKind::Interrupted { - continue; - } - return Err(Error::Io(err)); - } - break res; - }; - - if ready == 0 { - if total == 0 { - println!("Flush complete; nothing pending."); - } else { - println!("Flush complete; drained {total} bytes."); - } - return Ok(()); - } - - loop { - let read = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) }; - if read < 0 { - let err = io::Error::last_os_error(); - match err.kind() { - io::ErrorKind::Interrupted => continue, - io::ErrorKind::WouldBlock => break, - _ => return Err(Error::Io(err)), - } - } else if read == 0 { - break; - } else { - total += read as usize; - if read as usize != buf.len() { - break; - } - } - } - } -} - -#[inline] -fn write_all_fd(fd: RawFd, mut buf: &[u8]) -> Result<()> { - while !buf.is_empty() { - let written = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) }; - if written < 0 { - let err = io::Error::last_os_error(); - match err.kind() { - io::ErrorKind::Interrupted => continue, - io::ErrorKind::WouldBlock => continue, - _ => return Err(Error::Io(err)), - } - } else if written == 0 { - return Err(Error::Framing); - } else { - buf = &buf[written as usize..]; - } - } - Ok(()) -} - -fn write_message(fd: RawFd, msg: &impl Serialize) -> Result<()> { - let payload = json::to_string(msg); - println!("Sending message: {}", payload); - write_all_fd(fd, &[DELIM])?; - write_all_fd(fd, payload.as_bytes())?; - write_all_fd(fd, &[DELIM])?; - Ok(()) -} - -fn wait_for_readable(fd: RawFd) -> Result<()> { - loop { - let mut pfd = libc::pollfd { - fd, - events: libc::POLLIN, - revents: 0, - }; - let res = unsafe { libc::poll(&mut pfd, 1, -1) }; - if res < 0 { - let err = io::Error::last_os_error(); - if err.kind() == io::ErrorKind::Interrupted { - continue; - } - return Err(Error::Io(err)); - } - if res > 0 { - return Ok(()); - } - } -} - -fn read_message(fd: RawFd) -> Result { - println!("Waiting for response…"); - wait_for_readable(fd)?; - let mut buf = Vec::new(); - let mut byte = [0u8; 1]; - loop { - let read = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) }; - if read < 0 { - let err = io::Error::last_os_error(); - match err.kind() { - io::ErrorKind::Interrupted => continue, - io::ErrorKind::WouldBlock => { - wait_for_readable(fd)?; - continue; - } - _ => return Err(Error::Io(err)), - } - } else if read == 0 { - return Err(Error::Framing); - } else { - let value = byte[0]; - if value == DELIM { - if buf.is_empty() { - continue; - } - let string = String::from_utf8_lossy(&buf); - println!("Received message: {}", string); - return Ok(json::from_str(&string)?); - } else { - buf.push(value); - } - } - } -} +use oc2r_rust::{DEFAULT_DEVICE_PATH, DeviceBus, DeviceEntry, Result}; fn main() -> Result<()> { - println!("Opening /dev/hvc0…"); - let file = OpenOptions::new() - .read(true) - .write(true) - .open("/dev/hvc0")?; - let fd = file.as_raw_fd(); + println!("Opening {DEFAULT_DEVICE_PATH}…"); + let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?; - println!("Configuring raw console mode…"); - set_raw(fd)?; - println!("Console is now in raw mode."); - flush(fd)?; + let drained = bus.flush()?; + println!("Drained {drained} stale bytes."); - println!("Requesting device list…"); - write_message(fd, &RpcRequest::List)?; - match read_message(fd)? { - RpcResponse::List { data } => { - println!("--------"); - for dev in data { - println!("{dev}") - } - Ok(()) - } - RpcResponse::Error { .. } => Err(Error::Protocol("rpc error")), - RpcResponse::Methods { .. } => Err(Error::Protocol("unexpected methods response")), - RpcResponse::Result { .. } => Err(Error::Protocol("unexpected result response")), + println!("Listing devices…"); + let devices = bus.list()?; + for device in devices { + print_device(&device); } + + Ok(()) +} + +fn print_device(device: &DeviceEntry) { + println!("deviceId: {}", device.device_id); + if !device.type_names.is_empty() { + println!(" typeNames: {}", device.type_names.join(", ")); + } + if let Some(desc) = &device.description { + println!(" description: {desc}"); + } + println!("--------"); } diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 0000000..2524e76 --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,67 @@ +//! Miniserde data structures mirroring the OC2R RPC JSON messages. + +use miniserde::{Deserialize, Serialize, json}; +use miniserde_enum::{Deserialize_enum, Serialize_enum}; + +#[derive(Serialize_enum, Debug)] +#[serde(tag = "type")] +pub enum RpcRequest<'a> { + #[serde(rename = "list")] + List, + #[serde(rename = "methods")] + Methods { data: &'a str }, + #[serde(rename = "invoke")] + Invoke { + #[serde(rename = "data")] + payload: InvokePayload<'a>, + }, +} + +#[derive(Serialize, Debug)] +pub struct InvokePayload<'a> { + #[serde(rename = "deviceId")] + pub device_id: &'a str, + pub name: &'a str, + pub parameters: &'a [json::Value], +} + +#[derive(Deserialize_enum, Debug)] +#[serde(tag = "type")] +pub enum RpcResponse { + #[serde(rename = "list")] + List { data: Vec }, + #[serde(rename = "methods")] + Methods { data: Vec }, + #[serde(rename = "result")] + Result { data: json::Value }, + #[serde(rename = "error")] + Error { data: String }, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct DeviceEntry { + #[serde(rename = "deviceId")] + pub device_id: String, + #[serde(rename = "typeNames")] + pub type_names: Vec, + pub description: Option, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct MethodEntry { + pub name: String, + pub description: Option, + pub parameters: Vec, + #[serde(rename = "returnType")] + pub return_type: Option, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct MethodParameter { + pub name: Option, + pub description: Option, + #[serde(rename = "type")] + pub r#type: Option, +} + +pub type JsonValue = json::Value; diff --git a/src/transport.rs b/src/transport.rs new file mode 100644 index 0000000..7f99639 --- /dev/null +++ b/src/transport.rs @@ -0,0 +1,153 @@ +//! Thin wrappers around the POSIX APIs we need for OC2R framing. + +use crate::error::{Error, Result}; +use crate::rpc::RpcResponse; +use libc::{poll, pollfd}; +use miniserde::{Serialize, json}; +use std::io; +use std::os::fd::RawFd; + +const DELIM: [u8; 1] = [0]; + +pub(crate) fn set_raw(fd: RawFd) -> Result<()> { + unsafe { + let mut term = std::mem::zeroed::(); + if libc::tcgetattr(fd, &mut term) != 0 { + return Err(Error::Io(io::Error::last_os_error())); + } + libc::cfmakeraw(&mut term); + term.c_cc[libc::VMIN] = 1; + term.c_cc[libc::VTIME] = 0; + if libc::tcsetattr(fd, libc::TCSANOW, &term) != 0 { + return Err(Error::Io(io::Error::last_os_error())); + } + } + Ok(()) +} + +pub(crate) fn flush_fd(fd: RawFd) -> Result { + let mut buf = [0u8; 1024]; + let mut total = 0usize; + loop { + let ready = loop { + let mut pfd = pollfd { + fd, + events: libc::POLLIN, + revents: 0, + }; + let res = unsafe { poll(&mut pfd, 1, 0) }; + if res < 0 { + let err = io::Error::last_os_error(); + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(Error::Io(err)); + } + break res; + }; + + if ready == 0 { + return Ok(total); + } + + loop { + let read = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) }; + if read < 0 { + let err = io::Error::last_os_error(); + match err.kind() { + io::ErrorKind::Interrupted => continue, + io::ErrorKind::WouldBlock => break, + _ => return Err(Error::Io(err)), + } + } else if read == 0 { + break; + } else { + total += read as usize; + if read as usize != buf.len() { + break; + } + } + } + } +} + +pub(crate) fn write_message(fd: RawFd, msg: &impl Serialize) -> Result<()> { + let payload = json::to_string(msg); + write_all_fd(fd, &DELIM)?; + write_all_fd(fd, payload.as_bytes())?; + write_all_fd(fd, &DELIM)?; + Ok(()) +} + +fn write_all_fd(fd: RawFd, mut buf: &[u8]) -> Result<()> { + while !buf.is_empty() { + let written = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) }; + if written < 0 { + let err = io::Error::last_os_error(); + match err.kind() { + io::ErrorKind::Interrupted => continue, + io::ErrorKind::WouldBlock => continue, + _ => return Err(Error::Io(err)), + } + } else if written == 0 { + return Err(Error::Framing); + } else { + buf = &buf[written as usize..]; + } + } + Ok(()) +} + +pub(crate) fn read_message(fd: RawFd) -> Result { + wait_for_readable(fd)?; + let mut buf = Vec::new(); + let mut byte = [0u8; 1]; + loop { + let read = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) }; + if read < 0 { + let err = io::Error::last_os_error(); + match err.kind() { + io::ErrorKind::Interrupted => continue, + io::ErrorKind::WouldBlock => { + wait_for_readable(fd)?; + continue; + } + _ => return Err(Error::Io(err)), + } + } else if read == 0 { + return Err(Error::Framing); + } else { + let value = byte[0]; + if value == DELIM[0] { + if buf.is_empty() { + continue; + } + let message = String::from_utf8_lossy(&buf); + return Ok(json::from_str(&message)?); + } else { + buf.push(value); + } + } + } +} + +fn wait_for_readable(fd: RawFd) -> Result<()> { + loop { + let mut pfd = pollfd { + fd, + events: libc::POLLIN, + revents: 0, + }; + let res = unsafe { poll(&mut pfd, 1, -1) }; + if res < 0 { + let err = io::Error::last_os_error(); + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(Error::Io(err)); + } + if res > 0 { + return Ok(()); + } + } +}