diff --git a/Cargo.lock b/Cargo.lock
index 60486df46..3b520259a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7116,6 +7116,7 @@ dependencies = [
"tikv-jemallocator",
"time",
"tokio",
+ "tokio-stream",
"tokio-tungstenite 0.26.2",
"tokio-util",
"tower 0.4.13",
diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml
index 5f2d8800c..811485477 100644
--- a/crates/op-rbuilder/Cargo.toml
+++ b/crates/op-rbuilder/Cargo.toml
@@ -83,6 +83,7 @@ derive_more.workspace = true
metrics.workspace = true
serde_json.workspace = true
tokio-util.workspace = true
+tokio-stream.workspace = true
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
chrono = "0.4"
diff --git a/crates/op-rbuilder/src/payload_builder.rs b/crates/op-rbuilder/src/payload_builder.rs
index 11521d497..ac6e4ed89 100644
--- a/crates/op-rbuilder/src/payload_builder.rs
+++ b/crates/op-rbuilder/src/payload_builder.rs
@@ -1,3 +1,4 @@
+use crate::primitives::reth::PayloadBuilderService;
use crate::{
generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder},
primitives::reth::ExecutionInfo,
@@ -39,7 +40,6 @@ use reth_optimism_payload_builder::{
};
use reth_optimism_primitives::{OpPrimitives, OpReceipt, OpTransactionSigned};
use reth_optimism_txpool::OpPooledTx;
-use reth_payload_builder::PayloadBuilderService;
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_payload_util::{BestPayloadTransactions, PayloadTransactions};
diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs
index c95bc0595..c5797f13c 100644
--- a/crates/op-rbuilder/src/payload_builder_vanilla.rs
+++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs
@@ -1,3 +1,4 @@
+use crate::primitives::reth::PayloadBuilderService;
use crate::{
generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder},
metrics::OpRBuilderMetrics,
@@ -56,7 +57,6 @@ use reth_optimism_primitives::{
OpPrimitives, OpReceipt, OpTransactionSigned, ADDRESS_L2_TO_L1_MESSAGE_PASSER,
};
use reth_optimism_txpool::OpPooledTx;
-use reth_payload_builder::PayloadBuilderService;
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_payload_util::{BestPayloadTransactions, PayloadTransactions};
diff --git a/crates/op-rbuilder/src/primitives/reth/mod.rs b/crates/op-rbuilder/src/primitives/reth/mod.rs
index 4b6de4c5b..f21973d16 100644
--- a/crates/op-rbuilder/src/primitives/reth/mod.rs
+++ b/crates/op-rbuilder/src/primitives/reth/mod.rs
@@ -1,2 +1,6 @@
mod execution;
+mod payload_builder_service;
+
pub use execution::{ExecutedPayload, ExecutionInfo};
+
+pub use payload_builder_service::PayloadBuilderService;
diff --git a/crates/op-rbuilder/src/primitives/reth/payload_builder_service.rs b/crates/op-rbuilder/src/primitives/reth/payload_builder_service.rs
new file mode 100644
index 000000000..e5b8014fb
--- /dev/null
+++ b/crates/op-rbuilder/src/primitives/reth/payload_builder_service.rs
@@ -0,0 +1,358 @@
+//! Source <> and <>
+// The MIT License (MIT)
+//
+// Copyright (c) 2022-2025 Reth Contributors
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+use alloy_consensus::BlockHeader;
+use alloy_rpc_types_engine::PayloadId;
+use futures_util::{future::FutureExt, Stream, StreamExt};
+use reth_chain_state::CanonStateNotification;
+use reth_metrics::{
+ metrics::{Counter, Gauge},
+ Metrics,
+};
+use reth_payload_builder::{
+ KeepPayloadJobAlive, PayloadBuilderHandle, PayloadJob, PayloadJobGenerator,
+ PayloadServiceCommand,
+};
+use reth_payload_builder_primitives::Events;
+use reth_payload_primitives::{
+ BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind, PayloadTypes,
+};
+use reth_primitives_traits::NodePrimitives;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tokio::sync::{broadcast, mpsc};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use tracing::{debug, info, trace, warn};
+
+type PayloadFuture
= Pin> + Send + Sync>>;
+
+/// A service that manages payload building tasks.
+///
+/// This type is an endless future that manages the building of payloads.
+///
+/// It tracks active payloads and their build jobs that run in a worker pool.
+///
+/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
+/// does know nothing about how to build them, it just drives their jobs to completion.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct PayloadBuilderService
+where
+ T: PayloadTypes,
+ Gen: PayloadJobGenerator,
+ Gen::Job: PayloadJob,
+{
+ /// The type that knows how to create new payloads.
+ generator: Gen,
+ /// All active payload jobs.
+ payload_jobs: Vec<(Gen::Job, PayloadId)>,
+ /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
+ service_tx: mpsc::UnboundedSender>,
+ /// Receiver half of the command channel.
+ command_rx: UnboundedReceiverStream>,
+ /// Metrics for the payload builder service
+ metrics: PayloadBuilderServiceMetrics,
+ /// Chain events notification stream
+ chain_events: St,
+ /// Payload events handler, used to broadcast and subscribe to payload events.
+ payload_events: broadcast::Sender>,
+}
+
+const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
+
+// === impl PayloadBuilderService ===
+
+impl PayloadBuilderService
+where
+ T: PayloadTypes,
+ Gen: PayloadJobGenerator,
+ Gen::Job: PayloadJob,
+ ::BuiltPayload: Into,
+{
+ /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
+ /// with it.
+ ///
+ /// This also takes a stream of chain events that will be forwarded to the generator to apply
+ /// additional logic when new state is committed. See also
+ /// [`PayloadJobGenerator::on_new_state`].
+ pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) {
+ let (service_tx, command_rx) = mpsc::unbounded_channel();
+ let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
+
+ let service = Self {
+ generator,
+ payload_jobs: Vec::new(),
+ service_tx,
+ command_rx: UnboundedReceiverStream::new(command_rx),
+ metrics: Default::default(),
+ chain_events,
+ payload_events,
+ };
+
+ let handle = service.handle();
+ (service, handle)
+ }
+
+ /// Returns a handle to the service.
+ pub fn handle(&self) -> PayloadBuilderHandle {
+ PayloadBuilderHandle::new(self.service_tx.clone())
+ }
+
+ /// Returns true if the given payload is currently being built.
+ fn contains_payload(&self, id: PayloadId) -> bool {
+ self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
+ }
+
+ /// Returns the best payload for the given identifier that has been built so far.
+ fn best_payload(&self, id: PayloadId) -> Option> {
+ let res = self
+ .payload_jobs
+ .iter()
+ .find(|(_, job_id)| *job_id == id)
+ .map(|(j, _)| j.best_payload().map(|p| p.into()));
+ if let Some(Ok(ref best)) = res {
+ self.metrics
+ .set_best_revenue(best.block().number(), f64::from(best.fees()));
+ }
+
+ res
+ }
+
+ /// Returns the best payload for the given identifier that has been built so far and terminates
+ /// the job if requested.
+ fn resolve(
+ &mut self,
+ id: PayloadId,
+ kind: PayloadKind,
+ ) -> Option> {
+ trace!(%id, "resolving payload job");
+
+ let job = self
+ .payload_jobs
+ .iter()
+ .position(|(_, job_id)| *job_id == id)?;
+ let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
+
+ if keep_alive == KeepPayloadJobAlive::No {
+ let (_, id) = self.payload_jobs.swap_remove(job);
+ trace!(%id, "terminated resolved job");
+ }
+
+ // Since the fees will not be known until the payload future is resolved / awaited, we wrap
+ // the future in a new future that will update the metrics.
+ let resolved_metrics = self.metrics.clone();
+ let payload_events = self.payload_events.clone();
+
+ let fut = async move {
+ let res = fut.await;
+ if let Ok(ref payload) = res {
+ payload_events
+ .send(Events::BuiltPayload(payload.clone().into()))
+ .ok();
+
+ resolved_metrics
+ .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
+ }
+ res.map(|p| p.into())
+ };
+
+ Some(Box::pin(fut))
+ }
+}
+
+impl PayloadBuilderService
+where
+ T: PayloadTypes,
+ Gen: PayloadJobGenerator,
+ Gen::Job: PayloadJob,
+ ::BuiltPayload: Into,
+{
+ /// Returns the payload attributes for the given payload.
+ fn payload_attributes(
+ &self,
+ id: PayloadId,
+ ) -> Option::PayloadAttributes, PayloadBuilderError>> {
+ let attributes = self
+ .payload_jobs
+ .iter()
+ .find(|(_, job_id)| *job_id == id)
+ .map(|(j, _)| j.payload_attributes());
+
+ if attributes.is_none() {
+ trace!(%id, "no matching payload job found to get attributes for");
+ }
+
+ attributes
+ }
+}
+
+impl Future for PayloadBuilderService
+where
+ T: PayloadTypes,
+ N: NodePrimitives,
+ Gen: PayloadJobGenerator + Unpin + 'static,
+ ::Job: Unpin + 'static,
+ St: Stream- > + Send + Unpin + 'static,
+ Gen::Job: PayloadJob,
+ ::BuiltPayload: Into,
+{
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ let this = self.get_mut();
+ loop {
+ // notify the generator of new chain events
+ while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
+ this.generator.on_new_state(new_head);
+ }
+
+ // we poll all jobs first, so we always have the latest payload that we can report if
+ // requests
+ // we don't care about the order of the jobs, so we can just swap_remove them
+ for idx in (0..this.payload_jobs.len()).rev() {
+ let (mut job, id) = this.payload_jobs.swap_remove(idx);
+
+ // drain better payloads from the job
+ match job.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => {
+ this.metrics.set_active_jobs(this.payload_jobs.len());
+ trace!(%id, "payload job finished");
+ }
+ Poll::Ready(Err(err)) => {
+ warn!(%err, ?id, "Payload builder job failed; resolving payload");
+ this.metrics.inc_failed_jobs();
+ this.metrics.set_active_jobs(this.payload_jobs.len());
+ }
+ Poll::Pending => {
+ // still pending, put it back
+ this.payload_jobs.push((job, id));
+ }
+ }
+ }
+
+ // marker for exit condition
+ let mut new_job = false;
+
+ // drain all requests
+ while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
+ match cmd {
+ PayloadServiceCommand::BuildNewPayload(attr, tx) => {
+ let id = attr.payload_id();
+ let mut res = Ok(id);
+
+ if this.contains_payload(id) {
+ debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
+ } else {
+ // no job for this payload yet, create one
+ let parent = attr.parent();
+ match this.generator.new_payload_job(attr.clone()) {
+ Ok(job) => {
+ info!(%id, %parent, "New payload job created");
+ this.metrics.inc_initiated_jobs();
+ new_job = true;
+ this.payload_jobs.push((job, id));
+ this.payload_events
+ .send(Events::Attributes(attr.clone()))
+ .ok();
+ }
+ Err(err) => {
+ this.metrics.inc_failed_jobs();
+ warn!(%err, %id, "Failed to create payload builder job");
+ res = Err(err);
+ }
+ }
+ }
+
+ // return the id of the payload
+ let _ = tx.send(res);
+ }
+ PayloadServiceCommand::BestPayload(id, tx) => {
+ let _ = tx.send(this.best_payload(id));
+ }
+ PayloadServiceCommand::PayloadAttributes(id, tx) => {
+ let attributes = this.payload_attributes(id);
+ let _ = tx.send(attributes);
+ }
+ PayloadServiceCommand::Resolve(id, strategy, tx) => {
+ let _ = tx.send(this.resolve(id, strategy));
+ }
+ PayloadServiceCommand::Subscribe(tx) => {
+ let new_rx = this.payload_events.subscribe();
+ let _ = tx.send(new_rx);
+ }
+ }
+ }
+
+ if !new_job {
+ return Poll::Pending;
+ }
+ }
+ }
+}
+
+/// Payload builder service metrics
+#[derive(Metrics, Clone)]
+#[metrics(scope = "payloads")]
+pub(crate) struct PayloadBuilderServiceMetrics {
+ /// Number of active jobs
+ pub(crate) active_jobs: Gauge,
+ /// Total number of initiated jobs
+ pub(crate) initiated_jobs: Counter,
+ /// Total number of failed jobs
+ pub(crate) failed_jobs: Counter,
+ /// Coinbase revenue for best payloads
+ pub(crate) best_revenue: Gauge,
+ /// Current block returned as the best payload
+ pub(crate) best_block: Gauge,
+ /// Coinbase revenue for resolved payloads
+ pub(crate) resolved_revenue: Gauge,
+ /// Current block returned as the resolved payload
+ pub(crate) resolved_block: Gauge,
+}
+
+impl PayloadBuilderServiceMetrics {
+ pub(crate) fn inc_initiated_jobs(&self) {
+ self.initiated_jobs.increment(1);
+ }
+
+ pub(crate) fn inc_failed_jobs(&self) {
+ self.failed_jobs.increment(1);
+ }
+
+ pub(crate) fn set_active_jobs(&self, value: usize) {
+ self.active_jobs.set(value as f64)
+ }
+
+ pub(crate) fn set_best_revenue(&self, block: u64, value: f64) {
+ self.best_block.set(block as f64);
+ self.best_revenue.set(value)
+ }
+
+ pub(crate) fn set_resolved_revenue(&self, block: u64, value: f64) {
+ self.resolved_block.set(block as f64);
+ self.resolved_revenue.set(value)
+ }
+}