Skip to content

Commit c1546bd

Browse files
committed
Barrier waits in order of completion/failure.
1 parent a5b59e1 commit c1546bd

File tree

6 files changed

+60
-16
lines changed

6 files changed

+60
-16
lines changed

lib/async/barrier.rb

+29-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
require_relative "list"
77
require_relative "task"
8+
require_relative "queue"
89

910
module Async
1011
# A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
@@ -16,6 +17,7 @@ class Barrier
1617
# @public Since *Async v1*.
1718
def initialize(parent: nil)
1819
@tasks = List.new
20+
@finished = Queue.new
1921

2022
@parent = parent
2123
end
@@ -41,11 +43,15 @@ def size
4143
# Execute a child task and add it to the barrier.
4244
# @asynchronous Executes the given block concurrently.
4345
def async(*arguments, parent: (@parent or Task.current), **options, &block)
44-
task = parent.async(*arguments, **options, &block)
46+
waiting = nil
4547

46-
@tasks.append(TaskNode.new(task))
47-
48-
return task
48+
parent.async(*arguments, **options) do |task, *arguments|
49+
waiting = TaskNode.new(task)
50+
@tasks.append(waiting)
51+
block.call(task, *arguments)
52+
ensure
53+
@finished.signal(waiting)
54+
end
4955
end
5056

5157
# Whether there are any tasks being held by the barrier.
@@ -55,14 +61,27 @@ def empty?
5561
end
5662

5763
# Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
64+
#
65+
# @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results.
66+
#
5867
# @asynchronous Will wait for tasks to finish executing.
5968
def wait
60-
@tasks.each do |waiting|
69+
while !@tasks.empty?
70+
# Wait for a task to finish (we get the task node):
71+
return unless waiting = @finished.wait
72+
73+
# Remove the task as it is now finishing:
74+
@tasks.remove?(waiting)
75+
76+
# Get the task:
6177
task = waiting.task
62-
begin
78+
79+
# If a block is given, the user can implement their own behaviour:
80+
if block_given?
81+
yield task
82+
else
83+
# Wait for it to either complete or raise an error:
6384
task.wait
64-
ensure
65-
@tasks.remove?(waiting) unless task.alive?
6685
end
6786
end
6887
end
@@ -73,6 +92,8 @@ def stop
7392
@tasks.each do |waiting|
7493
waiting.task.stop
7594
end
95+
96+
@finished.close
7697
end
7798
end
7899
end

lib/async/list.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def remove(node)
134134
return removed(node)
135135
end
136136

137-
# @returns [Boolean] Returns true if the list is empty.
137+
# @returns [Boolean] True if the list is empty.
138138
def empty?
139139
@size == 0
140140
end

lib/async/notification.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ module Async
1010
# @public Since *Async v1*.
1111
class Notification < Condition
1212
# Signal to a given task that it should resume operations.
13+
#
14+
# @returns [Boolean] if a task was signalled.
1315
def signal(value = nil, task: Task.current)
14-
return if @waiting.empty?
16+
return false if @waiting.empty?
1517

1618
Fiber.scheduler.push Signal.new(self.exchange, value)
1719

18-
return nil
20+
return true
1921
end
2022

2123
Signal = Struct.new(:waiting, :value) do

lib/async/queue.rb

+13
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ class Queue
2020
# @parameter available [Notification] The notification to use for signaling when items are available.
2121
def initialize(parent: nil, available: Notification.new)
2222
@items = []
23+
@closed = false
2324
@parent = parent
2425
@available = available
2526
end
2627

28+
def close
29+
@closed = true
30+
31+
while @available.waiting?
32+
@available.signal(nil)
33+
end
34+
end
35+
2736
# @attribute [Array] The items in the queue.
2837
attr :items
2938

@@ -59,6 +68,10 @@ def enqueue(*items)
5968
# Remove and return the next item from the queue.
6069
def dequeue
6170
while @items.empty?
71+
if @closed
72+
return nil
73+
end
74+
6275
@available.wait
6376
end
6477

releases.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Ruby v3.1 support is dropped.
66
- `Async::Wrapper` which was previously deprecated, is now removed.
7+
- `Async::Barrier` now waits in order of completion rather than order of creation. This means that if you create a barrier with 3 tasks, and one of them completes (or fails) before the others, it will be the first to be yielded to the barrier.
78

89
## v2.23.0
910

test/async/barrier.rb

+12-5
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,28 @@
8585

8686
# It's possible for Barrier#wait to be interrupted with an unexpected exception, and this should not cause the barrier to incorrectly remove that task from the wait list.
8787
it "waits for tasks with timeouts" do
88+
repeats = 5
89+
count = 0
90+
8891
begin
89-
reactor.with_timeout(5/100.0/2) do
90-
5.times do |i|
92+
reactor.with_timeout(repeats/100.0/2) do
93+
repeats.times do |i|
9194
barrier.async do |task|
9295
sleep(i/100.0)
9396
end
9497
end
9598

96-
expect(barrier.tasks.size).to be == 5
97-
barrier.wait
99+
expect(barrier.tasks.size).to be == repeats
100+
101+
barrier.wait do |task|
102+
task.wait
103+
count += 1
104+
end
98105
end
99106
rescue Async::TimeoutError
100107
# Expected.
101108
ensure
102-
expect(barrier.tasks.size).to be == 2
109+
expect(barrier.tasks.size).to be == (repeats - count)
103110
barrier.stop
104111
end
105112
end

0 commit comments

Comments
 (0)