-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathexternal_queue.go
114 lines (91 loc) · 2.6 KB
/
external_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package gocq
import (
"fmt"
"strings"
"time"
)
// externalQueue is the queue type defined in this file. It embeds
// *worker[T, R], so the worker's fields and methods are promoted onto it
// and used directly by the methods below.
type externalQueue[T, R any] struct {
*worker[T, R]
}
// IExternalQueue is the root interface of concurrent queue operations.
// It embeds IBaseQueue and adds worker access, job lookup by id, and
// blocking wait/close operations.
type IExternalQueue[T, R any] interface {
IBaseQueue
// Worker returns the worker associated with the queue.
Worker() Worker[T, R]
// JobById returns the job with the given id, or an error if no job is
// found for that id.
JobById(id string) (EnqueuedJob[R], error)
// GroupsJobById returns the groups job with the given id, or an error if
// no groups job is found for that id.
GroupsJobById(id string) (EnqueuedSingleGroupJob[R], error)
// WaitUntilFinished blocks until all pending Jobs in the queue are processed.
// Time complexity: O(n) where n is the number of pending Jobs
WaitUntilFinished()
// WaitAndClose waits until all pending Jobs in the queue are processed and then closes the queue.
// Time complexity: O(n) where n is the number of pending Jobs
WaitAndClose() error
}
// newExternalQueue builds an externalQueue whose embedded worker is the
// given worker.
func newExternalQueue[T, R any](worker *worker[T, R]) *externalQueue[T, R] {
	q := new(externalQueue[T, R])
	q.worker = worker
	return q
}
// postEnqueue performs the bookkeeping that follows an enqueue: it marks the
// job as queued and, when the job has a non-empty ID, stores it in the cache
// under that ID. notifyToPullNextJobs is deferred so it runs last, after the
// bookkeeping is complete.
func (wbq *externalQueue[T, R]) postEnqueue(j iJob[T, R]) {
	defer wbq.notifyToPullNextJobs()

	j.ChangeStatus(queued)

	id := j.ID()
	if id == "" {
		// Jobs without an ID are not cached.
		return
	}
	wbq.Cache.Store(id, j)
}
// PendingCount reports the current length of the underlying queue.
func (wbq *externalQueue[T, R]) PendingCount() int {
	pending := wbq.Queue.Len()
	return pending
}
// Worker returns the embedded worker, exposed through the Worker interface.
func (wbq *externalQueue[T, R]) Worker() Worker[T, R] {
	var w Worker[T, R] = wbq.worker
	return w
}
// JobById looks up the job stored in the cache under id.
//
// It returns an error when no entry exists for id, or when the cached entry
// does not implement EnqueuedJob[R] (for example, when id refers to a
// different kind of job kept in the same cache).
func (wbq *externalQueue[T, R]) JobById(id string) (EnqueuedJob[R], error) {
	val, ok := wbq.Cache.Load(id)
	if !ok {
		return nil, fmt.Errorf("job not found for id: %s", id)
	}

	// Use the checked form of the type assertion: an entry of an unexpected
	// type must surface as an error to the caller instead of a panic.
	job, ok := val.(EnqueuedJob[R])
	if !ok {
		return nil, fmt.Errorf("cached entry for id %s is not a job", id)
	}
	return job, nil
}
// GroupsJobById looks up a groups job stored in the cache.
//
// An id that does not already start with groupIdPrefixed is first passed
// through generateGroupId (presumably yielding the prefixed form — confirm
// against its definition), and the lookup uses the resulting id.
//
// It returns an error when no entry exists for the id, or when the cached
// entry does not implement EnqueuedSingleGroupJob[R].
func (wbq *externalQueue[T, R]) GroupsJobById(id string) (EnqueuedSingleGroupJob[R], error) {
	if !strings.HasPrefix(id, groupIdPrefixed) {
		id = generateGroupId(id)
	}

	val, ok := wbq.Cache.Load(id)
	if !ok {
		return nil, fmt.Errorf("groups job not found for id: %s", id)
	}

	// Use the checked form of the type assertion: an entry of an unexpected
	// type must surface as an error to the caller instead of a panic.
	job, ok := val.(EnqueuedSingleGroupJob[R])
	if !ok {
		return nil, fmt.Errorf("cached entry for id %s is not a groups job", id)
	}
	return job, nil
}
// WaitUntilFinished blocks until the wait group is done and both the pending
// count and the current-processing count have dropped to zero.
func (wbq *externalQueue[T, R]) WaitUntilFinished() {
	// Resume a paused queue first; otherwise waiting on it would end in a
	// deadlock error.
	if wbq.IsPaused() {
		wbq.Resume()
	}

	wbq.sync.wg.Wait()

	// Work may still be pending or in flight after the wait group returns,
	// so poll the counters until both report nothing left.
	const pollInterval = 10 * time.Millisecond
	for {
		busy := wbq.PendingCount() > 0 || wbq.CurrentProcessingCount() > 0
		if !busy {
			return
		}
		time.Sleep(pollInterval)
	}
}
// Purge empties the queue and closes the result channel of every job that
// was in it.
//
// NOTE(review): the snapshot and the purge are two separate calls; whether
// the queue can change in between is not visible from this file.
func (wbq *externalQueue[T, R]) Purge() {
	// Snapshot the contents first so the removed jobs stay reachable.
	purged := wbq.Queue.Values()
	wbq.Queue.Purge()

	// Close the result channel of each purged job to avoid routine leaks.
	for _, item := range purged {
		j, ok := item.(iJob[T, R])
		if !ok {
			continue
		}
		j.CloseResultChannel()
	}
}
// Close shuts the queue down in three steps: it purges the queued jobs,
// calls Stop, and then waits via WaitUntilFinished. It always returns nil.
func (wbq *externalQueue[T, R]) Close() error {
	wbq.Purge()
	wbq.Stop()
	wbq.WaitUntilFinished()

	return nil
}
// WaitAndClose first waits via WaitUntilFinished and then closes the queue,
// returning whatever Close returns.
func (wbq *externalQueue[T, R]) WaitAndClose() error {
	wbq.WaitUntilFinished()

	err := wbq.Close()
	return err
}