Skip to content

Commit 399dcfc

Browse files
committed
feat: introduce custom errors and mark RPC related errors as temporary so that they can be retried in pipeline
1 parent f8eadfe commit 399dcfc

File tree

9 files changed

+111
-30
lines changed

9 files changed

+111
-30
lines changed

rollup/da_syncer/blob_client/blob_client_list.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package blob_client
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
6-
"io"
77

88
"github.com/scroll-tech/go-ethereum/common"
99
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
1010
"github.com/scroll-tech/go-ethereum/log"
11+
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
1112
)
1213

1314
type BlobClientList struct {
@@ -37,8 +38,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa
3738
log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos)
3839
}
3940

40-
// if we iterated over entire list, return EOF error that will be handled in syncing_pipeline with a backoff and retry
41-
return nil, io.EOF
41+
// if we iterated over entire list, return a temporary error that will be handled in syncing_pipeline with a backoff and retry
42+
return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients"))
4243
}
4344

4445
func (c *BlobClientList) nextPos() int {

rollup/da_syncer/block_queue.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ func (bq *BlockQueue) getBlocksFromBatch(ctx context.Context) error {
4545
return fmt.Errorf("unexpected type of daEntry: %T", daEntry)
4646
}
4747

48-
bq.blocks, err = entryWithBlocks.Blocks()
49-
if err != nil {
50-
return fmt.Errorf("failed to get blocks from daEntry: %w", err)
51-
}
48+
bq.blocks = entryWithBlocks.Blocks()
5249

5350
return nil
5451
}

rollup/da_syncer/da/calldata_blob_source.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/scroll-tech/go-ethereum/ethdb"
1212
"github.com/scroll-tech/go-ethereum/log"
1313
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
14+
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
1415
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
1516
)
1617

@@ -72,7 +73,7 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) {
7273
if to > ds.l1Finalized {
7374
ds.l1Finalized, err = ds.l1Client.GetLatestFinalizedBlockNumber()
7475
if err != nil {
75-
return nil, fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err)
76+
return nil, serrors.NewTemporaryError(fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err))
7677
}
7778
// make sure we don't request more than finalized blocks
7879
to = min(to, ds.l1Finalized)
@@ -84,13 +85,15 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) {
8485

8586
logs, err := ds.l1Client.FetchRollupEventsInRange(ds.l1height, to)
8687
if err != nil {
87-
return nil, fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err)
88+
return nil, serrors.NewTemporaryError(fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err))
8889
}
8990
da, err := ds.processLogsToDA(logs)
90-
if err == nil {
91-
ds.l1height = to + 1
91+
if err != nil {
92+
return nil, serrors.NewTemporaryError(fmt.Errorf("failed to process logs to DA, error: %v", err))
9293
}
93-
return da, err
94+
95+
ds.l1height = to + 1
96+
return da, nil
9497
}
9598

9699
func (ds *CalldataBlobSource) L1Height() uint64 {
@@ -119,7 +122,7 @@ func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error)
119122

120123
case ds.l1RevertBatchEventSignature:
121124
event := &rollup_sync_service.L1RevertBatchEvent{}
122-
if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil {
125+
if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil {
123126
return nil, fmt.Errorf("failed to unpack revert rollup event log, err: %w", err)
124127
}
125128

@@ -129,7 +132,7 @@ func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error)
129132

130133
case ds.l1FinalizeBatchEventSignature:
131134
event := &rollup_sync_service.L1FinalizeBatchEvent{}
132-
if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil {
135+
if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil {
133136
return nil, fmt.Errorf("failed to unpack finalized rollup event log, err: %w", err)
134137
}
135138

@@ -188,7 +191,7 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, vLog *types.Lo
188191

189192
txData, err := ds.l1Client.FetchTxData(vLog)
190193
if err != nil {
191-
return nil, err
194+
return nil, fmt.Errorf("failed to fetch tx data, tx hash: %v, err: %w", vLog.TxHash.Hex(), err)
192195
}
193196
if len(txData) < methodIDLength {
194197
return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength)

rollup/da_syncer/da/commitV0.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package da
33
import (
44
"encoding/binary"
55
"fmt"
6-
"io"
76

87
"github.com/scroll-tech/da-codec/encoding"
98
"github.com/scroll-tech/da-codec/encoding/codecv0"
109

1110
"github.com/scroll-tech/go-ethereum/core/rawdb"
1211
"github.com/scroll-tech/go-ethereum/core/types"
1312
"github.com/scroll-tech/go-ethereum/ethdb"
13+
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
1414
)
1515

1616
type CommitBatchDAV0 struct {
@@ -92,7 +92,7 @@ func (c *CommitBatchDAV0) CompareTo(other Entry) int {
9292
return 0
9393
}
9494

95-
func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) {
95+
func (c *CommitBatchDAV0) Blocks() []*PartialBlock {
9696
var blocks []*PartialBlock
9797
l1TxPointer := 0
9898

@@ -125,7 +125,8 @@ func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) {
125125
blocks = append(blocks, block)
126126
}
127127
}
128-
return blocks, nil
128+
129+
return blocks
129130
}
130131

