Better structure doc and examples

This commit is contained in:
2025-10-29 17:41:33 +01:00
parent 2167543a65
commit 4268c97d19
10 changed files with 600 additions and 298 deletions

View File

@@ -3,9 +3,15 @@ name = "oc2r-rust"
version = "0.1.0"
edition = "2024"
[workspace.lints.clippy]
enum_glob_use = "deny"
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
unwrap_used = "deny"
[profile.release]
strip = true
opt-level = "z"
opt-level = "s"
lto = true
codegen-units = 1

70
README.md Normal file
View File

@@ -0,0 +1,70 @@
# oc2r-rust
Thin Rust bindings for the OC2R HLAPI controller. The crate mirrors the
behaviour of the Lua `devices.lua` helper: it speaks JSON over the first
VirtIO console (`/dev/hvc0`) and exposes ergonomic helpers for enumerating
devices, inspecting their methods, and invoking them.
## Status
- ✅ mirrors `devices.lua`: `list`, `get`, `find`, `find_all`, `methods`, `invoke`
-`Device` wrapper with lazy method cache
-`JsonValue` alias you can use to build invocation parameters
- ⏳ event subscription (`subscribe` / `unsubscribe`) not exposed yet
The low-level framing logic follows the Java implementation in
`RPCDeviceBusAdapter`: messages are framed with leading/trailing NUL bytes and
the console is switched into raw mode before use.
## Using the library
Add the crate to your workspace and interact with it directly:
```rust
use oc2r_rust::{DeviceBus, JsonValue, Result, DEFAULT_DEVICE_PATH};
fn main() -> Result<()> {
let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?;
for device in bus.list()? {
println!("deviceId: {}", device.device_id);
println!(" types: {}", device.type_names.join(", "));
}
// Invoke a method on the first device as an example.
if let Some(mut device) = bus.device("some-device-id")? {
let response = device.invoke("help", &[JsonValue::Null])?;
println!("help(): {response:?}");
}
Ok(())
}
```
### Running the examples
Two runnable examples live in `examples/`:
- `list-devices` enumerate devices and pretty-print their metadata
- `invoke-method` list available methods for the first device and invoke one
Run them with:
```bash
cargo run --example list-devices
cargo run --example invoke-method
```
Both examples expect to run inside the OC2R VM or a compatible environment
where `/dev/hvc0` is exposed.
## Architecture overview
- `bus.rs` high-level `DeviceBus`/`Device` helpers
- `transport.rs` POSIX plumbing (termios, poll, read/write)
- `rpc.rs` miniserde data structures for OC2R messages
- `error.rs` shared error type and `Result<T>` alias
`DeviceBus` flushes the descriptor before sending each request, matching the
behaviour of both Lua and Java implementations and preventing stale replies
from leaking into new invocations.

41
examples/invoke-method.rs Normal file
View File

@@ -0,0 +1,41 @@
use oc2r_rust::{DEFAULT_DEVICE_PATH, DeviceBus, Result};
fn main() -> Result<()> {
let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?;
let devices = bus.list()?;
let Some(first) = devices.first() else {
println!("No OC2R devices detected.");
return Ok(());
};
println!("Inspecting {}", first.device_id);
let mut device = bus
.device(&first.device_id)?
.expect("device disappeared after list()");
let methods = device.methods()?.to_vec();
for method in &methods {
println!("{}()", method.name);
}
if let Some(method) = methods.iter().find(|m| m.parameters.is_empty()) {
println!("Invoking {}()", method.name);
let result = device.invoke(&method.name, &[])?;
println!("Result: {result:?}");
} else {
println!("No zero-argument methods available to invoke.");
}
// Example for passing parameters:
// device.invoke(
// "setOutput",
// &[
// oc2r_rust::JsonValue::Number(1.into()),
// oc2r_rust::JsonValue::Bool(true),
// ],
// )?;
Ok(())
}

20
examples/list-devices.rs Normal file
View File

