Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit d3b034a

Browse files
avaloncheRuteri
andauthored
Add SSE subscription to builder (#53)
* Add SSE subscription to builder * withdrawals marshalling * add stop channel * pr comments * Add handling multiple beacon clients (#57) * Add handling multiple beacon clients * Initialize stop channel in builder.Builder * fix withdrawals array pointer * Build on a single head (#59) * Build on a single head * Forcibly stop building process for old sse events --------- Co-authored-by: avalonche <[email protected]> * linting --------- Co-authored-by: Mateusz Morusiewicz <[email protected]>
1 parent 470a183 commit d3b034a

24 files changed

+302
-79
lines changed

accounts/abi/bind/backends/simulated.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func NewSimulatedBackendChain(database ethdb.Database, blockchain *core.BlockCha
119119
filterBackend := &filterBackend{database, blockchain, backend}
120120
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
121121
backend.events = filters.NewEventSystem(backend.filterSystem, false)
122-
122+
123123
header := backend.blockchain.CurrentBlock()
124124
block := backend.blockchain.GetBlock(header.Hash(), header.Number.Uint64())
125125

builder/beacon_client.go

+188-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package builder
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -10,11 +11,23 @@ import (
1011
"sync"
1112
"time"
1213

14+
"github.com/attestantio/go-eth2-client/spec/capella"
1315
"github.com/ethereum/go-ethereum/common"
1416
"github.com/ethereum/go-ethereum/common/hexutil"
17+
"github.com/ethereum/go-ethereum/core/types"
1518
"github.com/ethereum/go-ethereum/log"
19+
"github.com/r3labs/sse"
20+
"golang.org/x/exp/slices"
1621
)
1722

23+
type IBeaconClient interface {
24+
isValidator(pubkey PubkeyHex) bool
25+
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
26+
SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes)
27+
Start() error
28+
Stop()
29+
}
30+
1831
type testBeaconClient struct {
1932
validator *ValidatorPrivateData
2033
slot uint64
@@ -30,8 +43,110 @@ func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
3043
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
3144
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
3245
}
33-
func (b *testBeaconClient) Start() error {
34-
return nil
46+
47+
func (b *testBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
48+
}
49+
50+
func (b *testBeaconClient) Start() error { return nil }
51+
52+
type NilBeaconClient struct{}
53+
54+
func (b *NilBeaconClient) isValidator(pubkey PubkeyHex) bool {
55+
return false
56+
}
57+
58+
func (b *NilBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
59+
return PubkeyHex(""), nil
60+
}
61+
62+
func (b *NilBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
63+
}
64+
65+
func (b *NilBeaconClient) Start() error { return nil }
66+
67+
func (b *NilBeaconClient) Stop() {}
68+
69+
type MultiBeaconClient struct {
70+
clients []*BeaconClient
71+
closeCh chan struct{}
72+
}
73+
74+
func NewMultiBeaconClient(endpoints []string, slotsInEpoch uint64, secondsInSlot uint64) *MultiBeaconClient {
75+
clients := []*BeaconClient{}
76+
for _, endpoint := range endpoints {
77+
client := NewBeaconClient(endpoint, slotsInEpoch, secondsInSlot)
78+
clients = append(clients, client)
79+
}
80+
81+
return &MultiBeaconClient{
82+
clients: clients,
83+
closeCh: make(chan struct{}),
84+
}
85+
}
86+
87+
func (m *MultiBeaconClient) isValidator(pubkey PubkeyHex) bool {
88+
for _, c := range m.clients {
89+
// Pick the first one, always true
90+
return c.isValidator(pubkey)
91+
}
92+
93+
return false
94+
}
95+
96+
func (m *MultiBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
97+
var allErrs error
98+
for _, c := range m.clients {
99+
pk, err := c.getProposerForNextSlot(requestedSlot)
100+
if err != nil {
101+
allErrs = errors.Join(allErrs, err)
102+
continue
103+
}
104+
105+
return pk, nil
106+
}
107+
return PubkeyHex(""), allErrs
108+
}
109+
110+
func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPayloadAttributes) bool {
111+
if l.Timestamp != r.Timestamp ||
112+
l.Random != r.Random ||
113+
l.SuggestedFeeRecipient != r.SuggestedFeeRecipient ||
114+
l.Slot != r.Slot ||
115+
l.HeadHash != r.HeadHash ||
116+
l.GasLimit != r.GasLimit {
117+
return false
118+
}
119+
120+
if !slices.Equal(l.Withdrawals, r.Withdrawals) {
121+
return false
122+
}
123+
124+
return true
125+
}
126+
127+
func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
128+
for _, c := range m.clients {
129+
go c.SubscribeToPayloadAttributesEvents(payloadAttrC)
130+
}
131+
}
132+
133+
func (m *MultiBeaconClient) Start() error {
134+
var allErrs error
135+
for _, c := range m.clients {
136+
err := c.Start()
137+
if err != nil {
138+
allErrs = errors.Join(allErrs, err)
139+
}
140+
}
141+
return allErrs
142+
}
143+
144+
func (m *MultiBeaconClient) Stop() {
145+
for _, c := range m.clients {
146+
c.Stop()
147+
}
148+
149+
close(m.closeCh)
35150
}
36151

37152
type BeaconClient struct {
@@ -42,21 +157,24 @@ type BeaconClient struct {
42157
mu sync.Mutex
43158
slotProposerMap map[uint64]PubkeyHex
44159

45-
closeCh chan struct{}
160+
ctx context.Context
161+
cancelFn context.CancelFunc
46162
}
47163

48164
func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
165+
ctx, cancelFn := context.WithCancel(context.Background())
49166
return &BeaconClient{
50167
endpoint: endpoint,
51168
slotsInEpoch: slotsInEpoch,
52169
secondsInSlot: secondsInSlot,
53170
slotProposerMap: make(map[uint64]PubkeyHex),
54-
closeCh: make(chan struct{}),
171+
ctx: ctx,
172+
cancelFn: cancelFn,
55173
}
56174
}
57175

58176
func (b *BeaconClient) Stop() {
59-
close(b.closeCh)
177+
b.cancelFn()
60178
}
61179

62180
func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
@@ -109,7 +227,7 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
109227
defer timer.Stop()
110228
for true {
111229
select {
112-
case <-b.closeCh:
230+
case <-b.ctx.Done():
113231
return
114232
case <-timer.C:
115233
}
@@ -154,6 +272,70 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
154272
}
155273
}
156274

