diff --git a/Cargo.lock b/Cargo.lock index 60486df46..3b520259a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7116,6 +7116,7 @@ dependencies = [ "tikv-jemallocator", "time", "tokio", + "tokio-stream", "tokio-tungstenite 0.26.2", "tokio-util", "tower 0.4.13", diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index 5f2d8800c..811485477 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -83,6 +83,7 @@ derive_more.workspace = true metrics.workspace = true serde_json.workspace = true tokio-util.workspace = true +tokio-stream.workspace = true time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] } chrono = "0.4" diff --git a/crates/op-rbuilder/src/payload_builder.rs b/crates/op-rbuilder/src/payload_builder.rs index 11521d497..ac6e4ed89 100644 --- a/crates/op-rbuilder/src/payload_builder.rs +++ b/crates/op-rbuilder/src/payload_builder.rs @@ -1,3 +1,4 @@ +use crate::primitives::reth::PayloadBuilderService; use crate::{ generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder}, primitives::reth::ExecutionInfo, @@ -39,7 +40,6 @@ use reth_optimism_payload_builder::{ }; use reth_optimism_primitives::{OpPrimitives, OpReceipt, OpTransactionSigned}; use reth_optimism_txpool::OpPooledTx; -use reth_payload_builder::PayloadBuilderService; use reth_payload_builder_primitives::PayloadBuilderError; use reth_payload_primitives::PayloadBuilderAttributes; use reth_payload_util::{BestPayloadTransactions, PayloadTransactions}; diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs index c95bc0595..c5797f13c 100644 --- a/crates/op-rbuilder/src/payload_builder_vanilla.rs +++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs @@ -1,3 +1,4 @@ +use crate::primitives::reth::PayloadBuilderService; use crate::{ generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder}, metrics::OpRBuilderMetrics, @@ -56,7 +57,6 @@ use reth_optimism_primitives::{ OpPrimitives, OpReceipt, OpTransactionSigned, ADDRESS_L2_TO_L1_MESSAGE_PASSER, }; use reth_optimism_txpool::OpPooledTx; -use reth_payload_builder::PayloadBuilderService; use reth_payload_builder_primitives::PayloadBuilderError; use reth_payload_primitives::PayloadBuilderAttributes; use reth_payload_util::{BestPayloadTransactions, PayloadTransactions}; diff --git a/crates/op-rbuilder/src/primitives/reth/mod.rs b/crates/op-rbuilder/src/primitives/reth/mod.rs index 4b6de4c5b..f21973d16 100644 --- a/crates/op-rbuilder/src/primitives/reth/mod.rs +++ b/crates/op-rbuilder/src/primitives/reth/mod.rs @@ -1,2 +1,6 @@ mod execution; +mod payload_builder_service; + pub use execution::{ExecutedPayload, ExecutionInfo}; + +pub use payload_builder_service::PayloadBuilderService; diff --git a/crates/op-rbuilder/src/primitives/reth/payload_builder_service.rs b/crates/op-rbuilder/src/primitives/reth/payload_builder_service.rs new file mode 100644 index 000000000..e5b8014fb --- /dev/null +++ b/crates/op-rbuilder/src/primitives/reth/payload_builder_service.rs @@ -0,0 +1,358 @@ +//! Source <> and <> +// The MIT License (MIT) +// +// Copyright (c) 2022-2025 Reth Contributors +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +use alloy_consensus::BlockHeader; +use alloy_rpc_types_engine::PayloadId; +use futures_util::{future::FutureExt, Stream, StreamExt}; +use reth_chain_state::CanonStateNotification; +use reth_metrics::{ + metrics::{Counter, Gauge}, + Metrics, +}; +use reth_payload_builder::{ + KeepPayloadJobAlive, PayloadBuilderHandle, PayloadJob, PayloadJobGenerator, + PayloadServiceCommand, +}; +use reth_payload_builder_primitives::Events; +use reth_payload_primitives::{ + BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind, PayloadTypes, +}; +use reth_primitives_traits::NodePrimitives; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, info, trace, warn}; + +type PayloadFuture

