Files
deckui/lib/src/tcp/mod.rs

129 lines
4.0 KiB
Rust

use crate::*;
use std::{
fmt::Debug,
io::{self, Read, Write},
net::{IpAddr, SocketAddr, TcpListener, TcpStream},
result::Result::Ok,
thread,
time::Duration,
};
use anyhow::Result;
use crossbeam::channel::{self, Receiver, Sender};
use log::{error, info};
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
/// TCP port used by both ends of the link: `start_server` listens on it and
/// `start_client` connects to it.
const PORT: u16 = 6478;
pub fn create_listener(address: &str, port: u16) -> TcpListener {
TcpListener::bind((address, port)).unwrap_or_else(|e| {
if e.kind() == io::ErrorKind::AddrInUse {
info!("Port {} is already being used by another program", port);
std::process::exit(1);
} else if e.kind() == io::ErrorKind::AddrNotAvailable {
info!("Address {} not available", address);
std::process::exit(1);
} else {
panic!("{:?}", e);
}
})
}
/// Services one TCP connection until an error occurs.
///
/// The stream is switched to non-blocking mode, then every 200 ms the loop:
///
/// 1. drains the messages currently queued on `reciever`, encodes each one
///    with the `rmp_serde` serializer and writes it to `stream`;
/// 2. calls `handle_read` once to forward an incoming message, if any, to
///    `sender`.
///
/// `origin` identifies the remote side and is only used in log output.
///
/// # Errors
///
/// The loop has no normal exit; the function returns only with the first
/// error raised while configuring the socket, serializing, writing, or inside
/// `handle_read`.
fn handle_connection<R, S>(
    mut stream: TcpStream,
    reciever: Receiver<R>,
    sender: Sender<S>,
    origin: Origin,
) -> Result<()>
where
    R: Serialize + Debug,
    S: for<'a> Deserialize<'a> + Debug,
{
    stream.set_nonblocking(true)?;
    loop {
        thread::sleep(Duration::from_millis(200));
        // `try_recv` never blocks. Checking `is_empty()` and then calling the
        // blocking `recv()` could stall this loop if another clone of the
        // receiver took the message in between.
        while let Ok(msg) = reciever.try_recv() {
            let mut msg_buf = Vec::new();
            msg.serialize(&mut Serializer::new(&mut msg_buf))?;
            // `write_all` keeps writing until the whole buffer is sent; a bare
            // `write` may send only a prefix and silently truncate the message.
            stream.write_all(&msg_buf)?;
            info!("Sent to {origin} : {msg:?}");
        }
        handle_read(&mut stream, &origin, &sender)?;
    }
}
/// Performs a single read on `stream` and forwards the decoded message.
///
/// Up to 4096 bytes are read in one call. If data is available it is decoded
/// with `rmp_serde::from_slice` as one value of type `S`, logged with `origin`
/// as a prefix, and sent on `sender`. If no data is available yet
/// (`WouldBlock`) or the call was interrupted, nothing happens and `Ok(())`
/// is returned.
///
/// NOTE(review): each read is decoded as exactly one message, which assumes a
/// message always arrives whole in a single read of at most 4096 bytes —
/// confirm against the sending side.
///
/// # Errors
///
/// Returns an error when:
///
/// * the peer closed the connection (read of 0 bytes), so the caller's loop
///   stops instead of polling a dead socket forever;
/// * the read fails with any other I/O error;
/// * the received bytes cannot be decoded as `S`;
/// * the receiving half of `sender` has been dropped.
fn handle_read<S>(stream: &mut TcpStream, origin: &Origin, sender: &Sender<S>) -> Result<()>
where
    S: for<'a> Deserialize<'a> + Debug,
{
    let mut read_buff = [0u8; 4096];
    let byte_read = match stream.read(&mut read_buff) {
        Ok(0) => {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "connection closed by peer",
            )
            .into());
        }
        Ok(n) => n,
        // No data ready on the non-blocking socket: not an error, try again
        // on the next poll.
        Err(e)
            if matches!(
                e.kind(),
                io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
            ) =>
        {
            return Ok(());
        }
        Err(e) => return Err(e.into()),
    };

    // Data from the network is untrusted: report malformed input as an error
    // rather than panicking the connection thread.
    let msg: S = rmp_serde::from_slice(&read_buff[..byte_read])?;
    info!("[{origin}] {:?}", msg);
    sender.send(msg).map_err(|_| {
        io::Error::new(io::ErrorKind::BrokenPipe, "message channel disconnected")
    })?;
    Ok(())
}
pub fn start_server() -> Result<(MessageToClient, MessageFomClient)> {
let (message_to_server, message_from_client) = channel::bounded(4);
let (message_to_client, message_from_server) = channel::bounded(4);
let server = create_listener("0.0.0.0", PORT);
info!("Listening on port {}", PORT);
server.set_nonblocking(true)?;
thread::spawn(move || {
for stream_result in server.incoming() {
thread::sleep(Duration::from_millis(500));
match stream_result {
Ok(stream) => {
let reciever_s = message_from_server.clone();
let sender_c = message_to_server.clone();
thread::spawn(move || {
if let Err(error) =
handle_connection(stream, reciever_s, sender_c, Origin::Client)
{
error!("error while handling stream: {}", error);
return;
}
})
} // Spawn a new thread, ignore the return value because we don't need to join threads
_ => continue,
};
}
});
Ok((message_to_client, message_from_client))
}
pub fn start_client(ip: IpAddr) -> Result<(MessageToServer, MessageFromServer)> {
let (message_to_server, message_from_client) = channel::bounded(4);
let (message_to_client, message_from_server) = channel::bounded(4);
let serv_addr = SocketAddr::new(ip, PORT);
let server = TcpStream::connect(serv_addr)?;
info!("Connected successfully to {}", serv_addr);
thread::spawn(move || handle_connection(server, message_from_client, message_to_client, Origin::Server));
Ok((message_to_server, message_from_server))
}