131132
func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int {
@@ -156,8 +157,8 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped
156157
l1Tx := rawdb.ReadL1Message(db, currentIndex)
157158
if l1Tx == nil {
158159
// message not yet available
159-
// we return io.EOF as this will be handled in the syncing pipeline with a backoff and retry
160-
return nil, io.EOF
160+
// we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
161+
return nil, serrors.EOFError
161162
}
162163
txs = append(txs, l1Tx)
163164
currentIndex++

rollup/da_syncer/da/da.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type Entry interface {
3131

3232
type EntryWithBlocks interface {
3333
Entry
34-
Blocks() ([]*PartialBlock, error)
34+
Blocks() []*PartialBlock
3535
}
3636

3737
type Entries []Entry

rollup/da_syncer/da_queue.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66

77
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
8+
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
89
)
910

1011
// DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage.
@@ -54,7 +55,9 @@ func (dq *DAQueue) getNextData(ctx context.Context) error {
5455
if errors.Is(err, da.ErrSourceExhausted) {
5556
dq.l1height = dq.dataSource.L1Height()
5657
dq.dataSource = nil
57-
return dq.getNextData(ctx)
58+
59+
// we return EOFError to be handled in pipeline
60+
return serrors.EOFError
5861
}
5962

6063
return err

rollup/da_syncer/serrors/errors.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package serrors
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
const (
8+
temporary Type = iota
9+
eof
10+
)
11+
12+
var (
13+
TemporaryError = NewTemporaryError(nil)
14+
EOFError = NewEOFError(nil)
15+
)
16+
17+
type Type uint8
18+
19+
func (t Type) String() string {
20+
switch t {
21+
case temporary:
22+
return "temporary"
23+
case eof:
24+
return "EOF"
25+
default:
26+
return "unknown"
27+
}
28+
}
29+
30+
type syncError struct {
31+
t Type
32+
err error
33+
}
34+
35+
func NewTemporaryError(err error) error {
36+
return &syncError{t: temporary, err: err}
37+
}
38+
39+
func NewEOFError(err error) error {
40+
return &syncError{t: eof, err: err}
41+
}
42+
43+
func (s *syncError) Error() string {
44+
return fmt.Sprintf("%s: %v", s.t, s.err)
45+
}
46+
47+
func (s *syncError) Unwrap() error {
48+
return s.err
49+
}
50+
51+
func (s *syncError) Is(target error) bool {
52+
if target == nil {
53+
return s == nil
54+
}
55+
56+
targetSyncErr, ok := target.(*syncError)
57+
if !ok {
58+
return false
59+
}
60+
61+
return s.t == targetSyncErr.t
62+
}

rollup/da_syncer/syncing_pipeline.go

+21-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"io"
87
"sync"
98
"time"
109

@@ -15,6 +14,7 @@ import (
1514
"github.com/scroll-tech/go-ethereum/log"
1615
"github.com/scroll-tech/go-ethereum/params"
1716
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
17+
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
1818
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
1919
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
2020
)
@@ -114,6 +114,7 @@ func (s *SyncingPipeline) mainLoop() {
114114
stepCh := make(chan struct{}, 1)
115115
var delayedStepCh <-chan time.Time
116116
var resetCounter int
117+
var tempErrorCounter int
117118

118119
// reqStep is a helper function to request a step to be executed.
119120
// If delay is true, it will request a delayed step with exponential backoff, otherwise it will request an immediate step.
@@ -157,29 +158,42 @@ func (s *SyncingPipeline) mainLoop() {
157158
reqStep(false)
158159
s.expBackoff.Reset()
159160
resetCounter = 0
161+
tempErrorCounter = 0
160162
continue
161163
}
162164

163-
if errors.Is(err, io.EOF) {
165+
if errors.Is(err, serrors.EOFError) {
164166
// pipeline is empty, request a delayed step
167+
// TODO: eventually (with state manager) this should not trigger a delayed step because external events will trigger a new step anyway
165168
reqStep(true)
169+
tempErrorCounter = 0
166170
continue
167-
}
168-
if errors.Is(err, ErrBlockTooLow) {
171+
} else if errors.Is(err, serrors.TemporaryError) {
172+
log.Warn("syncing pipeline step failed due to temporary error, retrying", "err", err)
173+
if tempErrorCounter > 100 {
174+
log.Warn("syncing pipeline step failed due to 100 consecutive temporary errors, stopping pipeline worker", "last err", err)
175+
return
176+
}
177+
178+
// temporary error, request a delayed step
179+
reqStep(true)
180+
tempErrorCounter++
181+
continue
182+
} else if errors.Is(err, ErrBlockTooLow) {
169183
// block number returned by the block queue is too low,
170184
// we skip the blocks until we reach the correct block number again.
171185
reqStep(false)
186+
tempErrorCounter = 0
172187
continue
173188
} else if errors.Is(err, ErrBlockTooHigh) {
174189
// block number returned by the block queue is too high,
175190
// reset the pipeline and move backwards from the last L1 block we read
176191
s.reset(resetCounter)
177192
resetCounter++
178193
reqStep(false)
194+
tempErrorCounter = 0
179195
continue
180-
}
181-
182-
if errors.Is(err, context.Canceled) {
196+
} else if errors.Is(err, context.Canceled) {
183197
log.Info("syncing pipeline stopped due to cancelled context", "err", err)
184198
return
185199
}

rollup/rollup_sync_service/l1client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId
5555
return &client, nil
5656
}
5757

58-
// fetcRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to].
58+
// FetchRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to].
5959
func (c *L1Client) FetchRollupEventsInRange(from, to uint64) ([]types.Log, error) {
6060
log.Trace("L1Client fetchRollupEventsInRange", "fromBlock", from, "toBlock", to)
6161

0 commit comments

Comments
 (0)