@@ -0,0 +1,20 @@
use oc2r_rust::{DEFAULT_DEVICE_PATH, DeviceBus, Result};
fn main() -> Result<()> {
let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?;
let drained = bus.flush()?;
println!("Drained {drained} stale byte(s).");
for device in bus.list()? {
println!("deviceId: {}", device.device_id);
if !device.type_names.is_empty() {
println!(" typeNames: {}", device.type_names.join(", "));
}
if let Some(desc) = &device.description {
println!(" description: {desc}");
}
println!("--------");
}
Ok(())
}

148
src/bus.rs Normal file
View File

@@ -0,0 +1,148 @@
use crate::error::{Error, Result};
use crate::rpc::{DeviceEntry, JsonValue, MethodEntry, RpcRequest, RpcResponse};
use crate::transport::{flush_fd, read_message, set_raw, write_message};
use std::fs::OpenOptions;
use std::os::fd::{AsRawFd, RawFd};
use std::path::Path;
/// Connection handle to the OC2R RPC controller.
///
/// This is the Rust counterpart to Lua's `DeviceBus`. It serialises access to
/// `/dev/hvc0`, drains stale responses before each request and decodes the
/// JSON reply frames.
pub struct DeviceBus {
file: std::fs::File,
}
impl DeviceBus {
/// Open the console device (defaults to [`DEFAULT_DEVICE_PATH`](crate::DEFAULT_DEVICE_PATH))
/// and switch it to raw mode.
pub fn connect(path: impl AsRef<Path>) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.open(path.as_ref())?;
let fd = file.as_raw_fd();
set_raw(fd)?;
Ok(Self { file })
}
fn fd(&self) -> RawFd {
self.file.as_raw_fd()
}
pub fn flush(&mut self) -> Result<usize> {
flush_fd(self.fd())
}
/// Enumerate all devices currently exposed by the controller.
pub fn list(&mut self) -> Result<Vec<DeviceEntry>> {
self.flush()?;
write_message(self.fd(), &RpcRequest::List)?;
match read_message(self.fd())? {
RpcResponse::List { data } => Ok(data),
RpcResponse::Error { data } => Err(Error::Protocol(data)),
other => Err(Error::Protocol(format!(
"unexpected response to list: {other:?}"
))),
}
}
pub fn methods(&mut self, device_id: &str) -> Result<Vec<MethodEntry>> {
self.flush()?;
write_message(self.fd(), &RpcRequest::Methods { data: device_id })?;
match read_message(self.fd())? {
RpcResponse::Methods { data } => Ok(data),
RpcResponse::Error { data } => Err(Error::Protocol(data)),
other => Err(Error::Protocol(format!(
"unexpected response to methods: {other:?}"
))),
}
}
pub fn invoke(
&mut self,
device_id: &str,
name: &str,
parameters: &[JsonValue],
) -> Result<JsonValue> {
self.flush()?;
let request = RpcRequest::Invoke {
payload: crate::rpc::InvokePayload {
device_id,
name,
parameters,
},
};
write_message(self.fd(), &request)?;
match read_message(self.fd())? {
RpcResponse::Result { data } => Ok(data),
RpcResponse::Error { data } => Err(Error::Protocol(data)),
other => Err(Error::Protocol(format!(
"unexpected response to invoke: {other:?}"
))),
}
}
pub fn get(&mut self, device_id: &str) -> Result<Option<DeviceEntry>> {
let devices = self.list()?;
Ok(devices.into_iter().find(|d| d.device_id == device_id))
}
/// Look up the first device that advertises the provided type name.
pub fn find(&mut self, type_name: &str) -> Result<Option<DeviceEntry>> {
let devices = self.list()?;
Ok(devices
.into_iter()
.find(|d| d.type_names.iter().any(|t| t == type_name)))
}
/// Retrieve all devices that advertise the provided type name.
pub fn find_all(&mut self, type_name: &str) -> Result<Vec<DeviceEntry>> {
let devices = self.list()?;
Ok(devices
.into_iter()
.filter(|d| d.type_names.iter().any(|t| t == type_name))
.collect())
}
/// Borrow a `Device` helper for the supplied identifier.
pub fn device<'a>(&'a mut self, device_id: &str) -> Result<Option<Device<'a>>> {
let entry = match self.get(device_id)? {
Some(entry) => entry,
None => return Ok(None),
};
Ok(Some(Device {
bus: self,
entry,
cached_methods: None,
}))
}
}
/// High level device wrapper mirroring Lua's `Device` helper.
pub struct Device<'bus> {
bus: &'bus mut DeviceBus,
entry: DeviceEntry,
cached_methods: Option<Vec<MethodEntry>>,
}
impl<'bus> Device<'bus> {
/// Cached metadata returned by [`DeviceBus::list`].
pub fn info(&self) -> &DeviceEntry {
&self.entry
}
/// Fetch and cache the available method signatures for this device.
pub fn methods(&mut self) -> Result<&[MethodEntry]> {
if self.cached_methods.is_none() {
self.cached_methods = Some(self.bus.methods(&self.entry.device_id)?);
}
Ok(self.cached_methods.as_deref().unwrap_or(&[]))
}
/// Invoke `name` with JSON encoded parameters, returning the raw JSON result.
pub fn invoke(&mut self, name: &str, parameters: &[JsonValue]) -> Result<JsonValue> {
self.bus.invoke(&self.entry.device_id, name, parameters)
}
}

