diff --git a/context/context.rs b/context/context.rs index 8c09573..1093f17 100644 --- a/context/context.rs +++ b/context/context.rs @@ -1,12 +1,13 @@ use alloc::arc::Arc; use alloc::boxed::Box; -use collections::{BTreeMap, Vec, VecDeque}; +use collections::{BTreeMap, Vec}; use spin::Mutex; use arch; +use context::file::File; +use context::memory::{Grant, Memory, SharedMemory}; use syscall::data::Event; -use super::file::File; -use super::memory::{Grant, Memory, SharedMemory}; +use sync::{WaitCondition, WaitQueue}; #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum Status { @@ -36,6 +37,10 @@ pub struct Context { pub running: bool, /// Context is halting parent pub vfork: bool, + /// Context is being waited on + pub waitpid: Arc, + /// Context should wake up at specified time + pub wake: Option<(u64, u64)>, /// The architecture specific context pub arch: arch::context::Context, /// Kernel FX @@ -53,7 +58,7 @@ pub struct Context { /// The current working directory pub cwd: Arc>>, /// Kernel events - pub events: Arc>>, + pub events: Arc>, /// The process environment pub env: Arc, Arc>>>>>, /// The open files in the scheme @@ -73,6 +78,8 @@ impl Context { status: Status::Blocked, running: false, vfork: false, + waitpid: Arc::new(WaitCondition::new()), + wake: None, arch: arch::context::Context::new(), kfx: None, kstack: None, @@ -81,7 +88,7 @@ impl Context { stack: None, grants: Arc::new(Mutex::new(Vec::new())), cwd: Arc::new(Mutex::new(Vec::new())), - events: Arc::new(Mutex::new(VecDeque::new())), + events: Arc::new(WaitQueue::new()), env: Arc::new(Mutex::new(BTreeMap::new())), files: Arc::new(Mutex::new(Vec::new())) } @@ -128,6 +135,24 @@ impl Context { } } + pub fn block(&mut self) -> bool { + if self.status == Status::Runnable { + self.status = Status::Blocked; + true + } else { + false + } + } + + pub fn unblock(&mut self) -> bool { + if self.status == Status::Blocked { + self.status = Status::Runnable; + true + } else { + false + } + } + /// Add a file to the lowest available slot. /// Return the file descriptor number or None if no slot was found pub fn add_file(&self, file: File) -> Option { diff --git a/context/event.rs b/context/event.rs index 06fea60..19d119d 100644 --- a/context/event.rs +++ b/context/event.rs @@ -1,11 +1,12 @@ use alloc::arc::{Arc, Weak}; -use collections::{BTreeMap, VecDeque}; -use spin::{Mutex, Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use collections::BTreeMap; +use spin::{Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; use context; +use sync::WaitQueue; use syscall::data::Event; -type EventList = Weak>>; +type EventList = Weak>; type Registry = BTreeMap<(usize, usize), BTreeMap<(usize, usize), EventList>>; @@ -67,9 +68,8 @@ pub fn trigger(scheme_id: usize, id: usize, flags: usize, data: usize) { let registry = registry(); if let Some(event_lists) = registry.get(&(scheme_id, id)) { for entry in event_lists.iter() { - if let Some(event_list_lock) = entry.1.upgrade() { - let mut event_list = event_list_lock.lock(); - event_list.push_back(Event { + if let Some(event_list) = entry.1.upgrade() { + event_list.send(Event { id: (entry.0).1, flags: flags, data: data diff --git a/context/switch.rs b/context/switch.rs index 9065530..0e291e6 100644 --- a/context/switch.rs +++ b/context/switch.rs @@ -16,36 +16,55 @@ pub unsafe fn switch() -> bool { arch::interrupt::pause(); } - let from_ptr = { - let contexts = contexts(); - let context_lock = contexts.current().expect("context::switch: Not inside of context"); - let mut context = context_lock.write(); - context.deref_mut() as *mut Context - }; - + let from_ptr; let mut to_ptr = 0 as *mut Context; - - for (pid, context_lock) in contexts().iter() { - if *pid > (*from_ptr).id { + { + let contexts = contexts(); + { + let context_lock = contexts.current().expect("context::switch: Not inside of context"); let mut context = context_lock.write(); - if context.status == Status::Runnable && ! context.running { - to_ptr = context.deref_mut() as *mut Context; - break; - } + from_ptr = context.deref_mut() as *mut Context; } - } - if to_ptr as usize == 0 { - for (pid, context_lock) in contexts().iter() { - if *pid < (*from_ptr).id { + let check_context = |context: &mut Context| -> bool { + if context.status == Status::Blocked && context.wake.is_some() { + let wake = context.wake.expect("context::switch: wake not set"); + + let current = arch::time::monotonic(); + if current.0 > wake.0 || (current.0 == wake.0 && current.1 >= wake.1) { + context.unblock(); + } + } + + if context.status == Status::Runnable && ! context.running { + true + } else { + false + } + }; + + for (pid, context_lock) in contexts.iter() { + if *pid > (*from_ptr).id { let mut context = context_lock.write(); - if context.status == Status::Runnable && ! context.running { + if check_context(&mut context) { to_ptr = context.deref_mut() as *mut Context; break; } } } - } + + if to_ptr as usize == 0 { + for (pid, context_lock) in contexts.iter() { + if *pid < (*from_ptr).id { + let mut context = context_lock.write(); + if check_context(&mut context) { + to_ptr = context.deref_mut() as *mut Context; + break; + } + } + } + } + }; if to_ptr as usize == 0 { // Unset global lock if no context found diff --git a/lib.rs b/lib.rs index 3119c25..d71af53 100644 --- a/lib.rs +++ b/lib.rs @@ -69,6 +69,7 @@ #![feature(asm)] #![feature(collections)] #![feature(const_fn)] +#![feature(core_intrinsics)] #![feature(drop_types_in_const)] #![feature(heap_api)] #![feature(integer_atomics)] @@ -115,6 +116,9 @@ pub mod elf; /// Schemes, filesystem handlers pub mod scheme; +/// Synchronization primitives +pub mod sync; + /// Syscall handlers pub mod syscall; diff --git a/scheme/debug.rs b/scheme/debug.rs index 52d9a8b..e082ac6 100644 --- a/scheme/debug.rs +++ b/scheme/debug.rs @@ -1,31 +1,27 @@ -use collections::VecDeque; use core::str; use core::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; -use spin::{Mutex, Once}; +use spin::Once; use context; +use sync::WaitQueue; use syscall::error::*; use syscall::flag::EVENT_READ; use syscall::scheme::Scheme; pub static DEBUG_SCHEME_ID: AtomicUsize = ATOMIC_USIZE_INIT; -/// Input -static INPUT: Once>> = Once::new(); +/// Input queue +static INPUT: Once> = Once::new(); -/// Initialize contexts, called if needed -fn init_input() -> Mutex> { - Mutex::new(VecDeque::new()) +/// Initialize input queue, called if needed +fn init_input() -> WaitQueue { + WaitQueue::new() } -/// Get the global schemes list, const +/// Add to the input queue #[no_mangle] pub extern fn debug_input(b: u8) { - let len = { - let mut input = INPUT.call_once(init_input).lock(); - input.push_back(b); - input.len() - }; + let len = INPUT.call_once(init_input).send(b); context::event::trigger(DEBUG_SCHEME_ID.load(Ordering::SeqCst), 0, EVENT_READ, len); } @@ -45,22 +41,7 @@ impl Scheme for DebugScheme { /// /// Returns the number of bytes read fn read(&self, _file: usize, buf: &mut [u8]) -> Result { - loop { - let mut i = 0; - { - let mut input = INPUT.call_once(init_input).lock(); - while i < buf.len() && ! input.is_empty() { - buf[i] = input.pop_front().expect("debug_input lost byte"); - i += 1; - } - } - - if i > 0 { - return Ok(i); - } else { - unsafe { context::switch(); } //TODO: Block - } - } + Ok(INPUT.call_once(init_input).receive_into(buf)) } /// Write the `buffer` to the `file` diff --git a/scheme/event.rs b/scheme/event.rs index 8229559..1634e1b 100644 --- a/scheme/event.rs +++ b/scheme/event.rs @@ -1,17 +1,18 @@ use alloc::arc::{Arc, Weak}; -use collections::{BTreeMap, VecDeque}; -use core::mem; +use collections::BTreeMap; +use core::{mem, slice}; use core::sync::atomic::{AtomicUsize, Ordering}; -use spin::{Mutex, RwLock}; +use spin::RwLock; use context; +use sync::WaitQueue; use syscall::data::Event; use syscall::error::*; use syscall::scheme::Scheme; pub struct EventScheme { next_id: AtomicUsize, - handles: RwLock>>>> + handles: RwLock>>> } impl EventScheme { @@ -57,29 +58,8 @@ impl Scheme for EventScheme { handle_weak.upgrade().ok_or(Error::new(EBADF))? }; - let event_size = mem::size_of::(); - let len = buf.len()/event_size; - if len > 0 { - loop { - let mut i = 0; - { - let mut events = handle.lock(); - while ! events.is_empty() && i < len { - let event = events.pop_front().unwrap(); - unsafe { *(buf.as_mut_ptr() as *mut Event).offset(i as isize) = event; } - i += 1; - } - } - - if i > 0 { - return Ok(i * event_size); - } else { - unsafe { context::switch(); } //TODO: Block - } - } - } else { - Ok(0) - } + let event_buf = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr() as *mut Event, buf.len()/mem::size_of::()) }; + Ok(handle.receive_into(event_buf) * mem::size_of::()) } fn fsync(&self, id: usize) -> Result { diff --git a/scheme/irq.rs b/scheme/irq.rs index 62fcb1a..2ed891e 100644 --- a/scheme/irq.rs +++ b/scheme/irq.rs @@ -1,21 +1,54 @@ use core::{mem, str}; +use core::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; +use spin::{Mutex, Once}; -use arch::interrupt::irq::{ACKS, COUNTS, acknowledge}; +use arch::interrupt::irq::acknowledge; +use context; +use sync::WaitCondition; use syscall::error::*; +use syscall::flag::EVENT_READ; use syscall::scheme::Scheme; +pub static IRQ_SCHEME_ID: AtomicUsize = ATOMIC_USIZE_INIT; + +/// IRQ queues +static ACKS: Mutex<[usize; 16]> = Mutex::new([0; 16]); +static COUNTS: Mutex<[usize; 16]> = Mutex::new([0; 16]); +static WAITS: Once<[WaitCondition; 16]> = Once::new(); + +fn init_waits() -> [WaitCondition; 16] { + [ + WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), + WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), + WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), + WaitCondition::new(), WaitCondition::new(), WaitCondition::new(), WaitCondition::new() + ] +} + +/// Add to the input queue +#[no_mangle] +pub extern fn irq_trigger(irq: u8) { + COUNTS.lock()[irq as usize] += 1; + WAITS.call_once(init_waits)[irq as usize].notify(); + context::event::trigger(IRQ_SCHEME_ID.load(Ordering::SeqCst), irq as usize, EVENT_READ, mem::size_of::()); +} + pub struct IrqScheme; impl Scheme for IrqScheme { - fn open(&self, path: &[u8], _flags: usize, _uid: u32, _gid: u32) -> Result { - let path_str = str::from_utf8(path).or(Err(Error::new(ENOENT)))?; + fn open(&self, path: &[u8], _flags: usize, uid: u32, _gid: u32) -> Result { + if uid == 0 { + let path_str = str::from_utf8(path).or(Err(Error::new(ENOENT)))?; - let id = path_str.parse::().or(Err(Error::new(ENOENT)))?; + let id = path_str.parse::().or(Err(Error::new(ENOENT)))?; - if id < COUNTS.lock().len() { - Ok(id) + if id < COUNTS.lock().len() { + Ok(id) + } else { + Err(Error::new(ENOENT)) + } } else { - Err(Error::new(ENOENT)) + Err(Error::new(EACCES)) } } @@ -26,15 +59,17 @@ impl Scheme for IrqScheme { fn read(&self, file: usize, buffer: &mut [u8]) -> Result { // Ensures that the length of the buffer is larger than the size of a usize if buffer.len() >= mem::size_of::() { - let ack = ACKS.lock()[file]; - let current = COUNTS.lock()[file]; - if ack != current { - // Safe if the length of the buffer is larger than the size of a usize - assert!(buffer.len() >= mem::size_of::()); - unsafe { *(buffer.as_mut_ptr() as *mut usize) = current; } - Ok(mem::size_of::()) - } else { - Ok(0) + loop { + let ack = ACKS.lock()[file]; + let current = COUNTS.lock()[file]; + if ack != current { + // Safe if the length of the buffer is larger than the size of a usize + assert!(buffer.len() >= mem::size_of::()); + unsafe { *(buffer.as_mut_ptr() as *mut usize) = current; } + return Ok(mem::size_of::()); + } else { + WAITS.call_once(init_waits)[file].wait(); + } } } else { Err(Error::new(EINVAL)) @@ -58,6 +93,10 @@ impl Scheme for IrqScheme { } } + fn fevent(&self, _file: usize, _flags: usize) -> Result { + Ok(0) + } + fn fsync(&self, _file: usize) -> Result { Ok(0) } diff --git a/scheme/mod.rs b/scheme/mod.rs index d64d03f..1d61491 100644 --- a/scheme/mod.rs +++ b/scheme/mod.rs @@ -19,9 +19,9 @@ use self::debug::{DEBUG_SCHEME_ID, DebugScheme}; use self::event::EventScheme; use self::env::EnvScheme; use self::initfs::InitFsScheme; -use self::irq::IrqScheme; +use self::irq::{IRQ_SCHEME_ID, IrqScheme}; use self::pipe::{PIPE_SCHEME_ID, PipeScheme}; -use self::root::RootScheme; +use self::root::{ROOT_SCHEME_ID, RootScheme}; /// Debug scheme pub mod debug; @@ -114,12 +114,12 @@ static SCHEMES: Once> = Once::new(); /// Initialize schemes, called if needed fn init_schemes() -> RwLock { let mut list: SchemeList = SchemeList::new(); - list.insert(Box::new(*b""), Arc::new(Box::new(RootScheme::new()))).expect("failed to insert root scheme"); + ROOT_SCHEME_ID.store(list.insert(Box::new(*b""), Arc::new(Box::new(RootScheme::new()))).expect("failed to insert root scheme"), Ordering::SeqCst); DEBUG_SCHEME_ID.store(list.insert(Box::new(*b"debug"), Arc::new(Box::new(DebugScheme))).expect("failed to insert debug scheme"), Ordering::SeqCst); list.insert(Box::new(*b"event"), Arc::new(Box::new(EventScheme::new()))).expect("failed to insert event scheme"); list.insert(Box::new(*b"env"), Arc::new(Box::new(EnvScheme::new()))).expect("failed to insert env scheme"); list.insert(Box::new(*b"initfs"), Arc::new(Box::new(InitFsScheme::new()))).expect("failed to insert initfs scheme"); - list.insert(Box::new(*b"irq"), Arc::new(Box::new(IrqScheme))).expect("failed to insert irq scheme"); + IRQ_SCHEME_ID.store(list.insert(Box::new(*b"irq"), Arc::new(Box::new(IrqScheme))).expect("failed to insert irq scheme"), Ordering::SeqCst); PIPE_SCHEME_ID.store(list.insert(Box::new(*b"pipe"), Arc::new(Box::new(PipeScheme))).expect("failed to insert pipe scheme"), Ordering::SeqCst); RwLock::new(list) } diff --git a/scheme/pipe.rs b/scheme/pipe.rs index f9e680d..a17be8f 100644 --- a/scheme/pipe.rs +++ b/scheme/pipe.rs @@ -1,8 +1,9 @@ use alloc::arc::{Arc, Weak}; -use collections::{BTreeMap, VecDeque}; +use collections::BTreeMap; use core::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; -use spin::{Mutex, Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use spin::{Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use sync::WaitQueue; use syscall::error::{Error, Result, EBADF, EPIPE}; use syscall::scheme::Scheme; @@ -103,43 +104,21 @@ impl Scheme for PipeScheme { /// Read side of a pipe #[derive(Clone)] pub struct PipeRead { - vec: Arc>> + vec: Arc> } impl PipeRead { pub fn new() -> Self { PipeRead { - vec: Arc::new(Mutex::new(VecDeque::new())) + vec: Arc::new(WaitQueue::new()) } } fn read(&self, buf: &mut [u8]) -> Result { - if buf.is_empty() || (Arc::weak_count(&self.vec) == 0 && self.vec.lock().is_empty()) { + if buf.is_empty() || (Arc::weak_count(&self.vec) == 0 && self.vec.is_empty()) { Ok(0) } else { - /*loop { - { - if let Some(byte) = self.vec.lock().pop_front() { - buf[0] = byte; - break; - } - } - unsafe { context::switch(); } - }*/ - - let mut i = 0; - - while i < buf.len() { - match self.vec.lock().pop_front() { - Some(b) => { - buf[i] = b; - i += 1; - }, - None => break - } - } - - Ok(i) + Ok(self.vec.receive_into(buf)) } } } @@ -147,7 +126,7 @@ impl PipeRead { /// Read side of a pipe #[derive(Clone)] pub struct PipeWrite { - vec: Weak>>, + vec: Weak>, } impl PipeWrite { @@ -160,9 +139,7 @@ impl PipeWrite { fn write(&self, buf: &[u8]) -> Result { match self.vec.upgrade() { Some(vec) => { - for &b in buf.iter() { - vec.lock().push_back(b); - } + vec.send_from(buf); Ok(buf.len()) }, diff --git a/scheme/root.rs b/scheme/root.rs index c088d6e..1d4d4d0 100644 --- a/scheme/root.rs +++ b/scheme/root.rs @@ -1,7 +1,7 @@ use alloc::arc::Arc; use alloc::boxed::Box; use collections::BTreeMap; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use spin::RwLock; use context; @@ -10,6 +10,8 @@ use syscall::scheme::Scheme; use scheme; use scheme::user::{UserInner, UserScheme}; +pub static ROOT_SCHEME_ID: AtomicUsize = ATOMIC_USIZE_INIT; + pub struct RootScheme { next_id: AtomicUsize, handles: RwLock>> @@ -25,28 +27,33 @@ impl RootScheme { } impl Scheme for RootScheme { - fn open(&self, path: &[u8], _flags: usize, _uid: u32, _gid: u32) -> Result { - let context = { - let contexts = context::contexts(); - let context = contexts.current().ok_or(Error::new(ESRCH))?; - Arc::downgrade(&context) - }; + fn open(&self, path: &[u8], _flags: usize, uid: u32, _gid: u32) -> Result { + if uid == 0 { + let context = { + let contexts = context::contexts(); + let context = contexts.current().ok_or(Error::new(ESRCH))?; + Arc::downgrade(&context) + }; - let inner = { - let mut schemes = scheme::schemes_mut(); - if schemes.get_name(path).is_some() { - return Err(Error::new(EEXIST)); - } - let inner = Arc::new(UserInner::new(context)); - let id = schemes.insert(path.to_vec().into_boxed_slice(), Arc::new(Box::new(UserScheme::new(Arc::downgrade(&inner))))).expect("failed to insert user scheme"); - inner.scheme_id.store(id, Ordering::SeqCst); - inner - }; + let id = self.next_id.fetch_add(1, Ordering::SeqCst); - let id = self.next_id.fetch_add(1, Ordering::SeqCst); - self.handles.write().insert(id, inner); + let inner = { + let mut schemes = scheme::schemes_mut(); + if schemes.get_name(path).is_some() { + return Err(Error::new(EEXIST)); + } + let inner = Arc::new(UserInner::new(id, context)); + let scheme_id = schemes.insert(path.to_vec().into_boxed_slice(), Arc::new(Box::new(UserScheme::new(Arc::downgrade(&inner))))).expect("failed to insert user scheme"); + inner.scheme_id.store(scheme_id, Ordering::SeqCst); + inner + }; - Ok(id) + self.handles.write().insert(id, inner); + + Ok(id) + } else { + Err(Error::new(EACCES)) + } } fn dup(&self, file: usize) -> Result { @@ -82,6 +89,10 @@ impl Scheme for RootScheme { inner.write(buf) } + fn fevent(&self, _file: usize, _flags: usize) -> Result { + Ok(0) + } + fn fsync(&self, _file: usize) -> Result { Ok(0) } diff --git a/scheme/user.rs b/scheme/user.rs index 6bc25fd..fbd2726 100644 --- a/scheme/user.rs +++ b/scheme/user.rs @@ -1,35 +1,39 @@ use alloc::arc::Weak; -use collections::{BTreeMap, VecDeque}; use core::sync::atomic::{AtomicUsize, AtomicU64, Ordering}; -use core::{mem, usize}; -use spin::{Mutex, RwLock}; +use core::{mem, slice, usize}; +use spin::RwLock; use arch; use arch::paging::{InactivePageTable, Page, VirtualAddress, entry}; use arch::paging::temporary_page::TemporaryPage; use context::{self, Context}; use context::memory::Grant; +use scheme::root::ROOT_SCHEME_ID; +use sync::{WaitQueue, WaitMap}; use syscall::data::{Packet, Stat}; use syscall::error::*; +use syscall::flag::EVENT_READ; use syscall::number::*; use syscall::scheme::Scheme; pub struct UserInner { + handle_id: usize, pub scheme_id: AtomicUsize, next_id: AtomicU64, context: Weak>, - todo: Mutex>, - done: Mutex> + todo: WaitQueue, + done: WaitMap } impl UserInner { - pub fn new(context: Weak>) -> UserInner { + pub fn new(handle_id: usize, context: Weak>) -> UserInner { UserInner { + handle_id: handle_id, scheme_id: AtomicUsize::new(0), next_id: AtomicU64::new(1), context: context, - todo: Mutex::new(VecDeque::new()), - done: Mutex::new(BTreeMap::new()) + todo: WaitQueue::new(), + done: WaitMap::new() } } @@ -54,18 +58,10 @@ impl UserInner { d: d }; - self.todo.lock().push_back(packet); + let len = self.todo.send(packet); + context::event::trigger(ROOT_SCHEME_ID.load(Ordering::SeqCst), self.handle_id, EVENT_READ, len * mem::size_of::()); - loop { - { - let mut done = self.done.lock(); - if let Some(a) = done.remove(&id) { - return Error::demux(a); - } - } - - unsafe { context::switch(); } //TODO: Block - } + Error::demux(self.done.receive(&id)) } pub fn capture(&self, buf: &[u8]) -> Result { @@ -158,29 +154,8 @@ impl UserInner { } pub fn read(&self, buf: &mut [u8]) -> Result { - let packet_size = mem::size_of::(); - let len = buf.len()/packet_size; - if len > 0 { - loop { - let mut i = 0; - { - let mut todo = self.todo.lock(); - while ! todo.is_empty() && i < len { - let packet = todo.pop_front().unwrap(); - unsafe { *(buf.as_mut_ptr() as *mut Packet).offset(i as isize) = packet; } - i += 1; - } - } - - if i > 0 { - return Ok(i * packet_size); - } else { - unsafe { context::switch(); } //TODO: Block - } - } - } else { - Ok(0) - } + let packet_buf = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr() as *mut Packet, buf.len()/mem::size_of::()) }; + Ok(self.todo.receive_into(packet_buf) * mem::size_of::()) } pub fn write(&self, buf: &[u8]) -> Result { @@ -195,7 +170,7 @@ impl UserInner { _ => println!("Unknown scheme -> kernel message {}", packet.a) } } else { - self.done.lock().insert(packet.id, packet.a); + self.done.send(packet.id, packet.a); } i += 1; } diff --git a/sync/mod.rs b/sync/mod.rs new file mode 100644 index 0000000..833925b --- /dev/null +++ b/sync/mod.rs @@ -0,0 +1,7 @@ +pub use self::wait_condition::WaitCondition; +pub use self::wait_queue::WaitQueue; +pub use self::wait_map::WaitMap; + +pub mod wait_condition; +pub mod wait_queue; +pub mod wait_map; diff --git a/sync/wait_condition.rs b/sync/wait_condition.rs new file mode 100644 index 0000000..997b458 --- /dev/null +++ b/sync/wait_condition.rs @@ -0,0 +1,49 @@ +use alloc::arc::Arc; +use collections::Vec; +use core::mem; +use spin::{Mutex, RwLock}; + +use context::{self, Context}; + +#[derive(Debug)] +pub struct WaitCondition { + contexts: Mutex>>> +} + +impl WaitCondition { + pub fn new() -> WaitCondition { + WaitCondition { + contexts: Mutex::new(Vec::new()) + } + } + + pub fn notify(&self) -> usize { + let mut contexts = Vec::new(); + mem::swap(&mut *self.contexts.lock(), &mut contexts); + for context_lock in contexts.iter() { + context_lock.write().unblock(); + } + contexts.len() + } + + pub fn wait(&self) { + { + let context_lock = { + let contexts = context::contexts(); + let context_lock = contexts.current().expect("WaitCondition::wait: no context"); + context_lock.clone() + }; + + context_lock.write().block(); + + self.contexts.lock().push(context_lock); + } + unsafe { context::switch(); } + } +} + +impl Drop for WaitCondition { + fn drop(&mut self){ + self.notify(); + } +} diff --git a/sync/wait_map.rs b/sync/wait_map.rs new file mode 100644 index 0000000..468ad8c --- /dev/null +++ b/sync/wait_map.rs @@ -0,0 +1,33 @@ +use collections::BTreeMap; +use spin::Mutex; + +use sync::WaitCondition; + +#[derive(Debug)] +pub struct WaitMap { + inner: Mutex>, + condition: WaitCondition +} + +impl WaitMap where K: Ord { + pub fn new() -> WaitMap { + WaitMap { + inner: Mutex::new(BTreeMap::new()), + condition: WaitCondition::new() + } + } + + pub fn send(&self, key: K, value: V) { + self.inner.lock().insert(key, value); + self.condition.notify(); + } + + pub fn receive(&self, key: &K) -> V { + loop { + if let Some(value) = self.inner.lock().remove(key) { + return value; + } + self.condition.wait(); + } + } +} diff --git a/sync/wait_queue.rs b/sync/wait_queue.rs new file mode 100644 index 0000000..445164c --- /dev/null +++ b/sync/wait_queue.rs @@ -0,0 +1,95 @@ +use collections::vec_deque::VecDeque; +use core::mem; +use core::ops::DerefMut; +use spin::Mutex; + +use sync::WaitCondition; + +#[derive(Debug)] +pub struct WaitQueue { + pub inner: Mutex>, + pub condition: WaitCondition, +} + +impl WaitQueue { + pub fn new() -> WaitQueue { + WaitQueue { + inner: Mutex::new(VecDeque::new()), + condition: WaitCondition::new() + } + } + + pub fn clone(&self) -> WaitQueue where T: Clone { + WaitQueue { + inner: Mutex::new(self.inner.lock().clone()), + condition: WaitCondition::new() + } + } + + pub fn is_empty(&self) -> bool { + self.inner.lock().is_empty() + } + + pub fn receive(&self) -> T { + loop { + if let Some(value) = self.inner.lock().pop_front() { + return value; + } + self.condition.wait(); + } + } + + pub fn receive_into(&self, buf: &mut [T]) -> usize { + let mut i = 0; + + if i < buf.len() { + buf[i] = self.receive(); + i += 1; + } + + while i < buf.len() { + if let Some(value) = self.inner.lock().pop_front() { + buf[i] = value; + i += 1; + } else { + break; + } + } + + i + } + + pub fn receive_all(&self) -> VecDeque { + loop { + { + let mut inner = self.inner.lock(); + if ! inner.is_empty() { + let mut swap_inner = VecDeque::new(); + mem::swap(inner.deref_mut(), &mut swap_inner); + return swap_inner; + } + } + self.condition.wait(); + } + } + + pub fn send(&self, value: T) -> usize { + let len = { + let mut inner = self.inner.lock(); + inner.push_back(value); + inner.len() + }; + self.condition.notify(); + len + } + + pub fn send_from(&self, buf: &[T]) -> usize where T: Copy { + let len = { + let mut inner = self.inner.lock(); + inner.extend(buf.iter()); + inner.len() + }; + self.condition.notify(); + len + } +} diff --git a/syscall/futex.rs b/syscall/futex.rs new file mode 100644 index 0000000..4b9c30c --- /dev/null +++ b/syscall/futex.rs @@ -0,0 +1,110 @@ +use alloc::arc::Arc; +use collections::VecDeque; +use core::intrinsics; +use spin::{Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use context::{self, Context}; +use syscall::error::{Error, Result, ESRCH, EAGAIN, EINVAL}; +use syscall::flag::{FUTEX_WAIT, FUTEX_WAKE, FUTEX_REQUEUE}; +use syscall::validate::validate_slice_mut; + +type FutexList = VecDeque<(usize, Arc>)>; + +/// Fast userspace mutex list +static FUTEXES: Once> = Once::new(); + +/// Initialize futexes, called if needed +fn init_futexes() -> RwLock { + RwLock::new(VecDeque::new()) +} + +/// Get the global futexes list, const +pub fn futexes() -> RwLockReadGuard<'static, FutexList> { + FUTEXES.call_once(init_futexes).read() +} + +/// Get the global futexes list, mutable +pub fn futexes_mut() -> RwLockWriteGuard<'static, FutexList> { + FUTEXES.call_once(init_futexes).write() +} + +pub fn futex(addr: &mut i32, op: usize, val: i32, val2: usize, addr2: *mut i32) -> Result { + match op { + FUTEX_WAIT => { + { + let mut futexes = futexes_mut(); + + let context_lock = { + let contexts = context::contexts(); + let context_lock = contexts.current().ok_or(Error::new(ESRCH))?; + context_lock.clone() + }; + + if unsafe { intrinsics::atomic_load(addr) != val } { + return Err(Error::new(EAGAIN)); + } + + context_lock.write().block(); + + futexes.push_back((addr as *mut i32 as usize, context_lock)); + } + + unsafe { context::switch(); } + + Ok(0) + }, + FUTEX_WAKE => { + let mut woken = 0; + + { + let mut futexes = futexes_mut(); + + let mut i = 0; + while i < futexes.len() && (woken as i32) < val { + if futexes[i].0 == addr as *mut i32 as usize { + if let Some(futex) = futexes.swap_remove_back(i) { + futex.1.write().unblock(); + woken += 1; + } + } else { + i += 1; + } + } + } + + Ok(woken) + }, + FUTEX_REQUEUE => { + let addr2_safe = validate_slice_mut(addr2, 1).map(|addr2_safe| &mut addr2_safe[0])?; + + let mut woken = 0; + let mut requeued = 0; + + { + let mut futexes = futexes_mut(); + + let mut i = 0; + while i < futexes.len() && (woken as i32) < val { + if futexes[i].0 == addr as *mut i32 as usize { + if let Some(futex) = futexes.swap_remove_back(i) { + futex.1.write().unblock(); + woken += 1; + } + } else { + i += 1; + } + } + while i < futexes.len() && requeued < val2 { + if futexes[i].0 == addr as *mut i32 as usize { + futexes[i].0 = addr2_safe as *mut i32 as usize; + requeued += 1; + } + i += 1; + } + } + + Ok(woken) + }, + _ => Err(Error::new(EINVAL)) + } +} diff --git a/syscall/mod.rs b/syscall/mod.rs index 9de8b92..8e1c190 100644 --- a/syscall/mod.rs +++ b/syscall/mod.rs @@ -5,6 +5,7 @@ extern crate syscall; pub use self::syscall::{data, error, flag, number, scheme}; pub use self::fs::*; +pub use self::futex::futex; pub use self::process::*; pub use self::time::*; pub use self::validate::*; @@ -16,6 +17,9 @@ use self::number::*; /// Filesystem syscalls pub mod fs; +/// Fast userspace mutex +pub mod futex; + /// Process syscalls pub mod process; @@ -28,7 +32,7 @@ pub mod validate; #[no_mangle] pub extern fn syscall(a: usize, b: usize, c: usize, d: usize, e: usize, f: usize, stack: usize) -> usize { #[inline(always)] - fn inner(a: usize, b: usize, c: usize, d: usize, e: usize, _f: usize, stack: usize) -> Result { + fn inner(a: usize, b: usize, c: usize, d: usize, e: usize, f: usize, stack: usize) -> Result { match a & SYS_CLASS { SYS_CLASS_FILE => match a & SYS_ARG { SYS_ARG_SLICE => file_op_slice(a, b, validate_slice(c as *const u8, d)?), @@ -66,6 +70,7 @@ pub extern fn syscall(a: usize, b: usize, c: usize, d: usize, e: usize, f: usize SYS_SETUID => setuid(b as u32), SYS_SETGID => setgid(b as u32), SYS_CLOCK_GETTIME => clock_gettime(b, validate_slice_mut(c as *mut TimeSpec, 1).map(|time| &mut time[0])?), + SYS_FUTEX => futex(validate_slice_mut(b as *mut i32, 1).map(|uaddr| &mut uaddr[0])?, c, d as i32, e, f as *mut i32), SYS_PIPE2 => pipe2(validate_slice_mut(b as *mut usize, 2)?, c), SYS_PHYSALLOC => physalloc(b), SYS_PHYSFREE => physfree(b, c), @@ -78,10 +83,10 @@ pub extern fn syscall(a: usize, b: usize, c: usize, d: usize, e: usize, f: usize } let result = inner(a, b, c, d, e, f, stack); - +/* if let Err(ref err) = result { - println!("{}, {}, {}, {}: {}", a & 0xFFFF, b, c, d, err); + println!("{}, {}, {}, {}: {}", a, b, c, d, err); } - +*/ Error::mux(result) } diff --git a/syscall/process.rs b/syscall/process.rs index cdd768f..765edb5 100644 --- a/syscall/process.rs +++ b/syscall/process.rs @@ -2,9 +2,8 @@ use alloc::arc::Arc; use alloc::boxed::Box; use collections::{BTreeMap, Vec}; -use core::mem; +use core::{mem, str}; use core::ops::DerefMut; -use core::str; use spin::Mutex; use arch; @@ -247,7 +246,7 @@ pub fn clone(flags: usize, stack_base: usize) -> Result { let contexts = context::contexts(); let context_lock = contexts.current().ok_or(Error::new(ESRCH))?; let mut context = context_lock.write(); - context.status = context::Status::Blocked; + context.block(); vfork = true; } else { vfork = false; @@ -416,14 +415,13 @@ pub fn exit(status: usize) -> ! { let vfork = context.vfork; context.vfork = false; + context.waitpid.notify(); (vfork, context.ppid) }; if vfork { if let Some(context_lock) = contexts.get(ppid) { let mut context = context_lock.write(); - if context.status == context::Status::Blocked { - context.status = context::Status::Runnable; - } else { + if ! context.unblock() { println!("{} not blocked for exit vfork unblock", ppid); } } else { @@ -622,9 +620,7 @@ pub fn exec(path: &[u8], arg_ptrs: &[[usize; 2]]) -> Result { if vfork { if let Some(context_lock) = contexts.get(ppid) { let mut context = context_lock.write(); - if context.status == context::Status::Blocked { - context.status = context::Status::Runnable; - } else { + if ! context.unblock() { println!("{} not blocked for exec vfork unblock", ppid); } } else { @@ -808,7 +804,7 @@ pub fn waitpid(pid: usize, status_ptr: usize, flags: usize) -> Result { loop { { let mut exited = false; - + let waitpid; { let contexts = context::contexts(); let context_lock = contexts.get(pid).ok_or(Error::new(ESRCH))?; @@ -820,6 +816,7 @@ pub fn waitpid(pid: usize, status_ptr: usize, flags: usize) -> Result { } exited = true; } + waitpid = context.waitpid.clone(); } if exited { @@ -827,6 +824,8 @@ pub fn waitpid(pid: usize, status_ptr: usize, flags: usize) -> Result { return contexts.remove(pid).ok_or(Error::new(ESRCH)).and(Ok(pid)); } else if flags & WNOHANG == WNOHANG { return Ok(0); + } else { + waitpid.wait(); } } diff --git a/syscall/time.rs b/syscall/time.rs index ca05773..448f311 100644 --- a/syscall/time.rs +++ b/syscall/time.rs @@ -26,17 +26,20 @@ pub fn nanosleep(req: &TimeSpec, rem_opt: Option<&mut TimeSpec>) -> Result end.0 || (current.0 == end.0 && current.1 >= end.1) { - break; - } + { + let contexts = context::contexts(); + let context_lock = contexts.current().ok_or(Error::new(ESRCH))?; + let mut context = context_lock.write(); + + context.wake = Some(end); + context.block(); } + unsafe { context::switch(); } + if let Some(mut rem) = rem_opt { + //TODO let current = arch::time::monotonic(); rem.tv_sec = 0; rem.tv_nsec = 0; }