Skip to content

Commit 9e3c838

Browse files
(follower_node)support beacon node client as blob provider (#988)
* support beacon node client as blob provider * fix * fix formatting * use url.JoinPath instead of path * don't move pos each time in blob_client_list --------- Co-authored-by: jonastheis <[email protected]>
1 parent 0b2fe3b commit 9e3c838

File tree

10 files changed

+231
-63
lines changed

10 files changed

+231
-63
lines changed

cmd/geth/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ var (
175175
utils.DASnapshotFileFlag,
176176
utils.DABlockNativeAPIEndpointFlag,
177177
utils.DABlobScanAPIEndpointFlag,
178+
utils.DABeaconNodeAPIEndpointFlag,
178179
}
179180

180181
rpcFlags = []cli.Flag{

cmd/utils/flags.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -879,16 +879,19 @@ var (
879879
}
880880
DASnapshotFileFlag = cli.StringFlag{
881881
Name: "da.snapshot.file",
882-
Usage: "Snapshot file to sync from da",
882+
Usage: "Snapshot file to sync from DA",
883883
}
884884
DABlobScanAPIEndpointFlag = cli.StringFlag{
885885
Name: "da.blob.blobscan",
886-
Usage: "BlobScan blob api endpoint",
887-
Value: ethconfig.Defaults.DA.BlobScanAPIEndpoint,
886+
Usage: "BlobScan blob API endpoint",
888887
}
889888
DABlockNativeAPIEndpointFlag = cli.StringFlag{
890889
Name: "da.blob.blocknative",
891-
Usage: "BlockNative blob api endpoint",
890+
Usage: "BlockNative blob API endpoint",
891+
}
892+
DABeaconNodeAPIEndpointFlag = cli.StringFlag{
893+
Name: "da.blob.beaconnode",
894+
Usage: "Beacon node API endpoint",
892895
}
893896
)
894897

@@ -1625,6 +1628,9 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
16251628
if ctx.GlobalIsSet(DABlockNativeAPIEndpointFlag.Name) {
16261629
cfg.DA.BlockNativeAPIEndpoint = ctx.GlobalString(DABlockNativeAPIEndpointFlag.Name)
16271630
}
1631+
if ctx.GlobalIsSet(DABeaconNodeAPIEndpointFlag.Name) {
1632+
cfg.DA.BeaconNodeAPIEndpoint = ctx.GlobalString(DABeaconNodeAPIEndpointFlag.Name)
1633+
}
16281634
}
16291635
}
16301636

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package blob_client
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"net/url"
11+
"strconv"
12+
13+
"github.com/scroll-tech/go-ethereum/common"
14+
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
15+
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
16+
)
17+
18+
type BeaconNodeClient struct {
19+
apiEndpoint string
20+
l1Client *rollup_sync_service.L1Client
21+
genesisTime uint64
22+
secondsPerSlot uint64
23+
}
24+
25+
var (
26+
beaconNodeGenesisEndpoint = "/eth/v1/beacon/genesis"
27+
beaconNodeSpecEndpoint = "/eth/v1/config/spec"
28+
beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars"
29+
)
30+
31+
func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) {
32+
// get genesis time
33+
genesisPath, err := url.JoinPath(apiEndpoint, beaconNodeGenesisEndpoint)
34+
if err != nil {
35+
return nil, fmt.Errorf("failed to join path, err: %w", err)
36+
}
37+
resp, err := http.Get(genesisPath)
38+
if err != nil {
39+
return nil, fmt.Errorf("cannot do request, err: %w", err)
40+
}
41+
defer resp.Body.Close()
42+
43+
if resp.StatusCode != http.StatusOK {
44+
body, _ := io.ReadAll(resp.Body)
45+
bodyStr := string(body)
46+
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
47+
}
48+
49+
var genesisResp GenesisResp
50+
err = json.NewDecoder(resp.Body).Decode(&genesisResp)
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
53+
}
54+
genesisTime, err := strconv.ParseUint(genesisResp.Data.GenesisTime, 10, 64)
55+
if err != nil {
56+
return nil, fmt.Errorf("failed to decode genesis time %s, err: %w", genesisResp.Data.GenesisTime, err)
57+
}
58+
59+
// get seconds per slot from spec
60+
specPath, err := url.JoinPath(apiEndpoint, beaconNodeSpecEndpoint)
61+
if err != nil {
62+
return nil, fmt.Errorf("failed to join path, err: %w", err)
63+
}
64+
resp, err = http.Get(specPath)
65+
if err != nil {
66+
return nil, fmt.Errorf("cannot do request, err: %w", err)
67+
}
68+
defer resp.Body.Close()
69+
70+
if resp.StatusCode != http.StatusOK {
71+
body, _ := io.ReadAll(resp.Body)
72+
bodyStr := string(body)
73+
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
74+
}
75+
76+
var specResp SpecResp
77+
err = json.NewDecoder(resp.Body).Decode(&specResp)
78+
if err != nil {
79+
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
80+
}
81+
secondsPerSlot, err := strconv.ParseUint(specResp.Data.SecondsPerSlot, 10, 64)
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to decode seconds per slot %s, err: %w", specResp.Data.SecondsPerSlot, err)
84+
}
85+
if secondsPerSlot == 0 {
86+
return nil, fmt.Errorf("failed to make new BeaconNodeClient, secondsPerSlot is 0")
87+
}
88+
89+
return &BeaconNodeClient{
90+
apiEndpoint: apiEndpoint,
91+
l1Client: l1Client,
92+
genesisTime: genesisTime,
93+
secondsPerSlot: secondsPerSlot,
94+
}, nil
95+
}
96+
97+
func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
98+
// get block timestamp to calculate slot
99+
header, err := c.l1Client.GetHeaderByNumber(blockNumber)
100+
if err != nil {
101+
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
102+
}
103+
slot := (header.Time - c.genesisTime) / c.secondsPerSlot
104+
105+
// get blob sidecar for slot
106+
blobSidecarPath, err := url.JoinPath(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot))
107+
if err != nil {
108+
return nil, fmt.Errorf("failed to join path, err: %w", err)
109+
}
110+
resp, err := http.Get(blobSidecarPath)
111+
if err != nil {
112+
return nil, fmt.Errorf("cannot do request, err: %w", err)
113+
}
114+
defer resp.Body.Close()
115+
116+
if resp.StatusCode != http.StatusOK {
117+
body, _ := io.ReadAll(resp.Body)
118+
bodyStr := string(body)
119+
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
120+
}
121+
122+
var blobSidecarResp BlobSidecarResp
123+
err = json.NewDecoder(resp.Body).Decode(&blobSidecarResp)
124+
if err != nil {
125+
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
126+
}
127+
128+
// find blob with desired versionedHash
129+
for _, blob := range blobSidecarResp.Data {
130+
// calculate blob hash from commitment and check it with desired
131+
commitmentBytes := common.FromHex(blob.KzgCommitment)
132+
if len(commitmentBytes) != lenKZGCommitment {
133+
return nil, fmt.Errorf("len of kzg commitment is not correct, expected: %d, got: %d", lenKZGCommitment, len(commitmentBytes))
134+
}
135+
commitment := kzg4844.Commitment(commitmentBytes)
136+
blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)
137+
138+
if blobVersionedHash == versionedHash {
139+
// found desired blob
140+
blobBytes := common.FromHex(blob.Blob)
141+
if len(blobBytes) != lenBlobBytes {
142+
return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes))
143+
}
144+
145+
b := kzg4844.Blob(blobBytes)
146+
return &b, nil
147+
}
148+
}
149+
150+
return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber)
151+
}
152+
153+
type GenesisResp struct {
154+
Data struct {
155+
GenesisTime string `json:"genesis_time"`
156+
} `json:"data"`
157+
}
158+
159+
type SpecResp struct {
160+
Data struct {
161+
SecondsPerSlot string `json:"SECONDS_PER_SLOT"`
162+
} `json:"data"`
163+
}
164+
165+
type BlobSidecarResp struct {
166+
Data []struct {
167+
Index string `json:"index"`
168+
Blob string `json:"blob"`
169+
KzgCommitment string `json:"kzg_commitment"`
170+
KzgProof string `json:"kzg_proof"`
171+
SignedBlockHeader struct {
172+
Message struct {
173+
Slot string `json:"slot"`
174+
ProposerIndex string `json:"proposer_index"`
175+
ParentRoot string `json:"parent_root"`
176+
StateRoot string `json:"state_root"`
177+
BodyRoot string `json:"body_root"`
178+
} `json:"message"`
179+
Signature string `json:"signature"`
180+
} `json:"signed_block_header"`
181+
KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"`
182+
} `json:"data"`
183+
}