38
src/error.rs Normal file
View File

@@ -0,0 +1,38 @@
//! Error types shared across the OC2R RPC client.
use std::io;
#[derive(Debug)]
pub enum Error {
Io(io::Error),
Json(miniserde::Error),
Framing,
Protocol(String),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
impl From<miniserde::Error> for Error {
fn from(e: miniserde::Error) -> Self {
Error::Json(e)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(e) => write!(f, "io error: {e}"),
Error::Json(e) => write!(f, "json error: {e}"),
Error::Framing => f.write_str("invalid message framing"),
Error::Protocol(msg) => write!(f, "protocol error: {msg}"),
}
}
}
impl std::error::Error for Error {}

34
src/lib.rs Normal file
View File

@@ -0,0 +1,34 @@
//! Minimal OC2R HLAPI client for Rust.
//!
//! This crate mirrors the behaviour of the bundled Lua `devices.lua`
//! helper. It communicates with the OC2R RPC controller over the first
//! VirtIO console device (`/dev/hvc0`) and exposes helpers for listing
//! devices, inspecting available methods and invoking them.
//!
//! # Quick start
//! ```no_run
//! use oc2r_rust::{DeviceBus, JsonValue, Result, DEFAULT_DEVICE_PATH};
//!
//! fn main() -> Result<()> {
//! let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?;
//! let devices = bus.list()?;
//! println!("Found {} device(s)", devices.len());
//!
//! if let Some(device) = bus.device(&devices[0].device_id)? {
//! let response = device.invoke("help", &[JsonValue::Null])?;
//! println!("help(): {response:?}");
//! }
//! Ok(())
//! }
//! ```
pub mod bus;
pub mod error;
pub mod rpc;
mod transport;
pub use bus::{Device, DeviceBus};
pub use error::{Error, Result};
pub use rpc::{DeviceEntry, JsonValue, MethodEntry, MethodParameter};
pub const DEFAULT_DEVICE_PATH: &str = "/dev/hvc0";

View File