= Pin> + Send + Sync>>; + +/// A service that manages payload building tasks. +/// +/// This type is an endless future that manages the building of payloads. +/// +/// It tracks active payloads and their build jobs that run in a worker pool. +/// +/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and +/// does know nothing about how to build them, it just drives their jobs to completion. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PayloadBuilderService +where + T: PayloadTypes, + Gen: PayloadJobGenerator, + Gen::Job: PayloadJob, +{ + /// The type that knows how to create new payloads. + generator: Gen, + /// All active payload jobs. + payload_jobs: Vec<(Gen::Job, PayloadId)>, + /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand. + service_tx: mpsc::UnboundedSender>, + /// Receiver half of the command channel. + command_rx: UnboundedReceiverStream>, + /// Metrics for the payload builder service + metrics: PayloadBuilderServiceMetrics, + /// Chain events notification stream + chain_events: St, + /// Payload events handler, used to broadcast and subscribe to payload events. + payload_events: broadcast::Sender>, +} + +const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20; + +// === impl PayloadBuilderService === + +impl PayloadBuilderService +where + T: PayloadTypes, + Gen: PayloadJobGenerator, + Gen::Job: PayloadJob, + ::BuiltPayload: Into, +{ + /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact + /// with it. + /// + /// This also takes a stream of chain events that will be forwarded to the generator to apply + /// additional logic when new state is committed. See also + /// [`PayloadJobGenerator::on_new_state`]. + pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { + let (service_tx, command_rx) = mpsc::unbounded_channel(); + let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE); + + let service = Self { + generator, + payload_jobs: Vec::new(), + service_tx, + command_rx: UnboundedReceiverStream::new(command_rx), + metrics: Default::default(), + chain_events, + payload_events, + }; + + let handle = service.handle(); + (service, handle) + } + + /// Returns a handle to the service. + pub fn handle(&self) -> PayloadBuilderHandle { + PayloadBuilderHandle::new(self.service_tx.clone()) + } + + /// Returns true if the given payload is currently being built. + fn contains_payload(&self, id: PayloadId) -> bool { + self.payload_jobs.iter().any(|(_, job_id)| *job_id == id) + } + + /// Returns the best payload for the given identifier that has been built so far. + fn best_payload(&self, id: PayloadId) -> Option> { + let res = self + .payload_jobs + .iter() + .find(|(_, job_id)| *job_id == id) + .map(|(j, _)| j.best_payload().map(|p| p.into())); + if let Some(Ok(ref best)) = res { + self.metrics + .set_best_revenue(best.block().number(), f64::from(best.fees())); + } + + res + } + + /// Returns the best payload for the given identifier that has been built so far and terminates + /// the job if requested. + fn resolve( + &mut self, + id: PayloadId, + kind: PayloadKind, + ) -> Option> { + trace!(%id, "resolving payload job"); + + let job = self + .payload_jobs + .iter() + .position(|(_, job_id)| *job_id == id)?; + let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind); + + if keep_alive == KeepPayloadJobAlive::No { + let (_, id) = self.payload_jobs.swap_remove(job); + trace!(%id, "terminated resolved job"); + } + + // Since the fees will not be known until the payload future is resolved / awaited, we wrap + // the future in a new future that will update the metrics. + let resolved_metrics = self.metrics.clone(); + let payload_events = self.payload_events.clone(); + + let fut = async move { + let res = fut.await; + if let Ok(ref payload) = res { + payload_events + .send(Events::BuiltPayload(payload.clone().into())) + .ok(); + + resolved_metrics + .set_resolved_revenue(payload.block().number(), f64::from(payload.fees())); + } + res.map(|p| p.into()) + }; + + Some(Box::pin(fut)) + } +} + +impl PayloadBuilderService +where + T: PayloadTypes, + Gen: PayloadJobGenerator, + Gen::Job: PayloadJob, + ::BuiltPayload: Into, +{ + /// Returns the payload attributes for the given payload. + fn payload_attributes( + &self, + id: PayloadId, + ) -> Option::PayloadAttributes, PayloadBuilderError>> { + let attributes = self + .payload_jobs + .iter() + .find(|(_, job_id)| *job_id == id) + .map(|(j, _)| j.payload_attributes()); + + if attributes.is_none() { + trace!(%id, "no matching payload job found to get attributes for"); + } + + attributes + } +} + +impl Future for PayloadBuilderService +where + T: PayloadTypes, + N: NodePrimitives, + Gen: PayloadJobGenerator + Unpin + 'static, + ::Job: Unpin + 'static, + St: Stream> + Send + Unpin + 'static, + Gen::Job: PayloadJob, + ::BuiltPayload: Into, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + loop { + // notify the generator of new chain events + while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) { + this.generator.on_new_state(new_head); + } + + // we poll all jobs first, so we always have the latest payload that we can report if + // requests + // we don't care about the order of the jobs, so we can just swap_remove them + for idx in (0..this.payload_jobs.len()).rev() { + let (mut job, id) = this.payload_jobs.swap_remove(idx); + + // drain better payloads from the job + match job.poll_unpin(cx) { + Poll::Ready(Ok(_)) => { + this.metrics.set_active_jobs(this.payload_jobs.len()); + trace!(%id, "payload job finished"); + } + Poll::Ready(Err(err)) => { + warn!(%err, ?id, "Payload builder job failed; resolving payload"); + this.metrics.inc_failed_jobs(); + this.metrics.set_active_jobs(this.payload_jobs.len()); + } + Poll::Pending => { + // still pending, put it back + this.payload_jobs.push((job, id)); + } + } + } + + // marker for exit condition + let mut new_job = false; + + // drain all requests + while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { + match cmd { + PayloadServiceCommand::BuildNewPayload(attr, tx) => { + let id = attr.payload_id(); + let mut res = Ok(id); + + if this.contains_payload(id) { + debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring."); + } else { + // no job for this payload yet, create one + let parent = attr.parent(); + match this.generator.new_payload_job(attr.clone()) { + Ok(job) => { + info!(%id, %parent, "New payload job created"); + this.metrics.inc_initiated_jobs(); + new_job = true; + this.payload_jobs.push((job, id)); + this.payload_events + .send(Events::Attributes(attr.clone())) + .ok(); + } + Err(err) => { + this.metrics.inc_failed_jobs(); + warn!(%err, %id, "Failed to create payload builder job"); + res = Err(err); + } + } + } + + // return the id of the payload + let _ = tx.send(res); + } + PayloadServiceCommand::BestPayload(id, tx) => { + let _ = tx.send(this.best_payload(id)); + } + PayloadServiceCommand::PayloadAttributes(id, tx) => { + let attributes = this.payload_attributes(id); + let _ = tx.send(attributes); + } + PayloadServiceCommand::Resolve(id, strategy, tx) => { + let _ = tx.send(this.resolve(id, strategy)); + } + PayloadServiceCommand::Subscribe(tx) => { + let new_rx = this.payload_events.subscribe(); + let _ = tx.send(new_rx); + } + } + } + + if !new_job { + return Poll::Pending; + } + } + } +} + +/// Payload builder service metrics +#[derive(Metrics, Clone)] +#[metrics(scope = "payloads")] +pub(crate) struct PayloadBuilderServiceMetrics { + /// Number of active jobs + pub(crate) active_jobs: Gauge, + /// Total number of initiated jobs + pub(crate) initiated_jobs: Counter, + /// Total number of failed jobs + pub(crate) failed_jobs: Counter, + /// Coinbase revenue for best payloads + pub(crate) best_revenue: Gauge, + /// Current block returned as the best payload + pub(crate) best_block: Gauge, + /// Coinbase revenue for resolved payloads + pub(crate) resolved_revenue: Gauge, + /// Current block returned as the resolved payload + pub(crate) resolved_block: Gauge, +} + +impl PayloadBuilderServiceMetrics { + pub(crate) fn inc_initiated_jobs(&self) { + self.initiated_jobs.increment(1); + } + + pub(crate) fn inc_failed_jobs(&self) { + self.failed_jobs.increment(1); + } + + pub(crate) fn set_active_jobs(&self, value: usize) { + self.active_jobs.set(value as f64) + } + + pub(crate) fn set_best_revenue(&self, block: u64, value: f64) { + self.best_block.set(block as f64); + self.best_revenue.set(value) + } + + pub(crate) fn set_resolved_revenue(&self, block: u64, value: f64) { + self.resolved_block.set(block as f64); + self.resolved_revenue.set(value) + } +}