diff --git a/.gitignore b/.gitignore index efe3eb1..acf25a3 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ Cargo.lock # Added by cargo /target + +# VSCode debug launcher +.vscode/launch.json \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index e4e8899..df68f0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,11 +22,24 @@ name = "transaction-submitter" path = "bin/submit_transaction.rs" [dependencies] -init4-bin-base = "0.1.0" +init4-bin-base = "0.3" -zenith-types = "0.13" +signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-sim = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } -alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] } +trevm = { version = "0.20.10", features = ["concurrent-db", "test-utils"] } + +alloy = { version = "0.12.6", features = [ + "full", + "json-rpc", + "signer-aws", + "rpc-types-mev", + "rlp", + "node-bindings", + "serde", +] } aws-config = "1.1.7" aws-sdk-kms = "1.15.0" @@ -48,3 +61,5 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } async-trait = "0.1.80" oauth2 = "4.4.2" +tracing-subscriber = "0.3.19" +chrono = "0.4.41" diff --git a/bin/builder.rs b/bin/builder.rs index 193a3b9..8a7cc0e 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -1,34 +1,39 @@ -#![allow(dead_code)] - -use builder::config::BuilderConfig; -use builder::service::serve_builder_with_span; -use builder::tasks::block::BlockBuilder; -use builder::tasks::metrics::MetricsTask; -use builder::tasks::oauth::Authenticator; -use builder::tasks::submit::SubmitTask; - +use builder::{ + config::BuilderConfig, + service::serve_builder_with_span, + tasks::{ + block::Simulator, bundler, metrics::MetricsTask, oauth::Authenticator, submit::SubmitTask, + tx_poller, + }, +}; +use signet_sim::SimCache; +use signet_types::SlotCalculator; +use std::sync::Arc; use tokio::select; -#[tokio::main] +// Note: Must be set to `multi_thread` to support async tasks. +// See: https://docs.rs/tokio/latest/tokio/attr.main.html +#[tokio::main(flavor = "multi_thread")] async fn main() -> eyre::Result<()> { let _guard = init4_bin_base::init4(); let span = tracing::info_span!("zenith-builder"); let config = BuilderConfig::load_from_env()?.clone(); - let host_provider = config.connect_host_provider().await?; - let ru_provider = config.connect_ru_provider().await?; + let constants = config.load_pecorino_constants(); let authenticator = Authenticator::new(&config); - tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider"); + let (host_provider, ru_provider, sequencer_signer) = tokio::try_join!( + config.connect_host_provider(), + config.connect_ru_provider(), + config.connect_sequencer_signer(), + )?; - let sequencer_signer = config.connect_sequencer_signer().await?; let zenith = config.connect_zenith(host_provider.clone()); let metrics = MetricsTask { host_provider: host_provider.clone() }; let (tx_channel, metrics_jh) = metrics.spawn(); - let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider.clone()); let submit = SubmitTask { authenticator: authenticator.clone(), host_provider, @@ -39,14 +44,43 @@ async fn main() -> eyre::Result<()> { outbound_tx_channel: tx_channel, }; + let tx_poller = tx_poller::TxPoller::new(&config); + let (tx_receiver, tx_poller_jh) = tx_poller.spawn(); + + let bundle_poller = bundler::BundlePoller::new(&config, authenticator.clone()); + let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn(); + let authenticator_jh = authenticator.spawn(); + let (submit_channel, submit_jh) = submit.spawn(); - let build_jh = builder.spawn(submit_channel); + + let sim_items = SimCache::new(); + let slot_calculator = + SlotCalculator::new(config.start_timestamp, config.chain_offset, config.target_slot_time); + + let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator)); + + let (basefee_jh, sim_cache_jh) = + sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone()); + + let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel); let port = config.builder_port; let server = serve_builder_with_span(([0, 0, 0, 0], port), span); select! { + _ = tx_poller_jh => { + tracing::info!("tx_poller finished"); + }, + _ = bundle_poller_jh => { + tracing::info!("bundle_poller finished"); + }, + _ = sim_cache_jh => { + tracing::info!("sim cache task finished"); + } + _ = basefee_jh => { + tracing::info!("basefee task finished"); + } _ = submit_jh => { tracing::info!("submit finished"); }, diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs index 3955a49..6aec853 100644 --- a/bin/submit_transaction.rs +++ b/bin/submit_transaction.rs @@ -6,7 +6,7 @@ use alloy::{ signers::aws::AwsSigner, }; use aws_config::BehaviorVersion; -use builder::config::{Provider, load_address, load_string, load_u64, load_url}; +use builder::config::{HostProvider, load_address, load_string, load_u64, load_url}; use init4_bin_base::{ deps::metrics::{counter, histogram}, init4, @@ -30,7 +30,7 @@ async fn main() { } } -async fn send_transaction(provider: Provider, recipient_address: Address) { +async fn send_transaction(provider: HostProvider, recipient_address: Address) { // construct simple transaction to send ETH to a recipient let tx = TransactionRequest::default() .with_from(provider.default_signer_address()) @@ -67,7 +67,7 @@ async fn send_transaction(provider: Provider, recipient_address: Address) { histogram!("txn_submitter.tx_mine_time").record(mine_time as f64); } -async fn connect_from_config() -> (Provider, Address, u64) { +async fn connect_from_config() -> (HostProvider, Address, u64) { // load signer config values from .env let rpc_url = load_url("RPC_URL").unwrap(); let chain_id = load_u64("CHAIN_ID").unwrap(); @@ -82,9 +82,8 @@ async fn connect_from_config() -> (Provider, Address, u64) { let signer = AwsSigner::new(client, kms_key_id.to_string(), Some(chain_id)).await.unwrap(); let provider = ProviderBuilder::new() - .with_recommended_fillers() .wallet(EthereumWallet::from(signer)) - .on_builtin(&rpc_url) + .connect(&rpc_url) .await .unwrap(); diff --git a/src/config.rs b/src/config.rs index 1e8a6fa..b442212 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,7 @@ -use crate::signer::{LocalOrAws, SignerError}; +use crate::{ + constants, + signer::{LocalOrAws, SignerError}, +}; use alloy::{ network::{Ethereum, EthereumWallet}, primitives::Address, @@ -9,10 +12,12 @@ use alloy::{ WalletFiller, }, }, - transports::BoxTransport, }; +use eyre::Result; +use oauth2::url; +use signet_types::config::{HostConfig, PredeployTokens, RollupConfig, SignetSystemConstants}; +use signet_zenith::Zenith; use std::{borrow::Cow, env, num, str::FromStr}; -use zenith_types::Zenith; // Keys for .env variables that need to be set to configure the builder. const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID"; @@ -38,6 +43,8 @@ const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID"; const OAUTH_CLIENT_SECRET: &str = "OAUTH_CLIENT_SECRET"; const OAUTH_AUTHENTICATE_URL: &str = "OAUTH_AUTHENTICATE_URL"; const OAUTH_TOKEN_URL: &str = "OAUTH_TOKEN_URL"; +const CONCURRENCY_LIMIT: &str = "CONCURRENCY_LIMIT"; +const START_TIMESTAMP: &str = "START_TIMESTAMP"; /// Configuration for a builder running a specific rollup on a specific host /// chain. @@ -93,6 +100,10 @@ pub struct BuilderConfig { pub oauth_token_url: String, /// The oauth token refresh interval in seconds. pub oauth_token_refresh_interval: u64, + /// The max number of simultaneous block simulations to run. + pub concurrency_limit: usize, + /// The anchor for slot time and number calculations before adjusting for chain offset. + pub start_timestamp: u64, } /// Error loading the configuration. @@ -116,6 +127,9 @@ pub enum ConfigError { /// Error connecting to the signer #[error("failed to connect to signer: {0}")] Signer(#[from] SignerError), + /// I/O error + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), } impl ConfigError { @@ -125,8 +139,8 @@ impl ConfigError { } } -/// Provider type used to read & write. -pub type Provider = FillProvider< +/// Type alias for the provider used to build and submit blocks to the host. +pub type HostProvider = FillProvider< JoinFill< JoinFill< Identity, @@ -134,26 +148,15 @@ pub type Provider = FillProvider< >, WalletFiller, >, - RootProvider, - BoxTransport, + RootProvider, Ethereum, >; -/// Provider type used to read-only. -pub type WalletlessProvider = FillProvider< - JoinFill< - Identity, - JoinFill>>, - >, - RootProvider, - BoxTransport, - Ethereum, ->; +/// Type alias for the provider used to simulate against rollup state. +pub type RuProvider = RootProvider; -/// A Zenith contract instance, using some provider `P` (defaults to -/// [`Provider`]). -pub type ZenithInstance

