Skip to content

Commit 6fd8dfd

Browse files
committed
[refactor] Simplify worker code
1 parent 93ba355 commit 6fd8dfd

File tree

1 file changed

+45
-35
lines changed

1 file changed

+45
-35
lines changed

worker.go

+45-35
Original file line numberDiff line numberDiff line change
@@ -49,25 +49,23 @@ func (w *worker[Resource]) endBatchesAfterDeadline() {
4949
return
5050
}
5151

52-
ctx, cancel := context.WithDeadline(context.Background(), _batch.deadline)
52+
w.endBatch(_batch)
5353

54-
err := w.saveResource(ctx, _batch.key, _batch.resource)
55-
_batch.publishResult(err)
56-
delete(w.batchByResourceKey, _batch.key)
54+
delete(w.batchByResourceKey, _batch.resourceKey)
5755
w.batchByDeadline = w.batchByDeadline[1:]
58-
59-
cancel()
6056
}
6157
}
6258

63-
func (w *worker[Resource]) endAllBatches() {
64-
for key, _batch := range w.batchByResourceKey {
65-
ctx, cancel := context.WithDeadline(context.Background(), _batch.deadline)
66-
67-
err := w.saveResource(ctx, key, _batch.resource)
68-
_batch.publishResult(err)
59+
func (w *worker[Resource]) endBatch(_batch *batch[Resource]) {
60+
ctx, cancel := context.WithDeadline(context.Background(), _batch.deadline)
61+
err := w.saveResource(ctx, _batch.resourceKey, _batch.resource)
62+
_batch.publishResult(err)
63+
cancel()
64+
}
6965

70-
cancel()
66+
func (w *worker[Resource]) endAllBatches() {
67+
for _, _batch := range w.batchByDeadline {
68+
w.endBatch(_batch)
7169
}
7270

7371
w.batchByResourceKey = map[string]*batch[Resource]{}
@@ -77,48 +75,60 @@ func (w *worker[Resource]) endAllBatches() {
7775
func (w *worker[Resource]) runOperation(_operation operation[Resource]) {
7876
_batch, found := w.batchByResourceKey[_operation.resourceKey]
7977
if !found {
80-
now := time.Now()
81-
deadline := now.Add(w.minDuration)
82-
83-
ctx, cancel := context.WithDeadline(context.Background(), deadline)
84-
defer cancel()
85-
86-
resource, err := w.loadResource(ctx, _operation.resourceKey)
78+
var err error
79+
_batch, err = w.newBatch(_operation.resourceKey)
8780
if err != nil {
8881
_operation.result <- err
8982
return
9083
}
9184

92-
_batch = &batch[Resource]{
93-
key: _operation.resourceKey,
94-
resource: resource,
95-
deadline: deadline,
96-
}
97-
98-
w.batchByResourceKey[_operation.resourceKey] = _batch
99-
w.batchByDeadline = append(w.batchByDeadline, _batch)
85+
w.addBatch(_batch)
10086
}
10187

102-
_batch.results = append(_batch.results, _operation.result)
88+
_batch.operationResults = append(_batch.operationResults, _operation.result)
10389

10490
_operation.run(_batch.resource)
10591
}
10692

93+
func (w *worker[Resource]) newBatch(resourceKey string) (*batch[Resource], error) {
94+
now := time.Now()
95+
deadline := now.Add(w.minDuration)
96+
97+
ctx, cancel := context.WithDeadline(context.Background(), deadline)
98+
defer cancel()
99+
100+
resource, err := w.loadResource(ctx, resourceKey)
101+
if err != nil {
102+
return nil, err
103+
}
104+
105+
return &batch[Resource]{
106+
resourceKey: resourceKey,
107+
resource: resource,
108+
deadline: deadline,
109+
}, nil
110+
}
111+
112+
func (w *worker[Resource]) addBatch(b *batch[Resource]) {
113+
w.batchByResourceKey[b.resourceKey] = b
114+
w.batchByDeadline = append(w.batchByDeadline, b)
115+
}
116+
107117
type operation[Resource any] struct {
108118
resourceKey string
109119
run func(Resource)
110120
result chan error
111121
}
112122

113123
type batch[Resource any] struct {
114-
key string
115-
resource Resource
116-
results []chan error
117-
deadline time.Time
124+
resourceKey string
125+
resource Resource
126+
operationResults []chan error
127+
deadline time.Time
118128
}
119129

120130
func (b *batch[Resource]) publishResult(result error) {
121-
for _, c := range b.results {
122-
c <- result
131+
for _, r := range b.operationResults {
132+
r <- result
123133
}
124134
}

0 commit comments

Comments
 (0)