Skip to content

Commit 5adc608

Browse files
spacejamStjepan Glavina
authored and
Stjepan Glavina
committed
Spawn several threads when we fail to enqueue work in the blocki… (#181)
* Rebase onto master * Switch to unbounded channels
1 parent 1a51ca4 commit 5adc608

File tree

1 file changed

+25
-21
lines changed

1 file changed

+25
-21
lines changed

src/task/spawn_blocking.rs

+25-21
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
22
use std::thread;
33
use std::time::Duration;
44

5-
use crossbeam_channel::{bounded, Receiver, Sender};
5+
use crossbeam_channel::{unbounded, Receiver, Sender};
66
use once_cell::sync::Lazy;
77

88
use crate::task::{JoinHandle, Task};
@@ -79,7 +79,7 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
7979
// before being acted on by a core. This helps keep
8080
// latency snappy in the overall async system by
8181
// reducing bufferbloat.
82-
let (sender, receiver) = bounded(0);
82+
let (sender, receiver) = unbounded();
8383
Pool { sender, receiver }
8484
});
8585

@@ -95,27 +95,31 @@ fn maybe_create_another_blocking_thread() {
9595
return;
9696
}
9797

98-
// We want to avoid having all threads terminate at
99-
// exactly the same time, causing thundering herd
100-
// effects. We want to stagger their destruction over
101-
// 10 seconds or so to make the costs fade into
102-
// background noise.
103-
//
104-
// Generate a simple random number of milliseconds
105-
let rand_sleep_ms = u64::from(random(10_000));
98+
let n_to_spawn = std::cmp::min(2 + (workers / 10), 10);
10699

107-
thread::Builder::new()
108-
.name("async-std/blocking".to_string())
109-
.spawn(move || {
110-
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
100+
for _ in 0..n_to_spawn {
101+
// We want to avoid having all threads terminate at
102+
// exactly the same time, causing thundering herd
103+
// effects. We want to stagger their destruction over
104+
// 10 seconds or so to make the costs fade into
105+
// background noise.
106+
//
107+
// Generate a simple random number of milliseconds
108+
let rand_sleep_ms = u64::from(random(10_000));
111109

112-
DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
113-
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
114-
abort_on_panic(|| task.run());
115-
}
116-
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
117-
})
118-
.expect("cannot start a dynamic thread driving blocking tasks");
110+
thread::Builder::new()
111+
.name("async-std/blocking".to_string())
112+
.spawn(move || {
113+
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
114+
115+
DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
116+
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
117+
abort_on_panic(|| task.run());
118+
}
119+
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
120+
})
121+
.expect("cannot start a dynamic thread driving blocking tasks");
122+
}
119123
}
120124

121125
// Enqueues work, attempting to send to the threadpool in a

0 commit comments

Comments
 (0)