= - Zenith::ZenithInstance; +/// A [`Zenith`] contract instance using [`Provider`] as the provider. +pub type ZenithInstance

= Zenith::ZenithInstance<(), P, alloy::network::Ethereum>; impl BuilderConfig { /// Load the builder configuration from environment variables. @@ -189,6 +192,8 @@ impl BuilderConfig { oauth_authenticate_url: load_string(OAUTH_AUTHENTICATE_URL)?, oauth_token_url: load_string(OAUTH_TOKEN_URL)?, oauth_token_refresh_interval: load_u64(AUTH_TOKEN_REFRESH_INTERVAL)?, + concurrency_limit: load_concurrency_limit()?, + start_timestamp: load_u64(START_TIMESTAMP)?, }) } @@ -209,42 +214,67 @@ impl BuilderConfig { } /// Connect to the Rollup rpc provider. - pub async fn connect_ru_provider(&self) -> Result { - ProviderBuilder::new() - .with_recommended_fillers() - .on_builtin(&self.ru_rpc_url) - .await - .map_err(Into::into) + pub async fn connect_ru_provider(&self) -> Result, ConfigError> { + let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL"); + let provider = RootProvider::::new_http(url); + Ok(provider) } /// Connect to the Host rpc provider. - pub async fn connect_host_provider(&self) -> Result { + pub async fn connect_host_provider(&self) -> Result { let builder_signer = self.connect_builder_signer().await?; - ProviderBuilder::new() - .with_recommended_fillers() + let provider = ProviderBuilder::new() .wallet(EthereumWallet::from(builder_signer)) - .on_builtin(&self.host_rpc_url) + .connect(&self.host_rpc_url) .await - .map_err(Into::into) + .map_err(ConfigError::Provider)?; + + Ok(provider) } /// Connect additional broadcast providers. - pub async fn connect_additional_broadcast( - &self, - ) -> Result>, ConfigError> { - let mut providers = Vec::with_capacity(self.tx_broadcast_urls.len()); - for url in self.tx_broadcast_urls.iter() { - let provider = - ProviderBuilder::new().on_builtin(url).await.map_err(Into::::into)?; + pub async fn connect_additional_broadcast(&self) -> Result, ConfigError> { + let mut providers: Vec = Vec::with_capacity(self.tx_broadcast_urls.len()); + + for url_str in self.tx_broadcast_urls.iter() { + let url = url::Url::parse(url_str).expect("failed to parse URL"); + let provider = RootProvider::new_http(url); providers.push(provider); } + Ok(providers) } /// Connect to the Zenith instance, using the specified provider. - pub const fn connect_zenith(&self, provider: Provider) -> ZenithInstance { + pub const fn connect_zenith(&self, provider: HostProvider) -> ZenithInstance { Zenith::new(self.zenith_address, provider) } + + /// Loads the Signet system constants for Pecorino. + pub const fn load_pecorino_constants(&self) -> SignetSystemConstants { + let host = HostConfig::new( + self.host_chain_id, + constants::PECORINO_DEPLOY_HEIGHT, + self.zenith_address, + constants::HOST_ORDERS, + constants::HOST_PASSAGE, + constants::HOST_TRANSACTOR, + PredeployTokens::new(constants::HOST_USDC, constants::HOST_USDT, constants::HOST_WBTC), + ); + let rollup = RollupConfig::new( + self.ru_chain_id, + constants::ROLLUP_ORDERS, + constants::ROLLUP_PASSAGE, + constants::BASE_FEE_RECIPIENT, + PredeployTokens::new( + constants::ROLLUP_USDC, + constants::ROLLUP_USDT, + constants::ROLLUP_WBTC, + ), + ); + + SignetSystemConstants::new(host, rollup) + } } /// Load a string from an environment variable. @@ -278,5 +308,18 @@ pub fn load_url(key: &str) -> Result, ConfigError> { /// Load an address from an environment variable. pub fn load_address(key: &str) -> Result { let address = load_string(key)?; - Address::from_str(&address).map_err(Into::into) + Address::from_str(&address) + .map_err(|_| ConfigError::Var(format!("Invalid address format for {}", key))) +} + +/// Checks the configured concurrency parameter and, if none is set, checks the available +/// system concurrency with `std::thread::available_parallelism` and returns that. +pub fn load_concurrency_limit() -> Result { + match load_u16(CONCURRENCY_LIMIT) { + Ok(env) => Ok(env as usize), + Err(_) => { + let limit = std::thread::available_parallelism()?.get(); + Ok(limit) + } + } } diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..9a55d30 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,36 @@ +//! Constants used in the builder. + +use alloy::primitives::{Address, address}; + +/// The default basefee to use for simulation if RPC fails. +pub const BASEFEE_DEFAULT: u64 = 7; +/// Pecorino Host Chain ID used for the Pecorino network. +pub const PECORINO_HOST_CHAIN_ID: u64 = 3151908; +/// Pecorino Chain ID used for the Pecorino network. +pub const PECORINO_CHAIN_ID: u64 = 14174; +/// Block number at which the Pecorino rollup contract is deployed. +pub const PECORINO_DEPLOY_HEIGHT: u64 = 149984; +/// Address of the orders contract on the host. +pub const HOST_ORDERS: Address = address!("0x4E8cC181805aFC307C83298242271142b8e2f249"); +/// Address of the passage contract on the host. +pub const HOST_PASSAGE: Address = address!("0xd553C4CA4792Af71F4B61231409eaB321c1Dd2Ce"); +/// Address of the transactor contract on the host. +pub const HOST_TRANSACTOR: Address = address!("0x1af3A16857C28917Ab2C4c78Be099fF251669200"); +/// Address of the USDC token contract on the host. +pub const HOST_USDC: Address = address!("0x885F8DB528dC8a38aA3DDad9D3F619746B4a6A81"); +/// Address of the USDT token contract on the host. +pub const HOST_USDT: Address = address!("0x7970D259D4a96764Fa9B23FF0715A35f06f52D1A"); +/// Address of the WBTC token contract on the host. +pub const HOST_WBTC: Address = address!("0x7970D259D4a96764Fa9B23FF0715A35f06f52D1A"); +/// Address of the orders contract on the rollup. +pub const ROLLUP_ORDERS: Address = address!("0x4E8cC181805aFC307C83298242271142b8e2f249"); +/// Address of the passage contract on the rollup. +pub const ROLLUP_PASSAGE: Address = address!("0xd553C4CA4792Af71F4B61231409eaB321c1Dd2Ce"); +/// Base fee recipient address. +pub const BASE_FEE_RECIPIENT: Address = address!("0xe0eDA3701D44511ce419344A4CeD30B52c9Ba231"); +/// Address of the USDC token contract on the rollup. +pub const ROLLUP_USDC: Address = address!("0x0B8BC5e60EE10957E0d1A0d95598fA63E65605e2"); +/// Address of the USDT token contract on the rollup. +pub const ROLLUP_USDT: Address = address!("0xF34326d3521F1b07d1aa63729cB14A372f8A737C"); +/// Address of the WBTC token contract on the rollup. +pub const ROLLUP_WBTC: Address = address!("0xE3d7066115f7d6b65F88Dff86288dB4756a7D733"); diff --git a/src/lib.rs b/src/lib.rs index 5d4e743..1f64552 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,9 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +/// Constants for the Builder. +pub mod constants; + /// Configuration for the Builder binary. pub mod config; @@ -27,4 +30,9 @@ pub mod tasks; /// Utilities. pub mod utils; +/// Test utilitites +pub mod test_utils; + +// Anonymous import suppresses warnings about unused imports. use openssl as _; +use tracing_subscriber as _; diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 6ba1e8e..345dd68 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -1,329 +1,515 @@ -use super::bundler::{Bundle, BundlePoller}; -use super::oauth::Authenticator; -use super::tx_poller::TxPoller; -use crate::config::{BuilderConfig, WalletlessProvider}; +//! `block.rs` contains the Simulator and everything that wires it into an +//! actor that handles the simulation of a stream of bundles and transactions +//! and turns them into valid Pecorino blocks for network submission. +use crate::{ + config::{BuilderConfig, RuProvider}, + constants::{BASEFEE_DEFAULT, PECORINO_CHAIN_ID}, + tasks::bundler::Bundle, +}; use alloy::{ - consensus::{SidecarBuilder, SidecarCoder, TxEnvelope}, - eips::eip2718::Decodable2718, - primitives::{B256, Bytes, keccak256}, - providers::Provider as _, - rlp::Buf, + consensus::TxEnvelope, + eips::{BlockId, BlockNumberOrTag::Latest}, + network::Ethereum, + primitives::{Address, B256, FixedBytes, U256}, + providers::Provider, +}; +use chrono::{DateTime, Utc}; +use eyre::Report; +use signet_sim::{BlockBuild, BuiltBlock, SimCache}; +use signet_types::{SlotCalculator, config::SignetSystemConstants}; +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; +use thiserror::Error; +use tokio::{ + select, + sync::mpsc::{self}, + task::JoinHandle, + time::sleep, +}; +use trevm::{ + Block, + revm::{ + context::{BlockEnv, CfgEnv}, + context_interface::block::BlobExcessGasAndPrice, + database::{AlloyDB, WrapDatabaseAsync}, + inspector::NoOpInspector, + primitives::hardfork::SpecId::{self}, + }, }; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{sync::OnceLock, time::Duration}; -use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::{Instrument, debug, error, info, trace}; -use zenith_types::{Alloy2718Coder, ZenithEthBundle, encode_txns}; - -/// Ethereum's slot time in seconds. -pub const ETHEREUM_SLOT_TIME: u64 = 12; - -#[derive(Debug, Default, Clone)] -/// A block in progress. -pub struct InProgressBlock { - transactions: Vec, - raw_encoding: OnceLock, - hash: OnceLock, -} - -impl InProgressBlock { - /// Create a new `InProgressBlock` - pub const fn new() -> Self { - Self { transactions: Vec::new(), raw_encoding: OnceLock::new(), hash: OnceLock::new() } - } - /// Get the number of transactions in the block. - pub fn len(&self) -> usize { - self.transactions.len() - } +/// Different error types that the Simulator handles +#[derive(Debug, Error)] +pub enum SimulatorError { + /// Wraps errors encountered when interacting with the RPC + #[error("RPC error: {0}")] + Rpc(#[source] Report), +} - /// Check if the block is empty. - pub fn is_empty(&self) -> bool { - self.transactions.is_empty() - } +/// `Simulator` is responsible for periodically building blocks and submitting them for +/// signing and inclusion in the blockchain. It wraps a rollup provider and a slot +/// calculator with a builder configuration. +#[derive(Debug)] +pub struct Simulator { + /// Configuration for the builder. + pub config: BuilderConfig, + /// A provider that cannot sign transactions, used for interacting with the rollup. + pub ru_provider: RuProvider, + /// The slot calculator for determining when to wake up and build blocks. + pub slot_calculator: SlotCalculator, +} - /// Unseal the block - fn unseal(&mut self) { - self.raw_encoding.take(); - self.hash.take(); +type AlloyDatabaseProvider = WrapDatabaseAsync>; + +impl Simulator { + /// Creates a new `Simulator` instance. + /// + /// # Arguments + /// + /// - `config`: The configuration for the builder. + /// - `ru_provider`: A provider for interacting with the rollup. + /// - `slot_calculator`: A slot calculator for managing block timing. + /// + /// # Returns + /// + /// A new `Simulator` instance. + pub fn new( + config: &BuilderConfig, + ru_provider: RuProvider, + slot_calculator: SlotCalculator, + ) -> Self { + Self { config: config.clone(), ru_provider, slot_calculator } } - /// Seal the block by encoding the transactions and calculating the contentshash. - fn seal(&self) { - self.raw_encoding.get_or_init(|| encode_txns::(&self.transactions).into()); - self.hash.get_or_init(|| keccak256(self.raw_encoding.get().unwrap().as_ref())); + /// Handles building a single block. + /// + /// # Arguments + /// + /// - `constants`: The system constants for the rollup. + /// - `sim_items`: The simulation cache containing transactions and bundles. + /// - `finish_by`: The deadline by which the block must be built. + /// + /// # Returns + /// + /// A `Result` containing the built block or an error. + pub async fn handle_build( + &self, + constants: SignetSystemConstants, + sim_items: SimCache, + finish_by: Instant, + block: PecorinoBlockEnv, + ) -> Result { + let db = self.create_db().await.unwrap(); + + let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new( + db, + constants, + PecorinoCfg {}, + block, + finish_by, + self.config.concurrency_limit, + sim_items, + self.config.rollup_block_gas_limit, + ); + + let block = block_build.build().await; + tracing::debug!(block = ?block, "finished block simulation"); + + Ok(block) } - /// Ingest a transaction into the in-progress block. Fails - pub fn ingest_tx(&mut self, tx: &TxEnvelope) { - trace!(hash = %tx.tx_hash(), "ingesting tx"); - self.unseal(); - self.transactions.push(tx.clone()); + /// Spawns two tasks: one to handle incoming transactions and bundles, + /// adding them to the simulation cache, and one to track the latest basefee. + /// + /// # Arguments + /// + /// - `tx_receiver`: A channel receiver for incoming transactions. + /// - `bundle_receiver`: A channel receiver for incoming bundles. + /// - `cache`: The simulation cache to store the received items. + /// + /// # Returns + /// + /// A `JoinHandle` for the basefee updater and a `JoinHandle` for the + /// cache handler. + pub fn spawn_cache_tasks( + self: Arc, + tx_receiver: mpsc::UnboundedReceiver, + bundle_receiver: mpsc::UnboundedReceiver, + cache: SimCache, + ) -> (JoinHandle<()>, JoinHandle<()>) { + tracing::debug!("starting up cache handler"); + + let basefee_price = Arc::new(AtomicU64::new(0_u64)); + let basefee_reader = Arc::clone(&basefee_price); + + // Update the basefee on a per-block cadence + let basefee_jh = tokio::spawn(async move { self.basefee_updater(basefee_price).await }); + + // Update the sim cache whenever a transaction or bundle is received with respect to the basefee + let cache_jh = tokio::spawn(async move { + cache_updater(tx_receiver, bundle_receiver, cache, basefee_reader).await + }); + + (basefee_jh, cache_jh) } - /// Remove a transaction from the in-progress block. - pub fn remove_tx(&mut self, tx: &TxEnvelope) { - trace!(hash = %tx.tx_hash(), "removing tx"); - self.unseal(); - self.transactions.retain(|t| t.tx_hash() != tx.tx_hash()); + /// Periodically updates the shared basefee by querying the latest block. + /// + /// This function calculates the remaining time until the next slot, + /// sleeps until that time, and then retrieves the latest basefee from the rollup provider. + /// The updated basefee is stored in the provided `AtomicU64`. + /// + /// This function runs continuously. + /// + /// # Arguments + /// + /// - `price`: A shared `Arc` used to store the updated basefee value. + async fn basefee_updater(self: Arc, price: Arc) { + tracing::debug!("starting basefee updater"); + loop { + // calculate start of next slot plus a small buffer + let time_remaining = self.slot_calculator.slot_duration() + - self.slot_calculator.current_timepoint_within_slot() + + 1; + tracing::debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot"); + + // wait until that point in time + sleep(Duration::from_secs(time_remaining)).await; + + // update the basefee with that price + self.check_basefee(&price).await; + } } - /// Ingest a bundle into the in-progress block. - /// Ignores Signed Orders for now. - pub fn ingest_bundle(&mut self, bundle: Bundle) { - trace!(bundle = %bundle.id, "ingesting bundle"); - - let txs = bundle - .bundle - .bundle - .txs - .into_iter() - .map(|tx| TxEnvelope::decode_2718(&mut tx.chunk())) - .collect::, _>>(); - - if let Ok(txs) = txs { - self.unseal(); - // extend the transactions with the decoded transactions. - // As this builder does not provide bundles landing "top of block", its fine to just extend. - self.transactions.extend(txs); + /// Queries the latest block from the rollup provider and updates the shared + /// basefee value if a block is found. + /// + /// This function retrieves the latest block using the provider, extracts the + /// `base_fee_per_gas` field from the block header (defaulting to zero if missing), + /// and updates the shared `AtomicU64` price tracker. If no block is available, + /// it logs a message without updating the price. + /// + /// # Arguments + /// + /// - `price`: A shared `Arc` used to store the updated basefee. + async fn check_basefee(&self, price: &Arc) { + let resp = self.ru_provider.get_block_by_number(Latest).await.inspect_err(|e| { + tracing::error!(error = %e, "RPC error during basefee update"); + }); + + if let Ok(Some(block)) = resp { + let basefee = block.header.base_fee_per_gas.unwrap_or(0); + price.store(basefee, Ordering::Relaxed); + tracing::debug!(basefee = basefee, "basefee updated"); } else { - error!("failed to decode bundle. dropping"); + tracing::warn!("get basefee failed - an error likely occurred"); } } - /// Encode the in-progress block - fn encode_raw(&self) -> &Bytes { - self.seal(); - self.raw_encoding.get().unwrap() + /// Spawns the simulator task, which handles the setup and sets the deadline + /// for the each round of simulation. + /// + /// # Arguments + /// + /// - `constants`: The system constants for the rollup. + /// - `cache`: The simulation cache containing transactions and bundles. + /// - `submit_sender`: A channel sender for submitting built blocks. + /// + /// # Returns + /// + /// A `JoinHandle` for the spawned task. + pub fn spawn_simulator_task( + self: Arc, + constants: SignetSystemConstants, + cache: SimCache, + submit_sender: mpsc::UnboundedSender, + ) -> JoinHandle<()> { + tracing::debug!("starting builder task"); + + tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await }) } - /// Calculate the hash of the in-progress block, finishing the block. - pub fn contents_hash(&self) -> B256 { - self.seal(); - *self.hash.get().unwrap() - } - - /// Convert the in-progress block to sign request contents. - pub fn encode_calldata(&self) -> &Bytes { - self.encode_raw() - } - - /// Convert the in-progress block to a blob transaction sidecar. - pub fn encode_blob(&self) -> SidecarBuilder { - let mut coder = SidecarBuilder::::default(); - coder.ingest(self.encode_raw()); - coder - } -} - -/// BlockBuilder is a task that periodically builds a block then sends it for -/// signing and submission. -#[derive(Debug)] -pub struct BlockBuilder { - /// Configuration. - pub config: BuilderConfig, - /// A provider that cannot sign transactions. - pub ru_provider: WalletlessProvider, - /// A poller for fetching transactions. - pub tx_poller: TxPoller, - /// A poller for fetching bundles. - pub bundle_poller: BundlePoller, -} - -impl BlockBuilder { - /// Create a new block builder with the given config. - pub fn new( - config: &BuilderConfig, - authenticator: Authenticator, - ru_provider: WalletlessProvider, - ) -> Self { - Self { - config: config.clone(), - ru_provider, - tx_poller: TxPoller::new(config), - bundle_poller: BundlePoller::new(config, authenticator), - } - } - - /// Fetches transactions from the cache and ingests them into the in - /// progress block - async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) { - trace!("query transactions from cache"); - let txns = self.tx_poller.check_tx_cache().await; - match txns { - Ok(txns) => { - trace!("got transactions response"); - for txn in txns.into_iter() { - in_progress.ingest_tx(&txn); + /// Continuously runs the block simulation and submission loop. + /// + /// This function clones the simulation cache, calculates a deadline for block building, + /// attempts to build a block using the latest cache and constants, and submits the built + /// block through the provided channel. If an error occurs during block building or submission, + /// it logs the error and continues the loop. + /// + /// This function runs indefinitely and never returns. + /// + /// # Arguments + /// + /// - `constants`: The system constants for the rollup. + /// - `cache`: The simulation cache containing transactions and bundles. + /// - `submit_sender`: A channel sender used to submit built blocks. + async fn run_simulator( + self: Arc, + constants: SignetSystemConstants, + cache: SimCache, + submit_sender: mpsc::UnboundedSender, + ) { + loop { + let sim_cache = cache.clone(); + let finish_by = self.calculate_deadline(); + + let block_env = match self.next_block_env(finish_by).await { + Ok(block) => block, + Err(err) => { + tracing::error!(err = %err, "failed to configure next block"); + break; } - } - Err(e) => { - error!(error = %e, "error polling transactions"); - } - } - } + }; + tracing::info!(block_env = ?block_env, "created block"); - /// Fetches bundles from the cache and ingests them into the in progress block - async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) { - trace!("query bundles from cache"); - let bundles = self.bundle_poller.check_bundle_cache().await; - match bundles { - Ok(bundles) => { - for bundle in bundles { - match self.simulate_bundle(&bundle.bundle).await { - Ok(()) => in_progress.ingest_bundle(bundle.clone()), - Err(e) => error!(error = %e, id = ?bundle.id, "bundle simulation failed"), - } + match self.handle_build(constants, sim_cache, finish_by, block_env).await { + Ok(block) => { + tracing::debug!(block = ?block, "built block"); + let _ = submit_sender.send(block); + } + Err(e) => { + tracing::error!(err = %e, "failed to build block"); + continue; } - } - Err(e) => { - error!(error = %e, "error polling bundles"); } } - self.bundle_poller.evict(); } - /// Simulates a Zenith bundle against the rollup state - async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> { - // TODO: Simulate bundles with the Simulation Engine - // [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles) - debug!(hash = ?bundle.bundle.bundle_hash(), block_number = ?bundle.block_number(), "bundle simulations is not implemented yet - skipping simulation"); - Ok(()) + /// Calculates the deadline for the current block simulation. + /// + /// # Returns + /// + /// An `Instant` representing the simulation deadline, as calculated by determining + /// the time left in the current slot and adding that to the current timestamp in UNIX seconds. + pub fn calculate_deadline(&self) -> Instant { + // Calculate the current timestamp in seconds since the UNIX epoch + let now = SystemTime::now(); + let unix_seconds = now.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs(); + // Calculate the time remaining in the current slot + let remaining = self.slot_calculator.calculate_timepoint_within_slot(unix_seconds); + // Deadline is equal to the start of the next slot plus the time remaining in this slot + Instant::now() + Duration::from_secs(remaining) } - async fn filter_transactions(&self, in_progress: &mut InProgressBlock) { - // query the rollup node to see which transaction(s) have been included - let mut confirmed_transactions = Vec::new(); - for transaction in in_progress.transactions.iter() { - let tx = self - .ru_provider - .get_transaction_by_hash(*transaction.tx_hash()) - .await - .expect("failed to get receipt"); - if tx.is_some() { - confirmed_transactions.push(transaction.clone()); + /// Creates an `AlloyDB` instance from the rollup provider. + /// + /// # Returns + /// + /// An `Option` containing the wrapped database or `None` if an error occurs. + async fn create_db(&self) -> Option { + // Fetch latest block number + let latest = match self.ru_provider.get_block_number().await { + Ok(block_number) => block_number, + Err(e) => { + tracing::error!(error = %e, "failed to get latest block number"); + return None; } - } - trace!(confirmed = confirmed_transactions.len(), "found confirmed transactions"); + }; - // remove already-confirmed transactions - for transaction in confirmed_transactions { - in_progress.remove_tx(&transaction); - } + // Make an AlloyDB instance from the rollup provider with that latest block number + let alloy_db: AlloyDB = + AlloyDB::new(self.ru_provider.clone(), BlockId::from(latest)); + + // Wrap the AlloyDB instance in a WrapDatabaseAsync and return it. + // This is safe to unwrap because the main function sets the proper runtime settings. + // + // See: https://docs.rs/tokio/latest/tokio/attr.main.html + let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap(); + Some(wrapped_db) } - // calculate the duration in seconds until the beginning of the next block slot. - fn secs_to_next_slot(&self) -> u64 { - let curr_timestamp: u64 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - let current_slot_time = (curr_timestamp - self.config.chain_offset) % ETHEREUM_SLOT_TIME; - (ETHEREUM_SLOT_TIME - current_slot_time) % ETHEREUM_SLOT_TIME + /// Prepares the next block environment. + /// + /// Prepares the next block environment to load into the simulator by fetching the latest block number, + /// assigning the correct next block number, checking the basefee, and setting the timestamp, + /// reward address, and gas configuration for the block environment based on builder configuration. + /// + /// # Arguments + /// + /// - finish_by: The deadline at which block simulation will end. + async fn next_block_env(&self, finish_by: Instant) -> Result { + let remaining = finish_by.duration_since(Instant::now()); + let finish_time = SystemTime::now() + remaining; + let deadline: DateTime = finish_time.into(); + tracing::debug!(deadline = %deadline, "preparing block env"); + + // Fetch the latest block number and increment it by 1 + let latest_block_number = match self.ru_provider.get_block_number().await { + Ok(num) => num, + Err(err) => { + tracing::error!(error = %err, "RPC error during block build"); + return Err(SimulatorError::Rpc(Report::new(err))); + } + }; + tracing::debug!(next_block_num = latest_block_number + 1, "preparing block env"); + + // Fetch the basefee from previous block to calculate gas for this block + let basefee = match self.get_basefee().await? { + Some(basefee) => basefee, + None => { + tracing::warn!("get basefee failed - RPC error likely occurred"); + BASEFEE_DEFAULT + } + }; + tracing::debug!(basefee = basefee, "setting basefee"); + + // Craft the Block environment to pass to the simulator + let block_env = PecorinoBlockEnv::new( + self.config.clone(), + latest_block_number + 1, + deadline.timestamp() as u64, + basefee, + ); + tracing::debug!(block_env = ?block_env, "prepared block env"); + + Ok(block_env) } - // add a buffer to the beginning of the block slot. - fn secs_to_next_target(&self) -> u64 { - self.secs_to_next_slot() + self.config.target_slot_time + /// Returns the basefee of the latest block. + /// + /// # Returns + /// + /// The basefee of the previous (latest) block if the request was successful, + /// or a sane default if the RPC failed. + async fn get_basefee(&self) -> Result, SimulatorError> { + match self.ru_provider.get_block_by_number(Latest).await { + Ok(maybe_block) => match maybe_block { + Some(block) => { + tracing::debug!(basefee = ?block.header.base_fee_per_gas, "basefee found"); + Ok(block.header.base_fee_per_gas) + } + None => Ok(None), + }, + Err(err) => Err(SimulatorError::Rpc(err.into())), + } } +} - /// Spawn the block builder task, returning the inbound channel to it, and - /// a handle to the running task. - pub fn spawn(mut self, outbound: mpsc::UnboundedSender) -> JoinHandle<()> { - tokio::spawn( - async move { - loop { - // sleep the buffer time - tokio::time::sleep(Duration::from_secs(self.secs_to_next_target())).await; - info!("beginning block build cycle"); - - // Build a block - let mut in_progress = InProgressBlock::default(); - self.get_transactions(&mut in_progress).await; - self.get_bundles(&mut in_progress).await; - - // Filter confirmed transactions from the block - self.filter_transactions(&mut in_progress).await; - - // submit the block if it has transactions - if !in_progress.is_empty() { - debug!(txns = in_progress.len(), "sending block to submit task"); - let in_progress_block = std::mem::take(&mut in_progress); - if outbound.send(in_progress_block).is_err() { - error!("downstream task gone"); - break; - } - } else { - debug!("no transactions, skipping block submission"); - } +/// Continuously updates the simulation cache with incoming transactions and bundles. +/// +/// This function listens for new transactions and bundles on their respective +/// channels and adds them to the simulation cache using the latest observed basefee. +/// +/// # Arguments +/// +/// - `tx_receiver`: A receiver channel for incoming Ethereum transactions. +/// - `bundle_receiver`: A receiver channel for incoming transaction bundles. +/// - `cache`: The simulation cache used to store transactions and bundles. +/// - `price_reader`: An `Arc` providing the latest basefee for simulation pricing. +async fn cache_updater( + mut tx_receiver: mpsc::UnboundedReceiver< + alloy::consensus::EthereumTxEnvelope, + >, + mut bundle_receiver: mpsc::UnboundedReceiver, + cache: SimCache, + price_reader: Arc, +) -> ! { + loop { + let p = price_reader.load(Ordering::Relaxed); + select! { + maybe_tx = tx_receiver.recv() => { + if let Some(tx) = maybe_tx { + tracing::debug!(tx = ?tx.hash(), "received transaction"); + cache.add_item(tx, p); } } - .in_current_span(), - ) + maybe_bundle = bundle_receiver.recv() => { + if let Some(bundle) = maybe_bundle { + tracing::debug!(bundle = ?bundle.id, "received bundle"); + cache.add_item(bundle.bundle, p); + } + } + } } } -#[cfg(test)] -mod tests { - use super::*; - use alloy::primitives::Address; - use alloy::{ - eips::eip2718::Encodable2718, - network::{EthereumWallet, TransactionBuilder}, - rpc::types::{TransactionRequest, mev::EthSendBundle}, - signers::local::PrivateKeySigner, - }; - use zenith_types::ZenithEthBundle; - - /// Create a mock bundle for testing with a single transaction - async fn create_mock_bundle(wallet: &EthereumWallet) -> Bundle { - let tx = TransactionRequest::default() - .to(Address::ZERO) - .from(wallet.default_signer().address()) - .nonce(1) - .max_fee_per_gas(2) - .max_priority_fee_per_gas(3) - .gas_limit(4) - .build(wallet) - .await - .unwrap() - .encoded_2718(); - - let eth_bundle = EthSendBundle { - txs: vec![tx.into()], - block_number: 1, - min_timestamp: Some(u64::MIN), - max_timestamp: Some(u64::MAX), - reverting_tx_hashes: vec![], - replacement_uuid: Some("replacement_uuid".to_owned()), - }; - - let zenith_bundle = ZenithEthBundle { bundle: eth_bundle, host_fills: None }; - - Bundle { id: "mock_bundle".to_owned(), bundle: zenith_bundle } +/// PecorinoCfg holds network-level configuration values. +#[derive(Debug, Clone, Copy)] +pub struct PecorinoCfg {} + +impl trevm::Cfg for PecorinoCfg { + /// Fills the configuration environment with Pecorino-specific values. + /// + /// # Arguments + /// + /// - `cfg_env`: The configuration environment to be filled. + fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) { + let CfgEnv { chain_id, spec, .. } = cfg_env; + + *chain_id = PECORINO_CHAIN_ID; + *spec = SpecId::default(); } +} - #[tokio::test] - async fn test_ingest_bundle() { - // Setup random creds - let signer = PrivateKeySigner::random(); - let wallet = EthereumWallet::from(signer); - - // Create an empty InProgressBlock and bundle - let mut in_progress_block = InProgressBlock::new(); - let bundle = create_mock_bundle(&wallet).await; - - // Save previous hash for comparison - let prev_hash = in_progress_block.contents_hash(); - - // Ingest the bundle - in_progress_block.ingest_bundle(bundle); - - // Assert hash is changed after ingest - assert_ne!(prev_hash, in_progress_block.contents_hash(), "Bundle should change block hash"); +/// PecorinoBlockEnv holds block-level configurations for Pecorino blocks. +#[derive(Debug, Clone, Copy)] +pub struct PecorinoBlockEnv { + /// The block number for this block. + pub number: u64, + /// The address the block reward should be sent to. + pub beneficiary: Address, + /// Timestamp for the block. + pub timestamp: u64, + /// The gas limit for this block environment. + pub gas_limit: u64, + /// The basefee to use for calculating gas usage. + pub basefee: u64, + /// The prevrandao to use for this block. + pub prevrandao: Option>, +} - // Assert that the transaction was persisted into block - assert_eq!(in_progress_block.len(), 1, "Bundle should be persisted"); +/// Implements [`trevm::Block`] for the Pecorino block. +impl Block for PecorinoBlockEnv { + /// Fills the block environment with the Pecorino specific values + fn fill_block_env(&self, block_env: &mut trevm::revm::context::BlockEnv) { + // Destructure the fields off of the block_env and modify them + let BlockEnv { + number, + beneficiary, + timestamp, + gas_limit, + basefee, + difficulty, + prevrandao, + blob_excess_gas_and_price, + } = block_env; + *number = self.number; + *beneficiary = self.beneficiary; + *timestamp = self.timestamp; + *gas_limit = self.gas_limit; + *basefee = self.basefee; + *prevrandao = self.prevrandao; + + // NB: The following fields are set to sane defaults because they + // are not supported by the rollup + *difficulty = U256::ZERO; + *blob_excess_gas_and_price = + Some(BlobExcessGasAndPrice { excess_blob_gas: 0, blob_gasprice: 0 }); + } +} - // Assert that the block is properly sealed - let raw_encoding = in_progress_block.encode_raw(); - assert!(!raw_encoding.is_empty(), "Raw encoding should not be empty"); +impl PecorinoBlockEnv { + /// Returns a new PecorinoBlockEnv with the specified values. + /// + /// # Arguments + /// + /// - config: The BuilderConfig for the builder. + /// - number: The block number of this block, usually the latest block number plus 1, + /// unless simulating blocks in the past. + /// - timestamp: The timestamp of the block, typically set to the deadline of the + /// block building task. + fn new(config: BuilderConfig, number: u64, timestamp: u64, basefee: u64) -> Self { + PecorinoBlockEnv { + number, + beneficiary: config.builder_rewards_address, + timestamp, + gas_limit: config.rollup_block_gas_limit, + basefee, + prevrandao: Some(B256::random()), + } } } diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 670ecc5..8664874 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,59 +1,71 @@ -//! Bundler service responsible for managing bundles. -use super::oauth::Authenticator; - +//! Bundler service responsible for fetching bundles and sending them to the simulator. pub use crate::config::BuilderConfig; - +use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; -use reqwest::Url; +use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::time::{Duration, Instant}; -use zenith_types::ZenithEthBundle; - -/// A bundle response from the tx-pool endpoint, containing a UUID and a -/// [`ZenithEthBundle`]. +use signet_bundle::SignetEthBundle; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; +use tokio::task::JoinHandle; +use tokio::time; +use tracing::{Instrument, debug, trace}; +/// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { - /// The bundle id (a UUID) + /// Cache identifier for the bundle. pub id: String, - /// The bundle itself - pub bundle: ZenithEthBundle, + /// The corresponding Signet bundle. + pub bundle: SignetEthBundle, } /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolBundleResponse { - /// the list of bundles + /// Bundle responses are available on the bundles property. pub bundles: Vec, } -/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles. -#[derive(Debug)] +/// The BundlePoller polls the tx-pool for bundles. +#[derive(Debug, Clone)] pub struct BundlePoller { - /// Configuration + /// The builder configuration values. pub config: BuilderConfig, - /// [`Authenticator`] for fetching OAuth tokens + /// Authentication module that periodically fetches and stores auth tokens. pub authenticator: Authenticator, - /// Already seen bundle UUIDs - pub seen_uuids: HashMap, + /// Holds a Reqwest client + pub client: Client, + /// Defines the interval at which the bundler polls the tx-pool for bundles. + pub poll_interval_ms: u64, } -/// Implements a poller for the block builder to pull bundles from the tx cache. +/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() } + Self { + config: config.clone(), + authenticator, + client: Client::new(), + poll_interval_ms: 1000, + } } - /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache. - pub async fn check_bundle_cache(&mut self) -> eyre::Result> { - let mut unique: Vec = Vec::new(); + /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. + pub fn new_with_poll_interval_ms( + config: &BuilderConfig, + authenticator: Authenticator, + poll_interval_ms: u64, + ) -> Self { + Self { config: config.clone(), authenticator, client: Client::new(), poll_interval_ms } + } + /// Fetches bundles from the transaction cache and returns them. + pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let token = self.authenticator.fetch_oauth_token().await?; - // Add the token to the request headers - let result = reqwest::Client::new() + let result = self + .client .get(bundle_url) .bearer_auth(token.access_token().secret()) .send() @@ -61,38 +73,52 @@ impl BundlePoller { .error_for_status()?; let body = result.bytes().await?; - let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?; + let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?; - bundles.bundles.iter().for_each(|bundle| { - self.check_seen_bundles(bundle.clone(), &mut unique); - }); - - Ok(unique) + Ok(resp.bundles) } - /// Checks if the bundle has been seen before and if not, adds it to the unique bundles list. - fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec) { - self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| { - // add to the set of unique bundles - unique.push(bundle.clone()); - Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration) - }); - } + async fn task_future(mut self, outbound: UnboundedSender) { + loop { + let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url); - /// Evicts expired bundles from the cache. - pub fn evict(&mut self) { - let expired_keys: Vec = self - .seen_uuids - .iter() - .filter_map( - |(key, expiry)| { - if expiry.elapsed().is_zero() { Some(key.clone()) } else { None } - }, - ) - .collect(); + // Enter the span for the next check. + let _guard = span.enter(); - for key in expired_keys { - self.seen_uuids.remove(&key); + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_bundle_cache().instrument(span.clone()).await { + Ok(bundles) => { + tracing::debug!(count = ?bundles.len(), "found bundles"); + for bundle in bundles.into_iter() { + if let Err(err) = outbound.send(bundle) { + tracing::error!(err = ?err, "Failed to send bundle - channel is dropped"); + } + } + } + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching bundles"); + } + } + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; } } + + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + + let jh = tokio::spawn(self.task_future(outbound)); + + (inbound, jh) + } } diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs index 57d0334..6b41196 100644 --- a/src/tasks/metrics.rs +++ b/src/tasks/metrics.rs @@ -1,4 +1,4 @@ -use crate::config::Provider; +use crate::config::HostProvider; use alloy::{primitives::TxHash, providers::Provider as _}; use init4_bin_base::deps::metrics::{counter, histogram}; use std::time::Instant; @@ -9,7 +9,7 @@ use tracing::{debug, error}; #[derive(Debug, Clone)] pub struct MetricsTask { /// Ethereum Provider - pub host_provider: Provider, + pub host_provider: HostProvider, } impl MetricsTask { diff --git a/src/tasks/oauth.rs b/src/tasks/oauth.rs index 64c8859..69d3049 100644 --- a/src/tasks/oauth.rs +++ b/src/tasks/oauth.rs @@ -122,53 +122,21 @@ impl Authenticator { } mod tests { - use crate::config::BuilderConfig; - use alloy::primitives::Address; - use eyre::Result; - #[ignore = "integration test"] #[tokio::test] - async fn test_authenticator() -> Result<()> { + async fn test_authenticator() -> eyre::Result<()> { use super::*; + use crate::test_utils::setup_test_config; use oauth2::TokenResponse; let config = setup_test_config()?; let auth = Authenticator::new(&config); - let token = auth.fetch_oauth_token().await?; - dbg!(&token); + + let _ = auth.fetch_oauth_token().await?; + let token = auth.token().await.unwrap(); - println!("{:?}", token); + assert!(!token.access_token().secret().is_empty()); Ok(()) } - - #[allow(dead_code)] - fn setup_test_config() -> Result { - let config = BuilderConfig { - host_chain_id: 17000, - ru_chain_id: 17001, - host_rpc_url: "host-rpc.example.com".into(), - ru_rpc_url: "ru-rpc.example.com".into(), - zenith_address: Address::default(), - quincey_url: "http://localhost:8080".into(), - builder_port: 8080, - sequencer_key: None, - builder_key: "0000000000000000000000000000000000000000000000000000000000000000".into(), - block_confirmation_buffer: 1, - chain_offset: 0, - target_slot_time: 1, - builder_rewards_address: Address::default(), - rollup_block_gas_limit: 100_000, - tx_pool_url: "http://localhost:9000/".into(), - tx_pool_cache_duration: 5, - oauth_client_id: "some_client_id".into(), - oauth_client_secret: "some_client_secret".into(), - oauth_authenticate_url: "http://localhost:9000".into(), - oauth_token_url: "http://localhost:9000".into(), - tx_broadcast_urls: vec!["http://localhost:9000".into()], - oauth_token_refresh_interval: 300, // 5 minutes - builder_helper_address: Address::default(), - }; - Ok(config) - } } diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 0f1b0bf..925ebb1 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -1,7 +1,6 @@ use crate::{ - config::{Provider, ZenithInstance}, + config::{HostProvider, ZenithInstance}, signer::LocalOrAws, - tasks::block::InProgressBlock, utils::extract_signature_components, }; use alloy::{ @@ -18,14 +17,15 @@ use alloy::{ use eyre::{bail, eyre}; use init4_bin_base::deps::metrics::{counter, histogram}; use oauth2::TokenResponse; +use signet_sim::BuiltBlock; +use signet_types::{SignRequest, SignResponse}; +use signet_zenith::{ + BundleHelper::{self, BlockHeader, FillPermit2, submitCall}, + Zenith::IncorrectHostBlock, +}; use std::time::Instant; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{debug, error, instrument, trace}; -use zenith_types::{ - BundleHelper::{self, FillPermit2}, - SignRequest, SignResponse, - Zenith::IncorrectHostBlock, -}; macro_rules! spawn_provider_send { ($provider:expr, $tx:expr) => { @@ -58,7 +58,7 @@ pub enum ControlFlow { #[derive(Debug, Clone)] pub struct SubmitTask { /// Ethereum Provider - pub host_provider: Provider, + pub host_provider: HostProvider, /// Zenith pub zenith: ZenithInstance, /// Reqwest @@ -103,7 +103,7 @@ impl SubmitTask { /// Constructs the signing request from the in-progress block passed to it and assigns the /// correct height, chain ID, gas limit, and rollup reward address. #[instrument(skip_all)] - async fn construct_sig_request(&self, contents: &InProgressBlock) -> eyre::Result { + async fn construct_sig_request(&self, contents: &BuiltBlock) -> eyre::Result { let ru_chain_id = U256::from(self.config.ru_chain_id); let next_block_height = self.next_host_block_height().await?; @@ -113,7 +113,7 @@ impl SubmitTask { ru_chain_id, gas_limit: U256::from(self.config.rollup_block_gas_limit), ru_reward_address: self.config.builder_rewards_address, - contents: contents.contents_hash(), + contents: *contents.contents_hash(), }) } @@ -125,9 +125,9 @@ impl SubmitTask { v: u8, r: FixedBytes<32>, s: FixedBytes<32>, - in_progress: &InProgressBlock, + in_progress: &BuiltBlock, ) -> eyre::Result { - let data = zenith_types::BundleHelper::submitCall { fills, header, v, r, s }.abi_encode(); + let data = submitCall { fills, header, v, r, s }.abi_encode(); let sidecar = in_progress.encode_blob::().build()?; Ok(TransactionRequest::default() @@ -147,16 +147,16 @@ impl SubmitTask { async fn submit_transaction( &self, resp: &SignResponse, - in_progress: &InProgressBlock, + in_progress: &BuiltBlock, ) -> eyre::Result { let (v, r, s) = extract_signature_components(&resp.sig); - let header = zenith_types::BundleHelper::BlockHeader { + let header = BlockHeader { hostBlockNumber: resp.req.host_block_number, rollupChainId: U256::from(self.config.ru_chain_id), gasLimit: resp.req.gas_limit, rewardAddress: resp.req.ru_reward_address, - blockDataHash: in_progress.contents_hash(), + blockDataHash: *in_progress.contents_hash(), }; let fills = vec![]; // NB: ignored until fills are implemented @@ -167,7 +167,7 @@ impl SubmitTask { .with_gas_limit(1_000_000); if let Err(TransportError::ErrorResp(e)) = - self.host_provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await + self.host_provider.call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await { error!( code = e.code, @@ -233,9 +233,9 @@ impl SubmitTask { } #[instrument(skip_all, err)] - async fn handle_inbound(&self, in_progress: &InProgressBlock) -> eyre::Result { - tracing::info!(txns = in_progress.len(), "handling inbound block"); - let sig_request = match self.construct_sig_request(in_progress).await { + async fn handle_inbound(&self, block: &BuiltBlock) -> eyre::Result { + tracing::info!(txns = block.tx_count(), "handling inbound block"); + let sig_request = match self.construct_sig_request(block).await { Ok(sig_request) => sig_request, Err(e) => { tracing::error!(error = %e, "error constructing signature request"); @@ -274,11 +274,11 @@ impl SubmitTask { resp }; - self.submit_transaction(&signed, in_progress).await + self.submit_transaction(&signed, block).await } /// Spawns the in progress block building task - pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { let (sender, mut inbound) = mpsc::unbounded_channel(); let handle = tokio::spawn(async move { loop { diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 4a83577..bbf2b53 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -1,39 +1,95 @@ +//! Transaction service responsible for fetching and sending trasnsactions to the simulator. +use crate::config::BuilderConfig; use alloy::consensus::TxEnvelope; use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::from_slice; +use tokio::{sync::mpsc, task::JoinHandle, time}; +use tracing::{Instrument, debug, trace}; -pub use crate::config::BuilderConfig; - -/// Response from the tx-pool endpoint. +/// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolResponse { + /// Holds the transactions property as a list on the response. transactions: Vec, } /// Implements a poller for the block builder to pull transactions from the transaction pool. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TxPoller { - /// config for the builder + /// Config values from the Builder. pub config: BuilderConfig, - /// Reqwest client for fetching transactions from the tx-pool + /// Reqwest Client for fetching transactions from the cache. pub client: Client, + /// Defines the interval at which the service should poll the cache. + pub poll_interval_ms: u64, } -/// TxPoller implements a poller that fetches unique transactions from the transaction pool. +/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool +/// and sends them into the provided channel sender. impl TxPoller { - /// returns a new TxPoller with the given config. + /// Returns a new [`TxPoller`] with the given config. + /// * Defaults to 1000ms poll interval (1s). pub fn new(config: &BuilderConfig) -> Self { - Self { config: config.clone(), client: Client::new() } + Self { config: config.clone(), client: Client::new(), poll_interval_ms: 1000 } + } + + /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. + pub fn new_with_poll_interval_ms(config: &BuilderConfig, poll_interval_ms: u64) -> Self { + Self { config: config.clone(), client: Client::new(), poll_interval_ms } } - /// polls the tx-pool for unique transactions and evicts expired transactions. - /// unique transactions that haven't been seen before are sent into the builder pipeline. + /// Polls the transaction cache for transactions. pub async fn check_tx_cache(&mut self) -> Result, Error> { let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?; let result = self.client.get(url).send().await?; let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?; Ok(response.transactions) } + + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + loop { + let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_tx_cache().instrument(span.clone()).await { + Ok(transactions) => { + let _guard = span.entered(); + debug!(count = ?transactions.len(), "found transactions"); + for tx in transactions.into_iter() { + if outbound.send(tx).is_err() { + // If there are no receivers, we can shut down + trace!("No receivers left, shutting down"); + break; + } + } + } + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching transactions"); + } + } + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. + pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); + let jh = tokio::spawn(self.task_future(outbound)); + (inbound, jh) + } } diff --git a/src/test_utils.rs b/src/test_utils.rs new file mode 100644 index 0000000..2e6fc8d --- /dev/null +++ b/src/test_utils.rs @@ -0,0 +1,102 @@ +//! Test utilities for testing builder tasks +use crate::{ + config::BuilderConfig, + constants::{PECORINO_CHAIN_ID, PECORINO_HOST_CHAIN_ID}, + tasks::block::PecorinoBlockEnv, +}; +use alloy::{ + consensus::{SignableTransaction, TxEip1559, TxEnvelope}, + primitives::{Address, FixedBytes, TxKind, U256}, + signers::{SignerSync, local::PrivateKeySigner}, +}; +use chrono::{DateTime, Utc}; +use eyre::Result; +use std::{ + str::FromStr, + time::{Instant, SystemTime}, +}; +use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt}; + +/// Sets up a block builder with test values +pub fn setup_test_config() -> Result { + let config = BuilderConfig { + host_chain_id: PECORINO_HOST_CHAIN_ID, + ru_chain_id: PECORINO_CHAIN_ID, + host_rpc_url: "https://host-rpc.pecorino.signet.sh".into(), + ru_rpc_url: "https://rpc.pecorino.signet.sh".into(), + tx_broadcast_urls: vec!["http://localhost:9000".into()], + zenith_address: Address::default(), + quincey_url: "http://localhost:8080".into(), + builder_port: 8080, + sequencer_key: None, + builder_key: "0000000000000000000000000000000000000000000000000000000000000000".into(), + block_confirmation_buffer: 1, + chain_offset: 0, + target_slot_time: 1, + builder_rewards_address: Address::default(), + rollup_block_gas_limit: 3_000_000_000, + tx_pool_url: "http://localhost:9000/".into(), + tx_pool_cache_duration: 5, + oauth_client_id: "some_client_id".into(), + oauth_client_secret: "some_client_secret".into(), + oauth_authenticate_url: "http://localhost:8080".into(), + oauth_token_url: "http://localhost:8080".into(), + oauth_token_refresh_interval: 300, // 5 minutes + builder_helper_address: Address::default(), + concurrency_limit: 1000, + start_timestamp: 1740681556, // pecorino start timestamp as sane default + }; + Ok(config) +} + +/// Returns a new signed test transaction with the provided nonce, value, and mpfpg. +pub fn new_signed_tx( + wallet: &PrivateKeySigner, + nonce: u64, + value: U256, + mpfpg: u128, +) -> Result { + let tx = TxEip1559 { + chain_id: PECORINO_CHAIN_ID, + nonce, + max_fee_per_gas: 50_000, + max_priority_fee_per_gas: mpfpg, + to: TxKind::Call(Address::from_str("0x0000000000000000000000000000000000000000").unwrap()), + value, + gas_limit: 50_000, + ..Default::default() + }; + let signature = wallet.sign_hash_sync(&tx.signature_hash())?; + Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) +} + +/// Initializes a logger that prints during testing +pub fn setup_logging() { + // Initialize logging + let filter = EnvFilter::from_default_env(); + let fmt = tracing_subscriber::fmt::layer().with_filter(filter); + let registry = tracing_subscriber::registry().with(fmt); + let _ = registry.try_init(); +} + +/// Returns a Pecorino block environment for simulation with the timestamp set to `finish_by`, +/// the block number set to latest + 1, system gas configs, and a beneficiary address. +pub fn test_block_env( + config: BuilderConfig, + number: u64, + basefee: u64, + finish_by: Instant, +) -> PecorinoBlockEnv { + let remaining = finish_by.duration_since(Instant::now()); + let finish_time = SystemTime::now() + remaining; + let deadline: DateTime = finish_time.into(); + + PecorinoBlockEnv { + number, + beneficiary: Address::repeat_byte(0), + timestamp: deadline.timestamp() as u64, + gas_limit: config.rollup_block_gas_limit, + basefee, + prevrandao: Some(FixedBytes::random()), + } +} diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs new file mode 100644 index 0000000..f1126f9 --- /dev/null +++ b/tests/block_builder_test.rs @@ -0,0 +1,143 @@ +//! Tests for the block building task. +#[cfg(test)] +mod tests { + use alloy::{ + network::Ethereum, + node_bindings::Anvil, + primitives::U256, + providers::{Provider, RootProvider}, + signers::local::PrivateKeySigner, + }; + use builder::{ + constants::PECORINO_CHAIN_ID, + tasks::block::Simulator, + test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env}, + }; + use signet_sim::{SimCache, SimItem}; + use signet_types::SlotCalculator; + use std::{ + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + }; + use tokio::{sync::mpsc::unbounded_channel, time::timeout}; + + /// Tests the `handle_build` method of the `Simulator`. + /// + /// This test sets up a simulated environment using Anvil, creates a block builder, + /// and verifies that the block builder can successfully build a block containing + /// transactions from multiple senders. + #[ignore = "integration test"] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_handle_build() { + setup_logging(); + + // Make a test config + let config = setup_test_config().unwrap(); + let constants = config.load_pecorino_constants(); + + // Create an anvil instance for testing + let anvil_instance = Anvil::new().chain_id(PECORINO_CHAIN_ID).spawn(); + + // Create a wallet + let keys = anvil_instance.keys(); + let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); + let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); + + // Create a rollup provider + let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); + + // Create a block builder with a slot calculator for testing + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Clock may have gone backwards") + .as_secs(); + + let slot_calculator = SlotCalculator::new(now, 0, 12); + let block_builder = Simulator::new(&config, ru_provider.clone(), slot_calculator); + + // Setup a sim cache + let sim_items = SimCache::new(); + + // Add two transactions from two senders to the sim cache + let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); + sim_items.add_item(SimItem::Tx(tx_1), 0); + + let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); + sim_items.add_item(SimItem::Tx(tx_2), 0); + + // Setup the block env + let finish_by = Instant::now() + Duration::from_secs(2); + let block_number = ru_provider.get_block_number().await.unwrap(); + let block_env = test_block_env(config, block_number, 7, finish_by); + + // Spawn the block builder task + let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await; + + // Assert on the built block + assert!(got.is_ok()); + assert!(got.unwrap().tx_count() == 2); + } + + /// Tests the full block builder loop, including transaction ingestion and block simulation. + /// + /// This test sets up a simulated environment using Anvil, creates a block builder, + /// and verifies that the builder can process incoming transactions and produce a block + /// within a specified timeout. + #[ignore = "integration test"] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_spawn() { + setup_logging(); + + // Make a test config + let config = setup_test_config().unwrap(); + let constants = config.load_pecorino_constants(); + + // Create an anvil instance for testing + let anvil_instance = Anvil::new().chain_id(PECORINO_CHAIN_ID).spawn(); + + // Create a wallet + let keys = anvil_instance.keys(); + let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); + let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); + + // Plumb inputs for the test setup + let (tx_sender, tx_receiver) = unbounded_channel(); + let (_, bundle_receiver) = unbounded_channel(); + let (block_sender, mut block_receiver) = unbounded_channel(); + + // Create a rollup provider + let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); + + // Create a builder with a test slot calculator + let slot_calculator = SlotCalculator::new( + config.start_timestamp, + config.chain_offset, + config.target_slot_time, + ); + let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator)); + + // Create a shared sim cache + let sim_cache = SimCache::new(); + + // Create a sim cache and start filling it with items + sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone()); + + // Finally, Kick off the block builder task. + sim.clone().spawn_simulator_task(constants, sim_cache.clone(), block_sender); + + // Feed in transactions to the tx_sender and wait for the block to be simulated + let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); + let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); + tx_sender.send(tx_1).unwrap(); + tx_sender.send(tx_2).unwrap(); + + // Wait for a block with timeout + let result = timeout(Duration::from_secs(5), block_receiver.recv()).await; + assert!(result.is_ok(), "Did not receive block within 5 seconds"); + + // Assert on the block + let block = result.unwrap(); + assert!(block.is_some(), "Block channel closed without receiving a block"); + assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. + } +} diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 53fa51c..995419d 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -1,48 +1,17 @@ mod tests { - use alloy::primitives::Address; - use builder::{config::BuilderConfig, tasks::oauth::Authenticator}; + use builder::{tasks::oauth::Authenticator, test_utils}; use eyre::Result; #[ignore = "integration test"] #[tokio::test] async fn test_bundle_poller_roundtrip() -> Result<()> { - let config = setup_test_config().await.unwrap(); + let config = test_utils::setup_test_config().unwrap(); let auth = Authenticator::new(&config); + let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth); - let got = bundle_poller.check_bundle_cache().await?; - dbg!(got); + let _ = bundle_poller.check_bundle_cache().await?; Ok(()) } - - async fn setup_test_config() -> Result { - let config = BuilderConfig { - host_chain_id: 17000, - ru_chain_id: 17001, - host_rpc_url: "host-rpc.example.com".into(), - ru_rpc_url: "ru-rpc.example.com".into(), - zenith_address: Address::default(), - quincey_url: "http://localhost:8080".into(), - builder_port: 8080, - sequencer_key: None, - builder_key: "0000000000000000000000000000000000000000000000000000000000000000".into(), - block_confirmation_buffer: 1, - chain_offset: 0, - target_slot_time: 1, - builder_rewards_address: Address::default(), - rollup_block_gas_limit: 100_000, - tx_pool_url: "http://localhost:9000/".into(), - // tx_pool_url: "https://transactions.holesky.signet.sh".into(), - tx_pool_cache_duration: 5, - oauth_client_id: "some_client_id".into(), - oauth_client_secret: "some_client_secret".into(), - oauth_authenticate_url: "http://localhost:8080".into(), - oauth_token_url: "http://localhost:8080".into(), - tx_broadcast_urls: vec!["http://localhost:9000".into()], - oauth_token_refresh_interval: 300, // 5 minutes - builder_helper_address: Address::default(), - }; - Ok(config) - } } diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index f0c4756..3d8e5c6 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,18 +1,18 @@ mod tests { - use std::str::FromStr; - - use alloy::consensus::{SignableTransaction, TxEip1559, TxEnvelope}; - use alloy::primitives::{Address, TxKind, U256, bytes}; - use alloy::signers::{SignerSync, local::PrivateKeySigner}; + use alloy::primitives::U256; + use alloy::signers::local::PrivateKeySigner; use builder::config::BuilderConfig; use builder::tasks::tx_poller; + use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; // Import the refactored function use eyre::{Ok, Result}; #[ignore = "integration test"] #[tokio::test] async fn test_tx_roundtrip() -> Result<()> { + setup_logging(); + // Create a new test environment - let config = setup_test_config().await?; + let config = setup_test_config()?; // Post a transaction to the cache post_tx(&config).await?; @@ -31,8 +31,9 @@ mod tests { async fn post_tx(config: &BuilderConfig) -> Result<()> { let client = reqwest::Client::new(); + let wallet = PrivateKeySigner::random(); - let tx_envelope = new_test_tx(&wallet)?; + let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?; let url = format!("{}/transactions", config.tx_pool_url); let response = client.post(&url).json(&tx_envelope).send().await?; @@ -44,51 +45,4 @@ mod tests { Ok(()) } - - // Returns a new signed test transaction with default values - fn new_test_tx(wallet: &PrivateKeySigner) -> Result { - let tx = TxEip1559 { - chain_id: 17001, - nonce: 1, - gas_limit: 50000, - to: TxKind::Call( - Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), - ), - value: U256::from(1_f64), - input: bytes!(""), - ..Default::default() - }; - let signature = wallet.sign_hash_sync(&tx.signature_hash())?; - Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) - } - - // Sets up a block builder with test values - pub async fn setup_test_config() -> Result { - let config = BuilderConfig { - host_chain_id: 17000, - ru_chain_id: 17001, - host_rpc_url: "host-rpc.example.com".into(), - ru_rpc_url: "ru-rpc.example.com".into(), - tx_broadcast_urls: vec!["http://localhost:9000".into()], - zenith_address: Address::default(), - quincey_url: "http://localhost:8080".into(), - builder_port: 8080, - sequencer_key: None, - builder_key: "0000000000000000000000000000000000000000000000000000000000000000".into(), - block_confirmation_buffer: 1, - chain_offset: 0, - target_slot_time: 1, - builder_rewards_address: Address::default(), - rollup_block_gas_limit: 100_000, - tx_pool_url: "http://localhost:9000/".into(), - tx_pool_cache_duration: 5, - oauth_client_id: "some_client_id".into(), - oauth_client_secret: "some_client_secret".into(), - oauth_authenticate_url: "http://localhost:8080".into(), - oauth_token_url: "http://localhost:8080".into(), - oauth_token_refresh_interval: 300, // 5 minutes - builder_helper_address: Address::default(), - }; - Ok(config) - } }