rollup/da_syncer/blob_client/blob_client.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import (
88
)
99

1010
const (
11-
okStatusCode int = 200
12-
lenBlobBytes int = 131072
11+
lenBlobBytes int = 131072
12+
lenKZGCommitment int = 48
1313
)
1414

1515
type BlobClient interface {
16-
GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error)
16+
GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error)
1717
}

rollup/da_syncer/blob_client/blob_client_list.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ func NewBlobClientList(blobClients ...BlobClient) *BlobClientList {
2323
}
2424
}
2525

26-
func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
26+
func (c *BlobClientList) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
2727
if len(c.list) == 0 {
2828
return nil, fmt.Errorf("BlobClientList.GetBlobByVersionedHash: list of BlobClients is empty")
2929
}
3030

3131
for i := 0; i < len(c.list); i++ {
32-
blob, err := c.list[c.nextPos()].GetBlobByVersionedHash(ctx, versionedHash)
32+
blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber)
3333
if err == nil {
3434
return blob, nil
3535
}
36-
36+
c.nextPos()
3737
// there was an error, try the next blob client in following iteration
3838
log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos)
3939
}
@@ -42,9 +42,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa
4242
return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients"))
4343
}
4444

45-
func (c *BlobClientList) nextPos() int {
45+
func (c *BlobClientList) nextPos() {
4646
c.curPos = (c.curPos + 1) % len(c.list)
47-
return c.curPos
4847
}
4948

5049
func (c *BlobClientList) AddBlobClient(blobClient BlobClient) {

rollup/da_syncer/blob_client/blob_scan_client.go

+4-38
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func NewBlobScanClient(apiEndpoint string) *BlobScanClient {
2424
}
2525
}
2626

27-
func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
27+
func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
2828
// blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId
2929
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
3030
if err != nil {
@@ -40,8 +40,8 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
4040
return nil, fmt.Errorf("cannot do request, err: %w", err)
4141
}
4242
defer resp.Body.Close()
43-
if resp.StatusCode != okStatusCode {
44-
if resp.StatusCode == 404 {
43+
if resp.StatusCode != http.StatusOK {
44+
if resp.StatusCode == http.StatusNotFound {
4545
return nil, fmt.Errorf("no blob with versioned hash : %s", versionedHash.String())
4646
}
4747
var res ErrorRespBlobScan
@@ -69,44 +69,10 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
6969
}
7070

7171
type BlobRespBlobScan struct {
72-
Commitment string `json:"commitment"`
73-
Proof string `json:"proof"`
74-
Size int `json:"size"`
75-
VersionedHash string `json:"versionedHash"`
76-
Data string `json:"data"`
77-
DataStorageReferences []struct {
78-
BlobStorage string `json:"blobStorage"`
79-
DataReference string `json:"dataReference"`
80-
} `json:"dataStorageReferences"`
81-
Transactions []struct {
82-
Hash string `json:"hash"`
83-
Index int `json:"index"`
84-
Block struct {
85-
Number int `json:"number"`
86-
BlobGasUsed string `json:"blobGasUsed"`
87-
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
88-
BlobGasPrice string `json:"blobGasPrice"`
89-
ExcessBlobGas string `json:"excessBlobGas"`
90-
Hash string `json:"hash"`
91-
Timestamp string `json:"timestamp"`
92-
Slot int `json:"slot"`
93-
} `json:"block"`
94-
From string `json:"from"`
95-
To string `json:"to"`
96-
MaxFeePerBlobGas string `json:"maxFeePerBlobGas"`
97-
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
98-
Rollup string `json:"rollup"`
99-
BlobAsCalldataGasFee string `json:"blobAsCalldataGasFee"`
100-
BlobGasBaseFee string `json:"blobGasBaseFee"`
101-
BlobGasMaxFee string `json:"blobGasMaxFee"`
102-
BlobGasUsed string `json:"blobGasUsed"`
103-
} `json:"transactions"`
72+
Data string `json:"data"`
10473
}
10574

10675
type ErrorRespBlobScan struct {
10776
Message string `json:"message"`
10877
Code string `json:"code"`
109-
Issues []struct {
110-
Message string `json:"message"`
111-
} `json:"issues"`
11278
}

rollup/da_syncer/blob_client/block_native_client.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient {
2222
}
2323
}
2424

25-
func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
25+
func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
2626
// blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive
2727
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
2828
if err != nil {
@@ -33,7 +33,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione
3333
return nil, fmt.Errorf("cannot do request, err: %w", err)
3434
}
3535
defer resp.Body.Close()
36-
if resp.StatusCode != okStatusCode {
36+
if resp.StatusCode != http.StatusOK {
3737
var res ErrorRespBlockNative
3838
err = json.NewDecoder(resp.Body).Decode(&res)
3939
if err != nil {
@@ -59,12 +59,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione
5959

6060
type BlobRespBlockNative struct {
6161
Blob struct {
62-
VersionedHash string `json:"versionedHash"`
63-
Commitment string `json:"commitment"`
64-
Proof string `json:"proof"`
65-
ZeroBytes int `json:"zeroBytes"`
66-
NonZeroBytes int `json:"nonZeroBytes"`
67-
Data string `json:"data"`
62+
Data string `json:"data"`
6863
} `json:"blob"`
6964
}
7065

rollup/da_syncer/da/commitV1.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database
5555
return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err)
5656
}
5757

58-
blob, err := blobClient.GetBlobByVersionedHash(ctx, versionedHash)
58+
blob, err := blobClient.GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, vLog.BlockNumber)
5959
if err != nil {
6060
return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err)
6161
}

0 commit comments

Comments
 (0)