275+
// PayloadAttributesEvent represents the data of a payload_attributes event
276+
// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}}
277+
type PayloadAttributesEvent struct {
278+
Version string `json:"version"`
279+
Data PayloadAttributesEventData `json:"data"`
280+
}
281+
282+
type PayloadAttributesEventData struct {
283+
ProposalSlot uint64 `json:"proposal_slot,string"`
284+
ParentBlockHash common.Hash `json:"parent_block_hash"`
285+
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
286+
}
287+
288+
type PayloadAttributes struct {
289+
Timestamp uint64 `json:"timestamp,string"`
290+
PrevRandao common.Hash `json:"prev_randao"`
291+
SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"`
292+
Withdrawals []*capella.Withdrawal `json:"withdrawals"`
293+
}
294+
295+
// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals
296+
func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
297+
payloadAttributesResp := new(PayloadAttributesEvent)
298+
299+
eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint)
300+
log.Info("subscribing to payload_attributes events")
301+
302+
for {
303+
client := sse.NewClient(eventsURL)
304+
err := client.SubscribeRawWithContext(b.ctx, func(msg *sse.Event) {
305+
err := json.Unmarshal(msg.Data, payloadAttributesResp)
306+
if err != nil {
307+
log.Error("could not unmarshal payload_attributes event", "err", err)
308+
} else {
309+
// convert capella.Withdrawal to types.Withdrawal
310+
var withdrawals []*types.Withdrawal
311+
for _, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals {
312+
withdrawals = append(withdrawals, &types.Withdrawal{
313+
Index: uint64(w.Index),
314+
Validator: uint64(w.ValidatorIndex),
315+
Address: common.Address(w.Address),
316+
Amount: uint64(w.Amount),
317+
})
318+
}
319+
320+
data := types.BuilderPayloadAttributes{
321+
Slot: payloadAttributesResp.Data.ProposalSlot,
322+
HeadHash: payloadAttributesResp.Data.ParentBlockHash,
323+
Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp),
324+
Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao,
325+
SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient,
326+
Withdrawals: withdrawals,
327+
}
328+
payloadAttrC <- data
329+
}
330+
})
331+
if err != nil {
332+
log.Error("failed to subscribe to payload_attributes events", "err", err)
333+
time.Sleep(1 * time.Second)
334+
}
335+
log.Warn("beaconclient SubscribeRaw ended, reconnecting")
336+
}
337+
}
338+
157339
func fetchCurrentSlot(endpoint string) (uint64, error) {
158340
headerRes := &struct {
159341
Data []struct {

builder/builder.go

+43-17
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,6 @@ type ValidatorData struct {
3737
GasLimit uint64
3838
}
3939

40-
type IBeaconClient interface {
41-
isValidator(pubkey PubkeyHex) bool
42-
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
43-
Start() error
44-
Stop()
45-
}
46-
4740
type IRelay interface {
4841
SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error
4942
SubmitBlockCapella(msg *capellaapi.SubmitBlockRequest, vd ValidatorData) error
@@ -64,6 +57,7 @@ type Builder struct {
6457
eth IEthereumService
6558
dryRun bool
6659
validator *blockvalidation.BlockValidationAPI
60+
beaconClient IBeaconClient
6761
builderSecretKey *bls.SecretKey
6862
builderPublicKey boostTypes.PublicKey
6963
builderSigningDomain boostTypes.Domain
@@ -75,9 +69,11 @@ type Builder struct {
7569
slotAttrs []types.BuilderPayloadAttributes
7670
slotCtx context.Context
7771
slotCtxCancel context.CancelFunc
72+
73+
stop chan struct{}
7874
}
7975

80-
func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService, dryRun bool, validator *blockvalidation.BlockValidationAPI) *Builder {
76+
func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService, dryRun bool, validator *blockvalidation.BlockValidationAPI, beaconClient IBeaconClient) *Builder {
8177
pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
8278
pk := boostTypes.PublicKey{}
8379
pk.FromSlice(pkBytes)
@@ -89,6 +85,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
8985
eth: eth,
9086
dryRun: dryRun,
9187
validator: validator,
88+
beaconClient: beaconClient,
9289
builderSecretKey: sk,
9390
builderPublicKey: pk,
9491
builderSigningDomain: builderSigningDomain,
@@ -97,14 +94,42 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
9794
slot: 0,
9895
slotCtx: slotCtx,
9996
slotCtxCancel: slotCtxCancel,
97+
98+
stop: make(chan struct{}, 1),
10099
}
101100
}
102101

103102
func (b *Builder) Start() error {
103+
// Start regular payload attributes updates
104+
go func() {
105+
c := make(chan types.BuilderPayloadAttributes)
106+
go b.beaconClient.SubscribeToPayloadAttributesEvents(c)
107+
108+
currentSlot := uint64(0)
109+
110+
for {
111+
select {
112+
case <-b.stop:
113+
return
114+
case payloadAttributes := <-c:
115+
// Right now we are building only on a single head. This might change in the future!
116+
if payloadAttributes.Slot < currentSlot {
117+
continue
118+
} else if payloadAttributes.Slot == currentSlot {
119+
b.OnPayloadAttribute(&payloadAttributes)
120+
} else if payloadAttributes.Slot > currentSlot {
121+
currentSlot = payloadAttributes.Slot
122+
b.OnPayloadAttribute(&payloadAttributes)
123+
}
124+
}
125+
}
126+
}()
127+
104128
return b.relay.Start()
105129
}
106130

107131
func (b *Builder) Stop() error {
132+
close(b.stop)
108133
return nil
109134
}
110135

@@ -277,18 +302,19 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro
277302
b.slotMu.Lock()
278303
defer b.slotMu.Unlock()
279304

280-
if b.slot != attrs.Slot {
281-
if b.slotCtxCancel != nil {
282-
b.slotCtxCancel()
283-
}
305+
// Forcibly cancel previous building job, build on top of reorgable blocks as this is the behaviour relays expect.
306+
// This will change in the future
284307

285-
slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
286-
b.slot = attrs.Slot
287-
b.slotAttrs = nil
288-
b.slotCtx = slotCtx
289-
b.slotCtxCancel = slotCtxCancel
308+
if b.slotCtxCancel != nil {
309+
b.slotCtxCancel()
290310
}
291311

312+
slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
313+
b.slot = attrs.Slot
314+
b.slotAttrs = nil
315+
b.slotCtx = slotCtx
316+
b.slotCtxCancel = slotCtxCancel
317+
292318
for _, currentAttrs := range b.slotAttrs {
293319
if attrs.Equal(&currentAttrs) {
294320
log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash)

builder/builder_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestOnPayloadAttributes(t *testing.T) {
7373

7474
testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock, testBlockValue: big.NewInt(10)}
7575

76-
builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService, false, nil)
76+
builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService, false, nil, &testBeacon)
7777
builder.Start()
7878
defer builder.Stop()
7979

0 commit comments

Comments
 (0)