@@ -1,303 +1,28 @@
use miniserde::{Deserialize, Serialize, json};
use miniserde_enum::{Deserialize_enum, Serialize_enum};
use std::fmt;
use std::fs::OpenOptions;
use std::io;
use std::os::fd::{AsRawFd, RawFd};
use std::result;
#[derive(Serialize_enum, Debug)]
#[serde(tag = "type")]
pub enum RpcRequest<'a> {
#[serde(rename = "list")]
List,
#[serde(rename = "methods")]
Methods { data: &'a str },
#[serde(rename = "invoke")]
Invoke {
#[serde(rename = "data")]
payload: InvokePayload<'a>,
},
}
#[derive(Serialize, Debug)]
pub struct InvokePayload<'a> {
#[serde(rename = "deviceId")]
pub device_id: &'a str,
pub name: &'a str,
pub parameters: &'a [json::Value],
}
#[derive(Deserialize_enum, Debug)]
#[serde(tag = "type")]
pub enum RpcResponse {
#[serde(rename = "list")]
List {
data: Vec<DeviceEntry>,
},
#[serde(rename = "methods")]
Methods {
data: Vec<MethodEntry>,
},
#[serde(rename = "result")]
Result {
data: json::Value,
},
Error {
data: String,
},
}
#[derive(Deserialize, Debug)]
pub struct DeviceEntry {
#[serde(rename = "deviceId")]
pub device_id: String,
#[serde(rename = "typeNames")]
pub type_names: Vec<String>,
pub description: Option<String>,
}
impl fmt::Display for DeviceEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "deviceId: {}", self.device_id)?;
if !self.type_names.is_empty() {
writeln!(f, " type_names: {}", self.type_names.join(",\n "))?;
}
if let Some(desc) = &self.description {
writeln!(f, " description: {}", desc)?;
}
Ok(())
}
}
#[derive(Deserialize, Debug)]
pub struct MethodEntry {
pub name: String,
pub description: Option<String>,
pub parameters: Vec<MethodParameter>,
pub return_type: Option<String>,
}
#[derive(Deserialize, Debug)]
pub struct MethodParameter {
pub name: Option<String>,
pub description: Option<String>,
pub r#type: Option<String>,
}
type Result<T> = result::Result<T, Error>;
#[derive(Debug)]
enum Error {
Io(io::Error),
Json(miniserde::Error),
Framing,
Protocol(&'static str),
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
impl From<miniserde::Error> for Error {
fn from(e: miniserde::Error) -> Self {
Error::Json(e)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(e) => write!(f, "io error: {e}"),
Error::Json(e) => write!(f, "json error: {e}"),
Error::Framing => f.write_str("invalid message framing"),
Error::Protocol(msg) => write!(f, "protocol error: {msg}"),
}
}
}
impl std::error::Error for Error {}
const DELIM: u8 = 0;
fn set_raw(fd: RawFd) -> Result<()> {
unsafe {
let mut term = std::mem::zeroed::<libc::termios>();
if libc::tcgetattr(fd, &mut term) != 0 {
return Err(Error::Io(io::Error::last_os_error()));
}
libc::cfmakeraw(&mut term);
term.c_cc[libc::VMIN] = 1;
term.c_cc[libc::VTIME] = 0;
if libc::tcsetattr(fd, libc::TCSANOW, &term) != 0 {
return Err(Error::Io(io::Error::last_os_error()));
}
}
Ok(())
}
fn flush(fd: RawFd) -> Result<()> {
println!("Flushing pending input…");
let mut buf = [0u8; 1024];
let mut total = 0usize;
loop {
let ready = loop {
let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let res = unsafe { libc::poll(&mut pfd, 1, 0) };
if res < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(Error::Io(err));
}
break res;
};
if ready == 0 {
if total == 0 {
println!("Flush complete; nothing pending.");
} else {
println!("Flush complete; drained {total} bytes.");
}
return Ok(());
}
loop {
let read = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) };
if read < 0 {
let err = io::Error::last_os_error();
match err.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock => break,
_ => return Err(Error::Io(err)),
}
} else if read == 0 {
break;
} else {
total += read as usize;
if read as usize != buf.len() {
break;
}
}
}
}
}
#[inline]
fn write_all_fd(fd: RawFd, mut buf: &[u8]) -> Result<()> {
while !buf.is_empty() {
let written = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
if written < 0 {
let err = io::Error::last_os_error();
match err.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock => continue,
_ => return Err(Error::Io(err)),
}
} else if written == 0 {
return Err(Error::Framing);
} else {
buf = &buf[written as usize..];
}
}
Ok(())
}
fn write_message(fd: RawFd, msg: &impl Serialize) -> Result<()> {
let payload = json::to_string(msg);
println!("Sending message: {}", payload);
write_all_fd(fd, &[DELIM])?;
write_all_fd(fd, payload.as_bytes())?;
write_all_fd(fd, &[DELIM])?;
Ok(())
}
fn wait_for_readable(fd: RawFd) -> Result<()> {
loop {
let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let res = unsafe { libc::poll(&mut pfd, 1, -1) };
if res < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(Error::Io(err));
}
if res > 0 {
return Ok(());
}
}
}
fn read_message(fd: RawFd) -> Result<RpcResponse> {
println!("Waiting for response…");
wait_for_readable(fd)?;
let mut buf = Vec::new();
let mut byte = [0u8; 1];
loop {
let read = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
if read < 0 {
let err = io::Error::last_os_error();
match err.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock => {
wait_for_readable(fd)?;
continue;
}
_ => return Err(Error::Io(err)),
}
} else if read == 0 {
return Err(Error::Framing);
} else {
let value = byte[0];
if value == DELIM {
if buf.is_empty() {
continue;
}
let string = String::from_utf8_lossy(&buf);
println!("Received message: {}", string);
return Ok(json::from_str(&string)?);
} else {
buf.push(value);
}
}
}
}
use oc2r_rust::{DEFAULT_DEVICE_PATH, DeviceBus, DeviceEntry, Result};
fn main() -> Result<()> {
println!("Opening /dev/hvc0");
let file = OpenOptions::new()
.read(true)
.write(true)
.open("/dev/hvc0")?;
let fd = file.as_raw_fd();
println!("Opening {DEFAULT_DEVICE_PATH}");
let mut bus = DeviceBus::connect(DEFAULT_DEVICE_PATH)?;
println!("Configuring raw console mode…");
set_raw(fd)?;
println!("Console is now in raw mode.");
flush(fd)?;
let drained = bus.flush()?;
println!("Drained {drained} stale bytes.");
println!("Requesting device list");
write_message(fd, &RpcRequest::List)?;
match read_message(fd)? {
RpcResponse::List { data } => {
println!("--------");
for dev in data {
println!("{dev}")
}
Ok(())
}
RpcResponse::Error { .. } => Err(Error::Protocol("rpc error")),
RpcResponse::Methods { .. } => Err(Error::Protocol("unexpected methods response")),
RpcResponse::Result { .. } => Err(Error::Protocol("unexpected result response")),
println!("Listing devices");
let devices = bus.list()?;
for device in devices {
print_device(&device);
}
Ok(())
}
fn print_device(device: &DeviceEntry) {
println!("deviceId: {}", device.device_id);
if !device.type_names.is_empty() {
println!(" typeNames: {}", device.type_names.join(", "));
}
if let Some(desc) = &device.description {
println!(" description: {desc}");
}
println!("--------");
}

67
src/rpc.rs Normal file
View File

@@ -0,0 +1,67 @@
//! Miniserde data structures mirroring the OC2R RPC JSON messages.
use miniserde::{Deserialize, Serialize, json};
use miniserde_enum::{Deserialize_enum, Serialize_enum};
#[derive(Serialize_enum, Debug)]
#[serde(tag = "type")]
pub enum RpcRequest<'a> {
#[serde(rename = "list")]
List,
#[serde(rename = "methods")]
Methods { data: &'a str },
#[serde(rename = "invoke")]
Invoke {
#[serde(rename = "data")]
payload: InvokePayload<'a>,
},
}
#[derive(Serialize, Debug)]
pub struct InvokePayload<'a> {
#[serde(rename = "deviceId")]
pub device_id: &'a str,
pub name: &'a str,
pub parameters: &'a [json::Value],
}
#[derive(Deserialize_enum, Debug)]
#[serde(tag = "type")]
pub enum RpcResponse {
#[serde(rename = "list")]
List { data: Vec<DeviceEntry> },
#[serde(rename = "methods")]
Methods { data: Vec<MethodEntry> },
#[serde(rename = "result")]
Result { data: json::Value },
#[serde(rename = "error")]
Error { data: String },
}
#[derive(Deserialize, Debug, Clone)]
pub struct DeviceEntry {
#[serde(rename = "deviceId")]
pub device_id: String,
#[serde(rename = "typeNames")]
pub type_names: Vec<String>,
pub description: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct MethodEntry {
pub name: String,
pub description: Option<String>,
pub parameters: Vec<MethodParameter>,
#[serde(rename = "returnType")]
pub return_type: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct MethodParameter {
pub name: Option<String>,
pub description: Option<String>,
#[serde(rename = "type")]
pub r#type: Option<String>,
}
pub type JsonValue = json::Value;

153
src/transport.rs Normal file
View File

@@ -0,0 +1,153 @@
//! Thin wrappers around the POSIX APIs we need for OC2R framing.
use crate::error::{Error, Result};
use crate::rpc::RpcResponse;
use libc::{poll, pollfd};
use miniserde::{Serialize, json};
use std::io;
use std::os::fd::RawFd;
const DELIM: [u8; 1] = [0];
pub(crate) fn set_raw(fd: RawFd) -> Result<()> {
unsafe {
let mut term = std::mem::zeroed::<libc::termios>();
if libc::tcgetattr(fd, &mut term) != 0 {
return Err(Error::Io(io::Error::last_os_error()));
}
libc::cfmakeraw(&mut term);
term.c_cc[libc::VMIN] = 1;
term.c_cc[libc::VTIME] = 0;
if libc::tcsetattr(fd, libc::TCSANOW, &term) != 0 {
return Err(Error::Io(io::Error::last_os_error()));
}
}
Ok(())
}
pub(crate) fn flush_fd(fd: RawFd) -> Result<usize> {
let mut buf = [0u8; 1024];
let mut total = 0usize;
loop {
let ready = loop {
let mut pfd = pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let res = unsafe { poll(&mut pfd, 1, 0) };
if res < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(Error::Io(err));
}
break res;
};
if ready == 0 {
return Ok(total);
}
loop {
let read = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) };
if read < 0 {
let err = io::Error::last_os_error();
match err.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock => break,
_ => return Err(Error::Io(err)),
}
} else if read == 0 {
break;
} else {
total += read as usize;
if read as usize != buf.len() {
break;
}
}
}
}
}
pub(crate) fn write_message(fd: RawFd, msg: &impl Serialize) -> Result<()> {
let payload = json::to_string(msg);
write_all_fd(fd, &DELIM)?;
write_all_fd(fd, payload.as_bytes())?;
write_all_fd(fd, &DELIM)?;
Ok(())
}
fn write_all_fd(fd: RawFd, mut buf: &[u8]) -> Result<()> {
while !buf.is_empty() {
let written = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
if written < 0 {
let err = io::Error::last_os_error();
match err.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock => continue,
_ => return Err(Error::Io(err)),
}
} else if written == 0 {
return Err(Error::Framing);
} else {
buf = &buf[written as usize..];
}
}
Ok(())
}
pub(crate) fn read_message(fd: RawFd) -> Result<RpcResponse> {
wait_for_readable(fd)?;
let mut buf = Vec::new();
let mut byte = [0u8; 1];
loop {
let read = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
if read < 0 {
let err = io::Error::last_os_error();
match err.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock => {
wait_for_readable(fd)?;
continue;
}
_ => return Err(Error::Io(err)),
}
} else if read == 0 {
return Err(Error::Framing);
} else {
let value = byte[0];
if value == DELIM[0] {
if buf.is_empty() {
continue;
}
let message = String::from_utf8_lossy(&buf);
return Ok(json::from_str(&message)?);
} else {
buf.push(value);
}
}
}
}
fn wait_for_readable(fd: RawFd) -> Result<()> {
loop {
let mut pfd = pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let res = unsafe { poll(&mut pfd, 1, -1) };
if res < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(Error::Io(err));
}
if res > 0 {
return Ok(());
}
}
}