diff --git a/Cargo.toml b/Cargo.toml index 4632b2e..26341cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,16 +15,20 @@ categories = ["asynchronous", "network-programming", "os"] exclude = ["/.*"] [dependencies] +cfg-if = "1" concurrent-queue = "1.2.2" futures-lite = "1.11.0" log = "0.4.11" once_cell = "1.4.1" parking = "2.0.0" -polling = "2.0.0" +polling = { git = "https://github.com/smol-rs/polling" } slab = "0.4.2" socket2 = { version = "0.4.2", features = ["all"] } waker-fn = "1.1.0" +[target.'cfg(target_os = "linux")'.dependencies] +io-uring = { version = "=0.5.3", features = ["unstable"] } + [build-dependencies] autocfg = "1" diff --git a/src/lib.rs b/src/lib.rs index de6d526..fd9e6e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1012,17 +1012,14 @@ impl Drop for Async { impl AsyncRead for Async { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - loop { - match (&mut *self).get_mut().read(buf) { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - res => return Poll::Ready(res), - } - ready!(self.poll_readable(cx))?; - } + let this = self.get_mut(); + + // Manually access the fields in order to keep the borrow checker happy. + Reactor::get().poll_read(this.io.as_mut().unwrap(), &this.source, buf, cx) } fn poll_read_vectored( @@ -1049,13 +1046,7 @@ where cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - loop { - match (*self).get_ref().read(buf) { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - res => return Poll::Ready(res), - } - ready!(self.poll_readable(cx))?; - } + Reactor::get().poll_read(&mut self.get_ref(), &self.source, buf, cx) } fn poll_read_vectored( @@ -1075,17 +1066,12 @@ where impl AsyncWrite for Async { fn poll_write( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - loop { - match (&mut *self).get_mut().write(buf) { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - res => return Poll::Ready(res), - } - ready!(self.poll_writable(cx))?; - } + let this = self.get_mut(); + Reactor::get().poll_write(this.io.as_mut().unwrap(), &this.source, buf, cx) } fn poll_write_vectored( @@ -1126,13 +1112,7 @@ where cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - loop { - match (*self).get_ref().write(buf) { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - res => return Poll::Ready(res), - } - ready!(self.poll_writable(cx))?; - } + Reactor::get().poll_write(&mut self.get_ref(), &self.source, buf, cx) } fn poll_write_vectored( diff --git a/src/reactor.rs b/src/reactor/mod.rs similarity index 54% rename from src/reactor.rs rename to src/reactor/mod.rs index b6ae153..e5928fe 100644 --- a/src/reactor.rs +++ b/src/reactor/mod.rs @@ -1,89 +1,60 @@ use std::borrow::Borrow; -use std::collections::BTreeMap; +use std::cell::UnsafeCell; use std::fmt; use std::future::Future; -use std::io; +use std::io::{self, Read, Write}; use std::marker::PhantomData; -use std::mem; +use std::mem::MaybeUninit; #[cfg(unix)] use std::os::unix::io::RawFd; #[cfg(windows)] use std::os::windows::io::RawSocket; use std::panic; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; -use concurrent_queue::ConcurrentQueue; use futures_lite::ready; use once_cell::sync::Lazy; -use polling::{Event, Poller}; use slab::Slab; +mod poll; + +cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + mod uring; + use uring as imp; + } else { + use poll as imp; + } +} + const READ: usize = 0; const WRITE: usize = 1; +const MAX_OPERATIONS: usize = 2; +const READ_OP: usize = 0; +const WRITE_OP: usize = 1; + /// The reactor. /// /// There is only one global instance of this type, accessible by [`Reactor::get()`]. -pub(crate) struct Reactor { - /// Portable bindings to epoll/kqueue/event ports/wepoll. - /// - /// This is where I/O is polled, producing I/O events. - poller: Poller, - - /// Ticker bumped before polling. - /// - /// This is useful for checking what is the current "round" of `ReactorLock::react()` when - /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those - /// methods must make sure they don't receive stale I/O events - they only accept events from a - /// fresh "round" of `ReactorLock::react()`. - ticker: AtomicUsize, - - /// Registered sources. - sources: Mutex>>, - - /// Temporary storage for I/O events when polling the reactor. - /// - /// Holding a lock on this event list implies the exclusive right to poll I/O. - events: Mutex>, - - /// An ordered map of registered timers. - /// - /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to - /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the - /// timer. - timers: Mutex>, - - /// A queue of timer operations (insert and remove). - /// - /// When inserting or removing a timer, we don't process it immediately - we just push it into - /// this queue. Timers actually get processed when the queue fills up or the reactor is polled. - timer_ops: ConcurrentQueue, -} +pub(crate) struct Reactor(imp::Reactor); impl Reactor { /// Returns a reference to the reactor. pub(crate) fn get() -> &'static Reactor { static REACTOR: Lazy = Lazy::new(|| { crate::driver::init(); - Reactor { - poller: Poller::new().expect("cannot initialize I/O event notification"), - ticker: AtomicUsize::new(0), - sources: Mutex::new(Slab::new()), - events: Mutex::new(Vec::new()), - timers: Mutex::new(BTreeMap::new()), - timer_ops: ConcurrentQueue::bounded(1000), - } + Reactor(imp::Reactor::new()) }); &REACTOR } /// Returns the current ticker. pub(crate) fn ticker(&self) -> usize { - self.ticker.load(Ordering::SeqCst) + self.0.ticker() } /// Registers an I/O source in the reactor. @@ -92,242 +63,84 @@ impl Reactor { #[cfg(unix)] raw: RawFd, #[cfg(windows)] raw: RawSocket, ) -> io::Result> { - // Create an I/O source for this file descriptor. - let source = { - let mut sources = self.sources.lock().unwrap(); - let key = sources.vacant_entry().key(); - let source = Arc::new(Source { - raw, - key, - state: Default::default(), - }); - sources.insert(source.clone()); - source - }; - - // Register the file descriptor. - if let Err(err) = self.poller.add(raw, Event::none(source.key)) { - let mut sources = self.sources.lock().unwrap(); - sources.remove(source.key); - return Err(err); - } - - Ok(source) + self.0.insert_io(raw) } /// Deregisters an I/O source from the reactor. pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> { - let mut sources = self.sources.lock().unwrap(); - sources.remove(source.key); - self.poller.delete(source.raw) + self.0.remove_io(source) } /// Registers a timer in the reactor. /// /// Returns the inserted timer's ID. pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize { - // Generate a new timer ID. - static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1); - let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed); - - // Push an insert operation. - while self - .timer_ops - .push(TimerOp::Insert(when, id, waker.clone())) - .is_err() - { - // If the queue is full, drain it and try again. - let mut timers = self.timers.lock().unwrap(); - self.process_timer_ops(&mut timers); - } - - // Notify that a timer has been inserted. - self.notify(); - - id + self.0.insert_timer(when, waker) } /// Deregisters a timer from the reactor. pub(crate) fn remove_timer(&self, when: Instant, id: usize) { - // Push a remove operation. - while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() { - // If the queue is full, drain it and try again. - let mut timers = self.timers.lock().unwrap(); - self.process_timer_ops(&mut timers); - } + self.0.remove_timer(when, id) } /// Notifies the thread blocked on the reactor. pub(crate) fn notify(&self) { - self.poller.notify().expect("failed to notify reactor"); + self.0.notify() + } + + /// Try to read from the given source. + /// + /// If the read is not successful, it is registered in the + /// reactor. The source is then notified when the read is + /// successful. + pub(crate) fn poll_read( + &self, + readable: &mut impl Read, + source: &Source, + buf: &mut [u8], + cx: &mut Context<'_>, + ) -> Poll> { + self.0.poll_read(readable, source, buf, cx) + } + + /// Try to write to the given source. + /// + /// If the write is not successful, it is registered in the + /// reactor. In certain cases this can take advantage of + /// completion-based APIs in order to be faster than normal. + pub(crate) fn poll_write( + &self, + writable: &mut impl Write, + source: &Source, + buf: &[u8], + cx: &mut Context<'_>, + ) -> Poll> { + self.0.poll_write(writable, source, buf, cx) } /// Locks the reactor, potentially blocking if the lock is held by another thread. pub(crate) fn lock(&self) -> ReactorLock<'_> { - let reactor = self; - let events = self.events.lock().unwrap(); - ReactorLock { reactor, events } + ReactorLock(self.0.lock()) } /// Attempts to lock the reactor. pub(crate) fn try_lock(&self) -> Option> { - self.events.try_lock().ok().map(|events| { - let reactor = self; - ReactorLock { reactor, events } - }) + self.0.try_lock().map(ReactorLock) } - /// Processes ready timers and extends the list of wakers to wake. - /// - /// Returns the duration until the next timer before this method was called. - fn process_timers(&self, wakers: &mut Vec) -> Option { - let mut timers = self.timers.lock().unwrap(); - self.process_timer_ops(&mut timers); - - let now = Instant::now(); - - // Split timers into ready and pending timers. - // - // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered - // ready. - let pending = timers.split_off(&(now + Duration::from_nanos(1), 0)); - let ready = mem::replace(&mut *timers, pending); - - // Calculate the duration until the next event. - let dur = if ready.is_empty() { - // Duration until the next timer. - timers - .keys() - .next() - .map(|(when, _)| when.saturating_duration_since(now)) - } else { - // Timers are about to fire right now. - Some(Duration::from_secs(0)) - }; - - // Drop the lock before waking. - drop(timers); - - // Add wakers to the list. - log::trace!("process_timers: {} ready wakers", ready.len()); - for (_, waker) in ready { - wakers.push(waker); - } - - dur - } - - /// Processes queued timer operations. - fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) { - // Process only as much as fits into the queue, or else this loop could in theory run - // forever. - for _ in 0..self.timer_ops.capacity().unwrap() { - match self.timer_ops.pop() { - Ok(TimerOp::Insert(when, id, waker)) => { - timers.insert((when, id), waker); - } - Ok(TimerOp::Remove(when, id)) => { - timers.remove(&(when, id)); - } - Err(_) => break, - } - } + /// Get the `Poller` backing this reactor. + pub(crate) fn poller(&self) -> &polling::Poller { + self.0.poller() } } /// A lock on the reactor. -pub(crate) struct ReactorLock<'a> { - reactor: &'a Reactor, - events: MutexGuard<'a, Vec>, -} +pub(crate) struct ReactorLock<'a>(imp::ReactorLock<'a>); impl ReactorLock<'_> { /// Processes new events, blocking until the first event or the timeout. pub(crate) fn react(&mut self, timeout: Option) -> io::Result<()> { - let mut wakers = Vec::new(); - - // Process ready timers. - let next_timer = self.reactor.process_timers(&mut wakers); - - // compute the timeout for blocking on I/O events. - let timeout = match (next_timer, timeout) { - (None, None) => None, - (Some(t), None) | (None, Some(t)) => Some(t), - (Some(a), Some(b)) => Some(a.min(b)), - }; - - // Bump the ticker before polling I/O. - let tick = self - .reactor - .ticker - .fetch_add(1, Ordering::SeqCst) - .wrapping_add(1); - - self.events.clear(); - - // Block on I/O events. - let res = match self.reactor.poller.wait(&mut self.events, timeout) { - // No I/O events occurred. - Ok(0) => { - if timeout != Some(Duration::from_secs(0)) { - // The non-zero timeout was hit so fire ready timers. - self.reactor.process_timers(&mut wakers); - } - Ok(()) - } - - // At least one I/O event occurred. - Ok(_) => { - // Iterate over sources in the event list. - let sources = self.reactor.sources.lock().unwrap(); - - for ev in self.events.iter() { - // Check if there is a source in the table with this key. - if let Some(source) = sources.get(ev.key) { - let mut state = source.state.lock().unwrap(); - - // Collect wakers if a writability event was emitted. - for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] { - if emitted { - state[dir].tick = tick; - state[dir].drain_into(&mut wakers); - } - } - - // Re-register if there are still writers or readers. The can happen if - // e.g. we were previously interested in both readability and writability, - // but only one of them was emitted. - if !state[READ].is_empty() || !state[WRITE].is_empty() { - self.reactor.poller.modify( - source.raw, - Event { - key: source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; - } - } - } - - Ok(()) - } - - // The syscall was interrupted. - Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()), - - // An actual error occureed. - Err(err) => Err(err), - }; - - // Wake up ready tasks. - log::trace!("react: {} ready wakers", wakers.len()); - for waker in wakers { - // Don't let a panicking waker blow everything up. - panic::catch_unwind(|| waker.wake()).ok(); - } - - res + self.0.react(timeout) } } @@ -352,7 +165,16 @@ pub(crate) struct Source { key: usize, /// Inner state with registered wakers. - state: Mutex<[Direction; 2]>, + state: Mutex, +} + +/// The inner state of a `Source`, with registered wakers. +#[derive(Debug, Default)] +struct State { + /// The directions used for polling read/write readiness. + readiness: [Direction; 2], + /// The ongoing operations for completion-based I/O. + operations: [Operation; MAX_OPERATIONS], } /// A read or write direction. @@ -392,6 +214,45 @@ impl Direction { } } +/// An operation for completion-based I/O. +#[derive(Debug, Default)] +struct Operation { + /// The current status of the operation. + status: OperationStatus, + /// The waker registered for this operation. + waker: Option, + /// The buffer for this operation. + /// + /// Once the completion operation begins, logically this buffer + /// is no longer owned by the `Operation` but by the system API + /// (e.g. `io_uring`). In order to assert to the type system this + /// condition, we use an `UnsafeCell` to hold the buffer. + /// + /// # Invariants + /// + /// When `status` is `Pending`, this buffer is "owned" + /// by the system, and it is unsound to access it. + /// When `status` is something else, it can be accessed safely. + buffer: UnsafeCell>>, +} + +/// The status of an `Operation`. +#[derive(Debug)] +enum OperationStatus { + /// The operation has not yet begun. + NotStarted, + /// The operation is currently running. + Pending, + /// The operation has completed with the given result. + Complete(isize), +} + +impl Default for OperationStatus { + fn default() -> Self { + OperationStatus::NotStarted + } +} + impl Source { /// Polls the I/O source for readability. pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll> { @@ -407,7 +268,8 @@ impl Source { /// /// If a different waker is already registered, it gets replaced and woken. fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll> { - let mut state = self.state.lock().unwrap(); + let mut lock = self.state.lock().unwrap(); + let state = &mut lock.readiness; // Check if the reactor has delivered an event. if let Some((a, b)) = state[dir].ticks { @@ -435,9 +297,9 @@ impl Source { // Update interest in this I/O handle. if was_empty { - Reactor::get().poller.modify( + Reactor::get().poller().modify( self.raw, - Event { + polling::Event { key: self.key, readable: !state[READ].is_empty(), writable: !state[WRITE].is_empty(), @@ -583,7 +445,8 @@ impl> + Clone, T> Future for Ready { .. } = &mut *self; - let mut state = handle.borrow().source.state.lock().unwrap(); + let mut lock = handle.borrow().source.state.lock().unwrap(); + let state = &mut lock.readiness; // Check if the reactor has delivered an event. if let Some((a, b)) = *ticks { @@ -616,9 +479,9 @@ impl> + Clone, T> Future for Ready { // Update interest in this I/O handle. if was_empty { - Reactor::get().poller.modify( + Reactor::get().poller().modify( handle.borrow().source.raw, - Event { + polling::Event { key: handle.borrow().source.key, readable: !state[READ].is_empty(), writable: !state[WRITE].is_empty(), @@ -641,7 +504,7 @@ struct RemoveOnDrop>, T> { impl>, T> Drop for RemoveOnDrop { fn drop(&mut self) { let mut state = self.handle.borrow().source.state.lock().unwrap(); - let wakers = &mut state[self.dir].wakers; + let wakers = &mut state.readiness[self.dir].wakers; if wakers.contains(self.key) { wakers.remove(self.key); } diff --git a/src/reactor/poll.rs b/src/reactor/poll.rs new file mode 100644 index 0000000..8da91b6 --- /dev/null +++ b/src/reactor/poll.rs @@ -0,0 +1,386 @@ +use super::{Source, TimerOp}; + +use std::collections::BTreeMap; +use std::io::Read; +use std::io::{self, Write}; +use std::mem; +#[cfg(unix)] +use std::os::unix::io::RawFd; +#[cfg(windows)] +use std::os::windows::io::RawSocket; +use std::panic; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; + +use concurrent_queue::ConcurrentQueue; +use futures_lite::ready; +use polling::{Event, Poller}; +use slab::Slab; + +const READ: usize = 0; +const WRITE: usize = 1; + +/// A `Reactor` implementation oriented around `polling`. +pub(crate) struct Reactor { + /// Portable bindings to epoll/kqueue/event ports/wepoll. + /// + /// This is where I/O is polled, producing I/O events. + poller: Poller, + + /// Ticker bumped before polling. + /// + /// This is useful for checking what is the current "round" of `ReactorLock::react()` when + /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those + /// methods must make sure they don't receive stale I/O events - they only accept events from a + /// fresh "round" of `ReactorLock::react()`. + ticker: AtomicUsize, + + /// Registered sources. + pub(super) sources: Mutex>>, + + /// Temporary storage for I/O events when polling the reactor. + /// + /// Holding a lock on this event list implies the exclusive right to poll I/O. + events: Mutex>, + + /// An ordered map of registered timers. + /// + /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to + /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the + /// timer. + timers: Mutex>, + + /// A queue of timer operations (insert and remove). + /// + /// When inserting or removing a timer, we don't process it immediately - we just push it into + /// this queue. Timers actually get processed when the queue fills up or the reactor is polled. + timer_ops: ConcurrentQueue, +} + +impl Reactor { + /// Creates a new `Reactor`. + pub(crate) fn new() -> Reactor { + Reactor { + poller: Poller::new().expect("cannot initialize I/O event notification"), + ticker: AtomicUsize::new(0), + sources: Mutex::new(Slab::new()), + events: Mutex::new(Vec::new()), + timers: Mutex::new(BTreeMap::new()), + timer_ops: ConcurrentQueue::bounded(1000), + } + } + + /// Get the `Poller` backing this reactor. + pub(crate) fn poller(&self) -> &Poller { + &self.poller + } + + /// Returns the current ticker. + pub(crate) fn ticker(&self) -> usize { + self.ticker.load(Ordering::SeqCst) + } + + /// Registers an I/O source in the reactor. + pub(crate) fn insert_io( + &self, + #[cfg(unix)] raw: RawFd, + #[cfg(windows)] raw: RawSocket, + ) -> io::Result> { + // Create an I/O source for this file descriptor. + let source = { + let mut sources = self.sources.lock().unwrap(); + let key = sources.vacant_entry().key(); + let source = Arc::new(Source { + raw, + key, + state: Default::default(), + }); + sources.insert(source.clone()); + source + }; + + // Register the file descriptor. + if let Err(err) = self.poller.add(raw, Event::none(source.key)) { + let mut sources = self.sources.lock().unwrap(); + sources.remove(source.key); + return Err(err); + } + + Ok(source) + } + + /// Deregisters an I/O source from the reactor. + pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> { + let mut sources = self.sources.lock().unwrap(); + sources.remove(source.key); + self.poller.delete(source.raw) + } + + /// Registers a timer in the reactor. + /// + /// Returns the inserted timer's ID. + pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize { + // Generate a new timer ID. + static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1); + let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed); + + // Push an insert operation. + while self + .timer_ops + .push(TimerOp::Insert(when, id, waker.clone())) + .is_err() + { + // If the queue is full, drain it and try again. + let mut timers = self.timers.lock().unwrap(); + self.process_timer_ops(&mut timers); + } + + // Notify that a timer has been inserted. + self.notify(); + + id + } + + /// Deregisters a timer from the reactor. + pub(crate) fn remove_timer(&self, when: Instant, id: usize) { + // Push a remove operation. + while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() { + // If the queue is full, drain it and try again. + let mut timers = self.timers.lock().unwrap(); + self.process_timer_ops(&mut timers); + } + } + + /// Notifies the thread blocked on the reactor. + pub(crate) fn notify(&self) { + self.poller.notify().expect("failed to notify reactor"); + } + + /// Try to poll for a `Read` event on the given source. + pub(crate) fn poll_read( + &self, + readable: &mut impl Read, + source: &Source, + buf: &mut [u8], + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match readable.read(buf) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} + res => return Poll::Ready(res), + } + ready!(source.poll_readable(cx))?; + } + } + + /// Try to poll for a `Write` event on the given source. + pub(crate) fn poll_write( + &self, + writable: &mut impl Write, + source: &Source, + buf: &[u8], + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match writable.write(buf) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} + res => return Poll::Ready(res), + } + ready!(source.poll_writable(cx))?; + } + } + + /// Locks the reactor, potentially blocking if the lock is held by another thread. + pub(crate) fn lock(&self) -> ReactorLock<'_> { + let reactor = self; + let events = self.events.lock().unwrap(); + ReactorLock { reactor, events } + } + + /// Attempts to lock the reactor. + pub(crate) fn try_lock(&self) -> Option> { + self.events.try_lock().ok().map(|events| { + let reactor = self; + ReactorLock { reactor, events } + }) + } + + /// Processes ready timers and extends the list of wakers to wake. + /// + /// Returns the duration until the next timer before this method was called. + pub(super) fn process_timers(&self, wakers: &mut Vec) -> Option { + let mut timers = self.timers.lock().unwrap(); + self.process_timer_ops(&mut timers); + + let now = Instant::now(); + + // Split timers into ready and pending timers. + // + // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered + // ready. + let pending = timers.split_off(&(now + Duration::from_nanos(1), 0)); + let ready = mem::replace(&mut *timers, pending); + + // Calculate the duration until the next event. + let dur = if ready.is_empty() { + // Duration until the next timer. + timers + .keys() + .next() + .map(|(when, _)| when.saturating_duration_since(now)) + } else { + // Timers are about to fire right now. + Some(Duration::from_secs(0)) + }; + + // Drop the lock before waking. + drop(timers); + + // Add wakers to the list. + log::trace!("process_timers: {} ready wakers", ready.len()); + for (_, waker) in ready { + wakers.push(waker); + } + + dur + } + + /// Processes queued timer operations. + fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) { + // Process only as much as fits into the queue, or else this loop could in theory run + // forever. + for _ in 0..self.timer_ops.capacity().unwrap() { + match self.timer_ops.pop() { + Ok(TimerOp::Insert(when, id, waker)) => { + timers.insert((when, id), waker); + } + Ok(TimerOp::Remove(when, id)) => { + timers.remove(&(when, id)); + } + Err(_) => break, + } + } + } +} + +/// A lock on the reactor. +pub(crate) struct ReactorLock<'a> { + reactor: &'a Reactor, + events: MutexGuard<'a, Vec>, +} + +impl ReactorLock<'_> { + /// Processes new events, blocking until the first event or the timeout. + pub(crate) fn react(&mut self, timeout: Option) -> io::Result<()> { + let mut wakers = Vec::new(); + + let (timeout, tick) = self.prepare_for_polling(timeout, &mut wakers); + + // Block on I/O events. + let res = self.pump_events(timeout, tick, &mut wakers); + + // Wake up ready tasks. + log::trace!("react: {} ready wakers", wakers.len()); + for waker in wakers { + // Don't let a panicking waker blow everything up. + panic::catch_unwind(|| waker.wake()).ok(); + } + + res + } + + /// Ready the reactor lock for polling. + /// + /// This code is reused in the `io_uring` implementation. + pub(super) fn prepare_for_polling( + &mut self, + timeout: Option, + wakers: &mut Vec, + ) -> (Option, usize) { + // Process ready timers. + let next_timer = self.reactor.process_timers(wakers); + + // compute the timeout for blocking on I/O events. + let timeout = match (next_timer, timeout) { + (None, None) => None, + (Some(t), None) | (None, Some(t)) => Some(t), + (Some(a), Some(b)) => Some(a.min(b)), + }; + + // Bump the ticker before polling I/O. + let tick = self + .reactor + .ticker + .fetch_add(1, Ordering::SeqCst) + .wrapping_add(1); + + self.events.clear(); + + (timeout, tick) + } + + /// Runs the reactor until we have received an event. + pub(super) fn pump_events( + &mut self, + timeout: Option, + tick: usize, + wakers: &mut Vec, + ) -> io::Result<()> { + match self.reactor.poller.wait(&mut self.events, timeout) { + // No I/O events occurred. + Ok(0) => { + if timeout != Some(Duration::from_secs(0)) { + // The non-zero timeout was hit so fire ready timers. + self.reactor.process_timers(wakers); + } + Ok(()) + } + + // At least one I/O event occurred. + Ok(_) => { + // Iterate over sources in the event list. + let sources = self.reactor.sources.lock().unwrap(); + + for ev in self.events.iter() { + // Check if there is a source in the table with this key. + if let Some(source) = sources.get(ev.key) { + let mut lock = source.state.lock().unwrap(); + let state = &mut lock.readiness; + + // Collect wakers if a writability event was emitted. + for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] { + if emitted { + state[dir].tick = tick; + state[dir].drain_into(wakers); + } + } + + // Re-register if there are still writers or readers. The can happen if + // e.g. we were previously interested in both readability and writability, + // but only one of them was emitted. + if !state[READ].is_empty() || !state[WRITE].is_empty() { + self.reactor.poller.modify( + source.raw, + Event { + key: source.key, + readable: !state[READ].is_empty(), + writable: !state[WRITE].is_empty(), + }, + )?; + } + } + } + + Ok(()) + } + + // The syscall was interrupted. + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()), + + // An actual error occureed. + Err(err) => Err(err), + } + } +} diff --git a/src/reactor/uring.rs b/src/reactor/uring.rs new file mode 100644 index 0000000..e589c4a --- /dev/null +++ b/src/reactor/uring.rs @@ -0,0 +1,529 @@ +use crate::reactor::OperationStatus; + +use super::poll::{Reactor as PollReactor, ReactorLock as PollReactorLock}; +use super::{Source, READ_OP, WRITE_OP}; + +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::panic; +use std::slice; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; + +use io_uring::cqueue::Entry as CompletionEntry; +use io_uring::squeue::Entry as SubmissionEntry; +use io_uring::types::{Fd, SubmitArgs, Timespec}; +use io_uring::{CompletionQueue, IoUring}; + +use concurrent_queue::ConcurrentQueue; +use polling::Poller; + +const EPOLL_KEY: u64 = std::u64::MAX; +const EPOLL_IN: libc::c_short = libc::POLLIN | libc::POLLPRI | libc::POLLHUP | libc::POLLERR; + +const OPERATION_SHIFT: u64 = (std::mem::size_of::() - 1) as u64; +const OPERATION_MASK: u64 = 0b1 << OPERATION_SHIFT; + +/// A `Reactor` that augments the usual `polling`-based approach +/// with `io_uring`-based I/O. +/// +/// This is useful for quick reads and writes that do not require +/// the total flexibility of `epoll`. If `io_uring` is not available, +/// it falls back to the usual `polling`-based reactor. +pub(super) struct Reactor { + /// The internal `polling`-based reactor. + /// + /// This is used for all I/O that is not compatible with `io_uring`. + polling: PollReactor, + /// Interface to the `io_uring` system API. + /// + /// This is `None` if we failed to initialize the `io_uring` library, + /// likely due to not using a compatible version of Linux. + uring: Option, +} + +impl Reactor { + /// Creates a new `Reactor` instance. + pub(super) fn new() -> Self { + const DEFAULT_RING_CAPACITY: u32 = 1024; + + Reactor { + polling: PollReactor::new(), + uring: IoUring::new(DEFAULT_RING_CAPACITY).ok().map(|io_uring| { + Uring { + io_uring, + submit_lock: Mutex::new(()), + completion_buffer: Mutex::new({ + // SAFETY: MaybeUninit is allowed to be uninitialized + let mut buffer = Vec::with_capacity(1024); + unsafe { + buffer.set_len(1024); + } + buffer.into_boxed_slice() + }), + querying_epoll: AtomicBool::new(false), + submission_wait: ConcurrentQueue::unbounded(), + } + }), + } + } + + /// Get the `Poller` backing this reactor. + pub(super) fn poller(&self) -> &Poller { + self.polling.poller() + } + + /// Returns the current ticker. + pub(super) fn ticker(&self) -> usize { + self.polling.ticker() + } + + /// Registers an I/O source in the reactor. + pub(super) fn insert_io(&self, raw: RawFd) -> io::Result> { + self.polling.insert_io(raw) + } + + /// Deregisters an I/O source from the reactor. + pub(super) fn remove_io(&self, source: &Source) -> io::Result<()> { + self.polling.remove_io(source) + } + + /// Registers a timer in the reactor. + /// + /// Returns the ID of the timer. + pub(super) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize { + self.polling.insert_timer(when, waker) + } + + /// Deregisters a timer from the reactor. + pub(super) fn remove_timer(&self, when: Instant, id: usize) { + self.polling.remove_timer(when, id); + } + + /// Notifies the thread blocked on the reactor. + pub(super) fn notify(&self) { + self.polling.notify(); + } + + /// Try to poll for a `Read` event on the given source. + pub(crate) fn poll_read( + &self, + readable: &mut impl Read, + source: &Source, + buf: &mut [u8], + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(ref uring) = self.uring { + unsafe { + uring.try_operation( + READ_OP, + source, + buf, + cx, + |buf, uring_buf| { + match readable.read(buf) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + // Resize the buffer to the proper size. + let uring_buf = &mut *uring_buf; + if uring_buf.len() < buf.len() { + // SAFETY: MaybeUninit is allowed to be uninit. + uring_buf.reserve(buf.len() - uring_buf.len()); + uring_buf.set_len(buf.len()); + } + + Err(io_uring::opcode::Read::new( + Fd(source.raw), + uring_buf.as_mut_ptr().cast(), + buf.len() as _, + ) + .build()) + } + res => Ok(res), + } + }, + |buf, uring_buf, code| match code { + code if code < 0 => Err(io::Error::from_raw_os_error(-code as _)), + result => { + // memcpy from uring_buf to buf + let uring_buf = &mut *uring_buf; + let amt = result as usize; + + // SAFETY: we know at least amt bytes are initialized + buf[..amt].copy_from_slice(slice::from_raw_parts( + uring_buf.as_ptr().cast(), + amt, + )); + Ok(amt) + } + }, + ) + } + } else { + self.polling.poll_read(readable, source, buf, cx) + } + } + + /// Try to poll for a `Write` event on the given source. + pub(crate) fn poll_write( + &self, + writable: &mut impl Write, + source: &Source, + buf: &[u8], + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(ref uring) = self.uring { + unsafe { + uring.try_operation( + WRITE_OP, + source, + (), + cx, + |(), uring_buf| { + match writable.write(buf) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + // Fill the buffer with our bytes. + let uring_buf = &mut *uring_buf; + uring_buf.clear(); + uring_buf.extend_from_slice(slice::from_raw_parts( + buf.as_ptr().cast(), + buf.len(), + )); + + Err(io_uring::opcode::Write::new( + Fd(source.raw), + uring_buf.as_ptr().cast(), + buf.len() as _, + ) + .build()) + } + res => Ok(res), + } + }, + |(), _, code| match code { + code if code < 0 => Err(io::Error::from_raw_os_error(-code as _)), + result => Ok(result as usize), + }, + ) + } + } else { + self.polling.poll_write(writable, source, buf, cx) + } + } + + /// Acquires a lock on the reactor. + pub(super) fn lock(&self) -> ReactorLock<'_> { + let reactor = self; + let inner = self.polling.lock(); + ReactorLock { inner, reactor } + } + + /// Tries to acquire a lock on the reactor. + pub(super) fn try_lock(&self) -> Option> { + self.polling.try_lock().map(|inner| { + let reactor = self; + ReactorLock { inner, reactor } + }) + } +} + +/// A lock on the polling capabilities of the `Reactor`. +pub(super) struct ReactorLock<'a> { + /// The inner lock on the `PollReactor`. + inner: PollReactorLock<'a>, + /// A reference to the main reactor. + reactor: &'a Reactor, +} + +impl<'a> ReactorLock<'a> { + /// Processes new events, blocking until the first event or timeout. + pub(super) fn react(&mut self, timeout: Option) -> io::Result<()> { + if let Some(ref uring) = self.reactor.uring { + // Prepare timers/deadline for polling. + let mut wakers = vec![]; + let (timeout, tick) = self.inner.prepare_for_polling(timeout, &mut wakers); + + // Register polling into io_uring if we haven't already. + uring.register_polling(self.reactor.poller()); + + // Wait for io_uring events. + let submitter = uring.io_uring.submitter(); + let mut args = SubmitArgs::new(); + + let timespec = timeout.map(cvt_timeout); + if let Some(ref timespec) = timespec { + args = args.timespec(timespec); + } + + // Wait for at least one event. + let res = match submitter.submit_with_args(1, &args) { + Ok(_) => { + // Process the events that we've received. + let mut buffer = uring.completion_buffer.lock().unwrap(); + // SAFETY: we hold the lock, we can read the completion queue + let mut completion_queue = unsafe { uring.io_uring.completion_shared() }; + + // If there are not events, the timer deadline must have fired. + // Check to see if we need to wake any timers. + if completion_queue.is_empty() && timeout != Some(Duration::from_secs(0)) { + self.reactor.polling.process_timers(&mut wakers); + } + + self.process_queue(uring, tick, &mut completion_queue, &mut buffer, &mut wakers) + } + Err(e) if e.kind() == io::ErrorKind::Interrupted => Ok(()), + Err(e) => Err(e), + }; + + // Wake up ready tasks. + log::trace!("react: {} ready wakers", wakers.len()); + for waker in wakers { + // Prevent a panicking waker from blowing up. + panic::catch_unwind(|| waker.wake()).ok(); + } + + res + } else { + // Fall back to the polling reactor. + self.inner.react(timeout) + } + } + + /// Process the events received from `io_uring`. + fn process_queue( + &mut self, + uring: &Uring, + tick: usize, + completion_queue: &mut CompletionQueue<'_>, + buffer: &mut [MaybeUninit], + wakers: &mut Vec, + ) -> io::Result<()> { + let mut res = Ok(()); + + // Use the associated method here to avoid using the + // iterator's method. + while !CompletionQueue::is_empty(completion_queue) { + let entries = completion_queue.fill(buffer); + + // Iterate over the entries that we've received. + for entry in entries { + let data = entry.user_data(); + + // If this is the key used for epoll, process + // the epoll events. + if data == EPOLL_KEY { + // We are no longer querying epoll. + uring.querying_epoll.store(false, Ordering::SeqCst); + + res = res.and(self.inner.pump_events( + Some(Duration::from_secs(0)), + tick, + wakers, + )); + } else { + log::trace!("Found non-epoll entry: {:?}", data); + let sources = self.reactor.polling.sources.lock().unwrap(); + + // determine the operation involved in the event + // as well as the source we need + let key = (data & !OPERATION_MASK) as usize; + let operation = ((data & OPERATION_MASK) >> OPERATION_SHIFT) as usize; + + if let Some(source) = sources.get(key) { + // Get the operation in question. + let mut state = source.state.lock().unwrap(); + let operation = &mut state.operations[operation]; + + // Indicate that the operation is complete, and wake the waker + // if there is one. + operation.status = OperationStatus::Complete(entry.result() as isize); + wakers.extend(operation.waker.take()); + } + } + } + + // Also, since there is now more room in the submission queue, + // we can submit more events. + while let Ok(waker) = uring.submission_wait.pop() { + wakers.push(waker); + } + } + + res + } +} + +struct Uring { + /// The interface to the `io_uring` system API. + io_uring: IoUring, + /// A lock to protect the submission queue. + /// + /// Holding this lock implies the exclusive right to submit new + /// I/O operations to the `io_uring`. + submit_lock: Mutex<()>, + /// A buffer used to hold completion queue events. + /// + /// Holding this lock implies the exclusive right to read from the + /// completion queue. + completion_buffer: Mutex]>>, + /// Whether or not there is currently an entry in the ring + /// for polling `epoll`. + querying_epoll: AtomicBool, + /// Tasks waiting on there to be more room in the submission queue. + submission_wait: ConcurrentQueue, +} + +impl Uring { + /// Register an operation into the reactor. + /// + /// The `op_index` parameter defines the operation to use, and the + /// `Source` parameter is the source to register the operation with. + /// `op` attempts to run the operation; potentially creating an early + /// out, or returning the entry to be inserting into the ring. + /// `transform_result` transforms the `isize` returned by the + /// operation into the desired result. + /// + /// # Safety + /// + /// The `SubmissionEntry` returned by `op` must be a valid entry. + /// In addition, the rules for the buffer must be followed. + unsafe fn try_operation( + &self, + op_index: usize, + source: &Source, + param: T, + cx: &mut Context<'_>, + op: impl FnOnce(T, *mut Vec>) -> Result, + transform_result: impl FnOnce(T, *mut Vec>, isize) -> R, + ) -> Poll { + // Fetch the operation in question. + let mut state = source.state.lock().unwrap(); + let operation = &mut state.operations[op_index]; + + // Tell if the state has been completed, or if we need to start. + match operation.status { + OperationStatus::NotStarted => { + // Start the operation. + let buffer = operation.buffer.get(); + match op(param, buffer) { + Ok(result) => { + // The operation finished early. + Poll::Ready(result) + } + Err(entry) => { + // The operation must be entered into the queue. + match self.submit_entry(source.key, op_index, entry, cx) { + Err(e) => panic!("try_operation: failed to submit entry: {}", e), + Ok(true) => { + // Register the waker in the operation. + operation.waker = Some(cx.waker().clone()); + operation.status = OperationStatus::Pending; + } + _ => {} + } + Poll::Pending + } + } + } + OperationStatus::Pending => { + // The operation is still pending. Register the waker. + if let Some(w) = operation.waker.take() { + if w.will_wake(cx.waker()) { + operation.waker = Some(w); + return Poll::Pending; + } + // Wake the previous waker, since it will be replaced. + panic::catch_unwind(|| w.wake()).ok(); + } + + operation.waker = Some(cx.waker().clone()); + Poll::Pending + } + OperationStatus::Complete(result) => { + // We've retrieved our final result. + operation.status = OperationStatus::NotStarted; + Poll::Ready(transform_result(param, operation.buffer.get(), result)) + } + } + } + + /// Register the `polling` instance into the ring if it hasn't + /// been already. + fn register_polling(&self, poller: &Poller) { + if !self.querying_epoll.swap(true, Ordering::SeqCst) { + // Create a submission queue entry for the instance. + let entry = io_uring::opcode::PollAdd::new(Fd(poller.as_raw_fd()), EPOLL_IN as _) + .build() + .user_data(EPOLL_KEY); + + // Acquire the lock to submit new operations to the ring. + let _guard = self.submit_lock.lock().unwrap(); + + // SAFETY: We have acquired the lock, so we are the only ones + // submitting new operations to the ring. + let mut submit_queue = unsafe { self.io_uring.submission_shared() }; + + // SAFETY: `polling::as_raw_fd()` is a valid file descriptor that will + // remain valid for the lifetime of this entry. + unsafe { + submit_queue + .push(&entry) + .expect("No room left for the polling entry"); + } + } + } + + /// Register an event from the given source, of the given operation. + /// + /// # Safety + /// + /// The `Entry` should be a valid entry. + unsafe fn submit_entry( + &self, + key: usize, + operation: usize, + entry: SubmissionEntry, + task: &mut Context<'_>, + ) -> io::Result { + // Add user data combining the source's key and the operation key + // to the entry. + debug_assert!(key as u64 & OPERATION_MASK == 0); + let data = (key as u64) | ((operation as u64) << OPERATION_SHIFT); + debug_assert_ne!(data, EPOLL_KEY); + + let entry = entry.user_data(data); + + // Lock the submission queue. + let _guard = self.submit_lock.lock().unwrap(); + // SAFETY: We have acquired the lock, so we can access the queue. + let mut submit_queue = self.io_uring.submission_shared(); + + // If the queue is almost full, wait for space to become available. + // + // We always leave one space available for the epoll entry at the end. + // This way, nothing will ever hamper the epoll entry being added. + if submit_queue.len() >= submit_queue.capacity() - 1 { + self.submission_wait.push(task.waker().clone()).ok(); + return Ok(false); + } + + // Push the entry to the queue. + // SAFETY: The caller asserts that `entry` is a valid entry. + submit_queue + .push(&entry) + .expect("Submit queue cannot be pushed to."); + + self.io_uring.submitter().submit()?; + + Ok(true) + } +} + +/// Convert a `Duration` to a `timespec` suitable for passing to `io_uring_submit`. +fn cvt_timeout(timeout: Duration) -> Timespec { + Timespec::new() + .sec(timeout.as_secs()) + .nsec(timeout.subsec_nanos()) +}