-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpersistent_queue.go
83 lines (66 loc) · 2.31 KB
/
persistent_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package gocq
// errJobIdRequired is the value Add and AddAll panic with when a job's
// resolved config has an empty Id.
// NOTE(review): despite the err prefix this is a plain string, not an error
// value, so a recover() in a caller receives a string.
var errJobIdRequired = "job id is required for persistent queue"
// PersistentQueue is an interface that extends Queue to support persistent job operations
// where jobs can be recovered even after application restarts. All jobs must have unique IDs.
//
// It currently adds no methods of its own: its method set is exactly that of
// Queue[T, R]. In this file the non-empty ID requirement is enforced by the
// persistentQueue implementation, whose Add and AddAll panic on an empty job ID.
type PersistentQueue[T, R any] interface {
Queue[T, R]
}
// persistentQueue is the PersistentQueue implementation returned by
// newPersistentQueue. It embeds *queue[T, R] and defines its own Add, AddAll,
// Purge and Close methods on top of it.
type persistentQueue[T, R any] struct {
*queue[T, R]
}
// newPersistentQueue wires worker w to the persistent backend pq and returns
// the resulting PersistentQueue.
//
// The worker is pointed at pq first (w.setQueue), and only then is the external
// queue facade built from the same worker; pq is also kept as the internal
// queue that Add and AddAll enqueue into.
func newPersistentQueue[T, R any](w *worker[T, R], pq IQueue) PersistentQueue[T, R] {
	w.setQueue(pq)

	base := &queue[T, R]{
		externalQueue: newExternalQueue(w),
		internalQueue: pq,
	}

	return &persistentQueue[T, R]{queue: base}
}
// Add enqueues a single job carrying data on the persistent queue.
//
// The job config is the queue's default configs with the supplied configs
// applied on top. A non-empty job ID is mandatory: Add panics with
// errJobIdRequired when the resolved config has an empty Id.
//
// Returns the created job as an EnqueuedJob, which the caller can use to track
// the job's status and result.
func (q *persistentQueue[T, R]) Add(data T, configs ...JobConfigFunc) EnqueuedJob[R] {
	cfg := loadJobConfigs(q.configs, configs...)
	if cfg.Id == "" {
		panic(errJobIdRequired)
	}

	job := newJob[T, R](data, cfg)

	// NOTE(review): the second return value of Json and the result of Enqueue
	// are both discarded here, whereas AddAll checks Enqueue's result. Confirm
	// whether a failed serialization/enqueue should be surfaced to the caller.
	payload, _ := job.Json()
	q.internalQueue.Enqueue(payload)
	q.postEnqueue(job)

	return job
}
// AddAll enqueues a batch of items on the persistent queue as one group.
//
// Each item's ID is applied through WithJobId on top of the queue's default
// configs. AddAll panics with errJobIdRequired as soon as it reaches an item
// whose resolved job ID is empty; items handled before that point have already
// been enqueued.
//
// An item whose job cannot be serialized, or whose payload the internal queue
// rejects, is skipped: Done is called on the group's wait group for it and
// postEnqueue is not invoked, so the group does not wait on a job that was
// never stored.
//
// Returns an EnqueuedGroupJob that can be used to track all jobs' statuses and
// results.
func (q *persistentQueue[T, R]) AddAll(items []Item[T]) EnqueuedGroupJob[R] {
	groupJob := newGroupJob[T, R](uint32(len(items)))

	for _, item := range items {
		jConfigs := loadJobConfigs(q.configs, WithJobId(item.ID))
		if jConfigs.Id == "" {
			panic(errJobIdRequired)
		}

		j := groupJob.NewJob(item.Value, jConfigs)

		// A job that fails to serialize must not be handed to the backend;
		// treat it exactly like a rejected enqueue.
		val, err := j.Json()
		if err != nil {
			groupJob.wg.Done()
			continue
		}

		if ok := q.internalQueue.Enqueue(val); !ok {
			groupJob.wg.Done()
			continue
		}

		q.postEnqueue(j)
	}

	return groupJob
}
// Purge removes all jobs from the queue by delegating to the embedded queue's
// Purge.
func (q *persistentQueue[T, R]) Purge() {
	// Go through the embedded *queue explicitly; calling q.Purge() here would
	// recurse into this method.
	embedded := q.queue
	embedded.Purge()
}
// Close closes the underlying queue and stops the worker.
//
// q.Stop is deferred, so it runs after q.Queue.Close has returned (and would
// still run if that call panicked). The returned error is the one reported by
// q.Queue.Close; nothing produced by q.Stop is propagated to the caller.
func (q *persistentQueue[T, R]) Close() error {
// Deferred so the worker is stopped only once the queue has been closed.
defer q.Stop()
return q.Queue.Close()
}