diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index 40f2538a400..449c077b69f 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -7,7 +7,7 @@ use graph::blockchain::{ }; use graph::cheap_clone::CheapClone; use graph::components::network_provider::ChainName; -use graph::components::store::{DeploymentCursorTracker, SourceableStore}; +use graph::components::store::{ChainHeadStore, DeploymentCursorTracker, SourceableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::env::EnvVars; use graph::firehose::FirehoseEndpoint; @@ -24,7 +24,7 @@ use graph::{ }, components::store::DeploymentLocator, firehose::{self as firehose, ForkStep}, - prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory}, + prelude::{async_trait, o, BlockNumber, Error, Logger, LoggerFactory}, }; use prost::Message; use std::collections::BTreeSet; @@ -46,7 +46,7 @@ pub struct Chain { logger_factory: LoggerFactory, name: ChainName, client: Arc>, - chain_store: Arc, + chain_head_store: Arc, metrics_registry: Arc, } @@ -63,7 +63,7 @@ impl BlockchainBuilder for BasicBlockchainBuilder { logger_factory: self.logger_factory, name: self.name, client: Arc::new(ChainClient::::new_firehose(self.firehose_endpoints)), - chain_store: self.chain_store, + chain_head_store: self.chain_head_store, metrics_registry: self.metrics_registry, } } @@ -155,8 +155,8 @@ impl Blockchain for Chain { ))) } - fn chain_store(&self) -> Arc { - self.chain_store.clone() + async fn chain_head_ptr(&self) -> Result, Error> { + self.chain_head_store.cheap_clone().chain_head_ptr().await } async fn block_pointer_from_number( @@ -182,7 +182,7 @@ impl Blockchain for Chain { async fn block_ingestor(&self) -> anyhow::Result> { let ingestor = FirehoseBlockIngestor::::new( - self.chain_store.cheap_clone(), + self.chain_head_store.cheap_clone(), self.chain_client(), self.logger_factory .component_logger("ArweaveFirehoseBlockIngestor", None), diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 0408771f23e..35c155b9c0f 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -13,7 +13,7 @@ use graph::firehose::{FirehoseEndpoint, ForkStep}; use graph::futures03::TryStreamExt; use graph::prelude::{ retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock, - EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, + EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, StoreError, }; use graph::schema::InputSchema; use graph::slog::{debug, error, trace, warn}; @@ -25,7 +25,6 @@ use graph::{ FirehoseMapper as FirehoseMapperTrait, TriggersAdapter as TriggersAdapterTrait, }, firehose_block_stream::FirehoseBlockStream, - polling_block_stream::PollingBlockStream, Block, BlockPtr, Blockchain, ChainHeadUpdateListener, IngestorError, RuntimeAdapter as RuntimeAdapterTrait, TriggerFilter as _, }, @@ -34,7 +33,7 @@ use graph::{ firehose, prelude::{ async_trait, o, serde_json as json, BlockNumber, ChainStore, EthereumBlockWithCalls, - Logger, LoggerFactory, NodeId, + Logger, LoggerFactory, }, }; use prost::Message; @@ -49,6 +48,7 @@ use crate::data_source::DataSourceTemplate; use crate::data_source::UnresolvedDataSourceTemplate; use crate::ingestor::PollingBlockIngestor; use crate::network::EthereumNetworkAdapters; +use crate::polling_block_stream::PollingBlockStream; use crate::runtime::runtime_adapter::eth_call_gas; use crate::{ adapter::EthereumAdapter as _, @@ -175,7 +175,6 @@ impl BlockStreamBuilder for EthereumStreamBuilder { .logger_factory .subgraph_logger(&deployment) .new(o!("component" => "BlockStream")); - let chain_store = chain.chain_store(); let chain_head_update_stream = chain .chain_head_update_listener .subscribe(chain.name.to_string(), logger.clone()); @@ -213,10 +212,8 @@ impl BlockStreamBuilder for EthereumStreamBuilder { }; Ok(Box::new(PollingBlockStream::new( - chain_store, chain_head_update_stream, Arc::new(adapter), - chain.node_id.clone(), deployment.hash, filter, start_blocks, @@ -336,7 +333,6 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder { pub struct Chain { logger_factory: LoggerFactory, pub name: ChainName, - node_id: NodeId, registry: Arc, client: Arc>, chain_store: Arc, @@ -363,7 +359,6 @@ impl Chain { pub fn new( logger_factory: LoggerFactory, name: ChainName, - node_id: NodeId, registry: Arc, chain_store: Arc, call_cache: Arc, @@ -381,7 +376,6 @@ impl Chain { Chain { logger_factory, name, - node_id, registry, client, chain_store, @@ -403,6 +397,13 @@ impl Chain { self.call_cache.clone() } + pub async fn block_number( + &self, + hash: &BlockHash, + ) -> Result, Option)>, StoreError> { + self.chain_store.block_number(hash).await + } + // TODO: This is only used to build the block stream which could prolly // be moved to the chain itself and return a block stream future that the // caller can spawn. @@ -507,8 +508,8 @@ impl Blockchain for Chain { } } - fn chain_store(&self) -> Arc { - self.chain_store.clone() + async fn chain_head_ptr(&self) -> Result, Error> { + self.chain_store.cheap_clone().chain_head_ptr().await } async fn block_pointer_from_number( @@ -578,7 +579,7 @@ impl Blockchain for Chain { let ingestor: Box = match self.chain_client().as_ref() { ChainClient::Firehose(_) => { let ingestor = FirehoseBlockIngestor::::new( - self.chain_store.cheap_clone(), + self.chain_store.cheap_clone().as_head_store(), self.chain_client(), self.logger_factory .component_logger("EthereumFirehoseBlockIngestor", None), @@ -615,7 +616,7 @@ impl Blockchain for Chain { logger, graph::env::ENV_VARS.reorg_threshold(), self.chain_client(), - self.chain_store().cheap_clone(), + self.chain_store.cheap_clone(), self.polling_ingestor_interval, self.name.clone(), )?) diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index 8cf4e4cc669..fa76f70d799 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -6,6 +6,7 @@ mod data_source; mod env; mod ethereum_adapter; mod ingestor; +mod polling_block_stream; pub mod runtime; mod transport; diff --git a/graph/src/blockchain/polling_block_stream.rs b/chain/ethereum/src/polling_block_stream.rs similarity index 93% rename from graph/src/blockchain/polling_block_stream.rs rename to chain/ethereum/src/polling_block_stream.rs index fa774261227..a215f775685 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/chain/ethereum/src/polling_block_stream.rs @@ -1,5 +1,5 @@ -use anyhow::Error; -use futures03::{stream::Stream, Future, FutureExt}; +use anyhow::{anyhow, Error}; +use graph::tokio; use std::cmp; use std::collections::VecDeque; use std::pin::Pin; @@ -7,23 +7,24 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use super::block_stream::{ +use graph::blockchain::block_stream::{ BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream, FirehoseCursor, TriggersAdapterWrapper, BUFFERED_BLOCK_STREAM_SIZE, }; -use super::{Block, BlockPtr, Blockchain, TriggerFilterWrapper}; +use graph::blockchain::{Block, BlockPtr, TriggerFilterWrapper}; +use graph::futures03::{stream::Stream, Future, FutureExt}; +use graph::prelude::{DeploymentHash, BLOCK_NUMBER_MAX}; +use graph::slog::{debug, info, trace, warn, Logger}; -use crate::components::store::BlockNumber; -use crate::data::subgraph::UnifiedMappingApiVersion; -use crate::prelude::*; +use graph::components::store::BlockNumber; +use graph::data::subgraph::UnifiedMappingApiVersion; + +use crate::Chain; // A high number here forces a slow start. const STARTING_PREVIOUS_TRIGGERS_PER_BLOCK: f64 = 1_000_000.0; -enum BlockStreamState -where - C: Blockchain, -{ +enum BlockStreamState { /// Starting or restarting reconciliation. /// /// Valid next states: Reconciliation @@ -32,13 +33,13 @@ where /// The BlockStream is reconciling the subgraph store state with the chain store state. /// /// Valid next states: YieldingBlocks, Idle, BeginReconciliation (in case of revert) - Reconciliation(Pin, Error>> + Send>>), + Reconciliation(Pin> + Send>>), /// The BlockStream is emitting blocks that must be processed in order to bring the subgraph /// store up to date with the chain store. /// /// Valid next states: BeginReconciliation - YieldingBlocks(Box>>), + YieldingBlocks(Box>>), /// The BlockStream experienced an error and is pausing before attempting to produce /// blocks again. @@ -55,16 +56,13 @@ where /// A single next step to take in reconciling the state of the subgraph store with the state of the /// chain store. -enum ReconciliationStep -where - C: Blockchain, -{ +enum ReconciliationStep { /// Revert(to) the block the subgraph should be reverted to, so it becomes the new subgraph /// head. Revert(BlockPtr), /// Move forwards, processing one or more blocks. Second element is the block range size. - ProcessDescendantBlocks(Vec>, BlockNumber), + ProcessDescendantBlocks(Vec>, BlockNumber), /// This step is a no-op, but we need to check again for a next step. Retry, @@ -74,18 +72,13 @@ where Done, } -struct PollingBlockStreamContext -where - C: Blockchain, -{ - chain_store: Arc, - adapter: Arc>, - node_id: NodeId, +struct PollingBlockStreamContext { + adapter: Arc>, subgraph_id: DeploymentHash, // This is not really a block number, but the (unsigned) difference // between two block numbers reorg_threshold: BlockNumber, - filter: Arc>, + filter: Arc>, start_blocks: Vec, logger: Logger, previous_triggers_per_block: f64, @@ -98,12 +91,10 @@ where current_block: Option, } -impl Clone for PollingBlockStreamContext { +impl Clone for PollingBlockStreamContext { fn clone(&self) -> Self { Self { - chain_store: self.chain_store.cheap_clone(), adapter: self.adapter.clone(), - node_id: self.node_id.clone(), subgraph_id: self.subgraph_id.clone(), reorg_threshold: self.reorg_threshold, filter: self.filter.clone(), @@ -119,37 +110,29 @@ impl Clone for PollingBlockStreamContext { } } -pub struct PollingBlockStream { - state: BlockStreamState, +pub struct PollingBlockStream { + state: BlockStreamState, consecutive_err_count: u32, chain_head_update_stream: ChainHeadUpdateStream, - ctx: PollingBlockStreamContext, + ctx: PollingBlockStreamContext, } // This is the same as `ReconciliationStep` but without retries. -enum NextBlocks -where - C: Blockchain, -{ +enum NextBlocks { /// Blocks and range size - Blocks(VecDeque>, BlockNumber), + Blocks(VecDeque>, BlockNumber), // The payload is block the subgraph should be reverted to, so it becomes the new subgraph head. Revert(BlockPtr), Done, } -impl PollingBlockStream -where - C: Blockchain, -{ +impl PollingBlockStream { pub fn new( - chain_store: Arc, chain_head_update_stream: ChainHeadUpdateStream, - adapter: Arc>, - node_id: NodeId, + adapter: Arc>, subgraph_id: DeploymentHash, - filter: Arc>, + filter: Arc>, start_blocks: Vec, reorg_threshold: BlockNumber, logger: Logger, @@ -164,9 +147,7 @@ where chain_head_update_stream, ctx: PollingBlockStreamContext { current_block: start_block, - chain_store, adapter, - node_id, subgraph_id, reorg_threshold, logger, @@ -182,12 +163,9 @@ where } } -impl PollingBlockStreamContext -where - C: Blockchain, -{ +impl PollingBlockStreamContext { /// Perform reconciliation steps until there are blocks to yield or we are up-to-date. - async fn next_blocks(&self) -> Result, Error> { + async fn next_blocks(&self) -> Result { let ctx = self.clone(); loop { @@ -212,7 +190,7 @@ where } /// Determine the next reconciliation step. Does not modify Store or ChainStore. - async fn get_next_step(&self) -> Result, Error> { + async fn get_next_step(&self) -> Result { let ctx = self.clone(); let start_blocks = self.start_blocks.clone(); let max_block_range_size = self.max_block_range_size; @@ -498,14 +476,14 @@ where } } -impl BlockStream for PollingBlockStream { +impl BlockStream for PollingBlockStream { fn buffer_size_hint(&self) -> usize { BUFFERED_BLOCK_STREAM_SIZE } } -impl Stream for PollingBlockStream { - type Item = Result, BlockStreamError>; +impl Stream for PollingBlockStream { + type Item = Result, BlockStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let result = loop { diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index aa580b03f90..58b0e23ac2d 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -8,7 +8,7 @@ use graph::blockchain::{ }; use graph::cheap_clone::CheapClone; use graph::components::network_provider::ChainName; -use graph::components::store::{DeploymentCursorTracker, SourceableStore}; +use graph::components::store::{ChainHeadStore, DeploymentCursorTracker, SourceableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::env::EnvVars; use graph::firehose::FirehoseEndpoint; @@ -29,7 +29,7 @@ use graph::{ }, components::store::DeploymentLocator, firehose::{self as firehose, ForkStep}, - prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory}, + prelude::{async_trait, o, BlockNumber, Error, Logger, LoggerFactory}, }; use prost::Message; use std::collections::BTreeSet; @@ -165,7 +165,7 @@ pub struct Chain { logger_factory: LoggerFactory, name: ChainName, client: Arc>, - chain_store: Arc, + chain_head_store: Arc, metrics_registry: Arc, block_stream_builder: Arc>, prefer_substreams: bool, @@ -183,7 +183,7 @@ impl BlockchainBuilder for BasicBlockchainBuilder { Chain { logger_factory: self.logger_factory, name: self.name, - chain_store: self.chain_store, + chain_head_store: self.chain_head_store, client: Arc::new(ChainClient::new_firehose(self.firehose_endpoints)), metrics_registry: self.metrics_registry, block_stream_builder: Arc::new(NearStreamBuilder {}), @@ -275,8 +275,8 @@ impl Blockchain for Chain { unimplemented!("This chain does not support Dynamic Data Sources. is_refetch_block_required always returns false, this shouldn't be called.") } - fn chain_store(&self) -> Arc { - self.chain_store.clone() + async fn chain_head_ptr(&self) -> Result, Error> { + self.chain_head_store.cheap_clone().chain_head_ptr().await } async fn block_pointer_from_number( @@ -302,7 +302,7 @@ impl Blockchain for Chain { async fn block_ingestor(&self) -> anyhow::Result> { let ingestor = FirehoseBlockIngestor::::new( - self.chain_store.cheap_clone(), + self.chain_head_store.cheap_clone(), self.chain_client(), self.logger_factory .component_logger("NearFirehoseBlockIngestor", None), diff --git a/chain/substreams/src/block_ingestor.rs b/chain/substreams/src/block_ingestor.rs index 69b16ecc869..f176f549647 100644 --- a/chain/substreams/src/block_ingestor.rs +++ b/chain/substreams/src/block_ingestor.rs @@ -8,6 +8,7 @@ use graph::blockchain::{ client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor, }; use graph::components::network_provider::ChainName; +use graph::components::store::ChainHeadStore; use graph::prelude::MetricsRegistry; use graph::slog::trace; use graph::substreams::Package; @@ -15,7 +16,6 @@ use graph::tokio_stream::StreamExt; use graph::{ blockchain::block_stream::BlockStreamEvent, cheap_clone::CheapClone, - components::store::ChainStore, prelude::{async_trait, error, info, DeploymentHash, Logger}, util::backoff::ExponentialBackoff, }; @@ -26,7 +26,7 @@ const SUBSTREAMS_HEAD_TRACKER_BYTES: &[u8; 89935] = include_bytes!( ); pub struct SubstreamsBlockIngestor { - chain_store: Arc, + chain_store: Arc, client: Arc>, logger: Logger, chain_name: ChainName, @@ -35,7 +35,7 @@ pub struct SubstreamsBlockIngestor { impl SubstreamsBlockIngestor { pub fn new( - chain_store: Arc, + chain_store: Arc, client: Arc>, logger: Logger, chain_name: ChainName, diff --git a/chain/substreams/src/chain.rs b/chain/substreams/src/chain.rs index a15dbb0f269..1c44d77bde1 100644 --- a/chain/substreams/src/chain.rs +++ b/chain/substreams/src/chain.rs @@ -7,7 +7,7 @@ use graph::blockchain::{ NoopRuntimeAdapter, TriggerFilterWrapper, }; use graph::components::network_provider::ChainName; -use graph::components::store::{DeploymentCursorTracker, SourceableStore}; +use graph::components::store::{ChainHeadStore, DeploymentCursorTracker, SourceableStore}; use graph::env::EnvVars; use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry}; use graph::schema::EntityKey; @@ -19,7 +19,7 @@ use graph::{ }, components::store::DeploymentLocator, data::subgraph::UnifiedMappingApiVersion, - prelude::{async_trait, BlockNumber, ChainStore}, + prelude::{async_trait, BlockNumber}, slog::Logger, }; @@ -65,7 +65,7 @@ impl blockchain::Block for Block { } pub struct Chain { - chain_store: Arc, + chain_head_store: Arc, block_stream_builder: Arc>, chain_id: ChainName, @@ -79,7 +79,7 @@ impl Chain { logger_factory: LoggerFactory, chain_client: Arc>, metrics_registry: Arc, - chain_store: Arc, + chain_store: Arc, block_stream_builder: Arc>, chain_id: ChainName, ) -> Self { @@ -87,7 +87,7 @@ impl Chain { logger_factory, client: chain_client, metrics_registry, - chain_store, + chain_head_store: chain_store, block_stream_builder, chain_id, } @@ -167,8 +167,8 @@ impl Blockchain for Chain { unimplemented!("This chain does not support Dynamic Data Sources. is_refetch_block_required always returns false, this shouldn't be called.") } - fn chain_store(&self) -> Arc { - self.chain_store.clone() + async fn chain_head_ptr(&self) -> Result, Error> { + self.chain_head_store.cheap_clone().chain_head_ptr().await } async fn block_pointer_from_number( @@ -195,7 +195,7 @@ impl Blockchain for Chain { async fn block_ingestor(&self) -> anyhow::Result> { Ok(Box::new(SubstreamsBlockIngestor::new( - self.chain_store.cheap_clone(), + self.chain_head_store.cheap_clone(), self.client.cheap_clone(), self.logger_factory .component_logger("SubstreamsBlockIngestor", None), @@ -211,13 +211,13 @@ impl blockchain::BlockchainBuilder for BasicBlockchainBuilder { let BasicBlockchainBuilder { logger_factory, name, - chain_store, + chain_head_store, firehose_endpoints, metrics_registry, } = self; Chain { - chain_store, + chain_head_store, block_stream_builder: Arc::new(crate::BlockStreamBuilder::new()), logger_factory, client: Arc::new(ChainClient::new_firehose(firehose_endpoints)), diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index a8f3eeed441..237b4cb472e 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -6,7 +6,6 @@ use crate::subgraph::inputs::IndexingInputs; use crate::subgraph::state::IndexingState; use crate::subgraph::stream::new_block_stream; use anyhow::Context as _; -use async_trait::async_trait; use graph::blockchain::block_stream::{ BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, }; @@ -1141,7 +1140,7 @@ where if cached_head_ptr.is_none() || close_to_chain_head(&block_ptr, &cached_head_ptr, CAUGHT_UP_DISTANCE) { - self.state.cached_head_ptr = self.inputs.chain.chain_store().chain_head_ptr().await?; + self.state.cached_head_ptr = self.inputs.chain.chain_head_ptr().await?; } let is_caught_up = close_to_chain_head(&block_ptr, &self.state.cached_head_ptr, CAUGHT_UP_DISTANCE); @@ -1162,6 +1161,7 @@ where &mut self, event: Option, CancelableError>>, ) -> Result { + let stopwatch = &self.metrics.stream.stopwatch; let action = match event { Some(Ok(BlockStreamEvent::ProcessWasmBlock( block_ptr, @@ -1170,11 +1170,7 @@ where handler, cursor, ))) => { - let _section = self - .metrics - .stream - .stopwatch - .start_section(PROCESS_WASM_BLOCK_SECTION_NAME); + let _section = stopwatch.start_section(PROCESS_WASM_BLOCK_SECTION_NAME); let res = self .handle_process_wasm_block(block_ptr.clone(), block_time, data, handler, cursor) .await; @@ -1182,19 +1178,11 @@ where self.handle_action(start, block_ptr, res).await? } Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => { - let _section = self - .metrics - .stream - .stopwatch - .start_section(PROCESS_BLOCK_SECTION_NAME); + let _section = stopwatch.start_section(PROCESS_BLOCK_SECTION_NAME); self.handle_process_block(block, cursor).await? } Some(Ok(BlockStreamEvent::Revert(revert_to_ptr, cursor))) => { - let _section = self - .metrics - .stream - .stopwatch - .start_section(HANDLE_REVERT_SECTION_NAME); + let _section = stopwatch.start_section(HANDLE_REVERT_SECTION_NAME); self.handle_revert(revert_to_ptr, cursor).await? } // Log and drop the errors from the block_stream @@ -1333,33 +1321,7 @@ impl Action { } } -#[async_trait] -trait StreamEventHandler { - async fn handle_process_wasm_block( - &mut self, - block_ptr: BlockPtr, - block_time: BlockTime, - block_data: Box<[u8]>, - handler: String, - cursor: FirehoseCursor, - ) -> Result; - async fn handle_process_block( - &mut self, - block: BlockWithTriggers, - cursor: FirehoseCursor, - ) -> Result; - async fn handle_revert( - &mut self, - revert_to_ptr: BlockPtr, - cursor: FirehoseCursor, - ) -> Result; - async fn handle_err(&mut self, err: CancelableError) - -> Result; - fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool; -} - -#[async_trait] -impl StreamEventHandler for SubgraphRunner +impl SubgraphRunner where C: Blockchain, T: RuntimeHostBuilder, @@ -1463,7 +1425,7 @@ where && !self.inputs.store.is_deployment_synced() && !close_to_chain_head( &block_ptr, - &self.inputs.chain.chain_store().chain_head_ptr().await?, + &self.inputs.chain.chain_head_ptr().await?, // The "skip ptr updates timer" is ignored when a subgraph is at most 1000 blocks // behind the chain head. 1000, diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 99f2dabd1ac..86f196ac99c 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -883,35 +883,6 @@ pub enum BlockStreamEvent { ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor), } -impl BlockStreamEvent { - pub fn block_ptr(&self) -> BlockPtr { - match self { - BlockStreamEvent::Revert(ptr, _) => ptr.clone(), - BlockStreamEvent::ProcessBlock(block, _) => block.ptr(), - BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(), - } - } -} - -impl Clone for BlockStreamEvent -where - C::TriggerData: Clone, -{ - fn clone(&self) -> Self { - match self { - Self::Revert(arg0, arg1) => Self::Revert(arg0.clone(), arg1.clone()), - Self::ProcessBlock(arg0, arg1) => Self::ProcessBlock(arg0.clone(), arg1.clone()), - Self::ProcessWasmBlock(arg0, arg1, arg2, arg3, arg4) => Self::ProcessWasmBlock( - arg0.clone(), - arg1.clone(), - arg2.clone(), - arg3.clone(), - arg4.clone(), - ), - } - } -} - #[derive(Clone)] pub struct BlockStreamMetrics { pub deployment_head: Box, diff --git a/graph/src/blockchain/builder.rs b/graph/src/blockchain/builder.rs index 07046d62e71..943586770c5 100644 --- a/graph/src/blockchain/builder.rs +++ b/graph/src/blockchain/builder.rs @@ -2,8 +2,11 @@ use tonic::async_trait; use super::Blockchain; use crate::{ - components::store::ChainStore, data::value::Word, env::EnvVars, firehose::FirehoseEndpoints, - prelude::LoggerFactory, prelude::MetricsRegistry, + components::store::ChainHeadStore, + data::value::Word, + env::EnvVars, + firehose::FirehoseEndpoints, + prelude::{LoggerFactory, MetricsRegistry}, }; use std::sync::Arc; @@ -12,7 +15,7 @@ use std::sync::Arc; pub struct BasicBlockchainBuilder { pub logger_factory: LoggerFactory, pub name: Word, - pub chain_store: Arc, + pub chain_head_store: Arc, pub firehose_endpoints: FirehoseEndpoints, pub metrics_registry: Arc, } diff --git a/graph/src/blockchain/firehose_block_ingestor.rs b/graph/src/blockchain/firehose_block_ingestor.rs index 026b83018d4..fbe35eab3a7 100644 --- a/graph/src/blockchain/firehose_block_ingestor.rs +++ b/graph/src/blockchain/firehose_block_ingestor.rs @@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; use crate::{ blockchain::Block as BlockchainBlock, - components::store::ChainStore, + components::store::ChainHeadStore, firehose::{self, decode_firehose_block, HeaderOnly}, prelude::{error, info, Logger}, util::backoff::ExponentialBackoff, @@ -40,7 +40,7 @@ pub struct FirehoseBlockIngestor where M: prost::Message + BlockchainBlock + Default + 'static, { - chain_store: Arc, + chain_head_store: Arc, client: Arc>, logger: Logger, default_transforms: Vec, @@ -54,13 +54,13 @@ where M: prost::Message + BlockchainBlock + Default + 'static, { pub fn new( - chain_store: Arc, + chain_head_store: Arc, client: Arc>, logger: Logger, chain_name: ChainName, ) -> FirehoseBlockIngestor { FirehoseBlockIngestor { - chain_store, + chain_head_store, client, logger, phantom: PhantomData {}, @@ -78,7 +78,7 @@ where let mut backoff = ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30)); loop { - match self.chain_store.clone().chain_head_cursor() { + match self.chain_head_store.clone().chain_head_cursor() { Ok(cursor) => return cursor.unwrap_or_default(), Err(e) => { error!(self.logger, "Fetching chain head cursor failed: {:#}", e); @@ -149,7 +149,7 @@ where trace!(self.logger, "Received new block to ingest {}", block.ptr()); - self.chain_store + self.chain_head_store .clone() .set_chain_head(block, response.cursor.clone()) .await diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 430eb27bd85..01487c42113 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -2,7 +2,11 @@ use crate::{ bail, components::{ link_resolver::LinkResolver, - store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, SourceableStore}, + network_provider::ChainName, + store::{ + BlockNumber, ChainHeadStore, ChainIdStore, DeploymentCursorTracker, DeploymentLocator, + SourceableStore, + }, subgraph::InstanceDSTemplateInfo, }, data::subgraph::UnifiedMappingApiVersion, @@ -438,7 +442,7 @@ impl Blockchain for MockBlockchain { todo!() } - fn chain_store(&self) -> std::sync::Arc { + async fn chain_head_ptr(&self) -> Result, Error> { todo!() } @@ -471,6 +475,23 @@ pub struct MockChainStore { pub blocks: BTreeMap>, } +#[async_trait] +impl ChainHeadStore for MockChainStore { + async fn chain_head_ptr(self: Arc) -> Result, Error> { + unimplemented!() + } + fn chain_head_cursor(&self) -> Result, Error> { + unimplemented!() + } + async fn set_chain_head( + self: Arc, + _block: Arc, + _cursor: String, + ) -> Result<(), Error> { + unimplemented!() + } +} + #[async_trait] impl ChainStore for MockChainStore { async fn block_ptrs_by_numbers( @@ -502,19 +523,6 @@ impl ChainStore for MockChainStore { ) -> Result, Error> { unimplemented!() } - async fn chain_head_ptr(self: Arc) -> Result, Error> { - unimplemented!() - } - fn chain_head_cursor(&self) -> Result, Error> { - unimplemented!() - } - async fn set_chain_head( - self: Arc, - _block: Arc, - _cursor: String, - ) -> Result<(), Error> { - unimplemented!() - } async fn blocks(self: Arc, _hashes: Vec) -> Result, Error> { unimplemented!() } @@ -562,7 +570,20 @@ impl ChainStore for MockChainStore { fn chain_identifier(&self) -> Result { unimplemented!() } - fn set_chain_identifier(&self, _ident: &ChainIdentifier) -> Result<(), Error> { + fn as_head_store(self: Arc) -> Arc { + self.clone() + } +} + +impl ChainIdStore for MockChainStore { + fn chain_identifier(&self, _name: &ChainName) -> Result { + unimplemented!() + } + fn set_chain_identifier( + &self, + _name: &ChainName, + _ident: &ChainIdentifier, + ) -> Result<(), Error> { unimplemented!() } } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 25a7d147540..d651574a232 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -10,7 +10,6 @@ pub mod firehose_block_ingestor; pub mod firehose_block_stream; pub mod mock; mod noop_runtime_adapter; -pub mod polling_block_stream; pub mod substreams_block_stream; mod types; @@ -31,7 +30,7 @@ use crate::{ runtime::{gas::GasCounter, AscHeap, HostExportError}, }; use crate::{ - components::store::{BlockNumber, ChainStore}, + components::store::BlockNumber, prelude::{thiserror::Error, LinkResolver}, }; use anyhow::{anyhow, Context, Error}; @@ -196,7 +195,8 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static { unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error>; - fn chain_store(&self) -> Arc; + /// Return the pointer for the latest block that we are aware of + async fn chain_head_ptr(&self) -> Result, Error>; async fn block_pointer_from_number( &self, diff --git a/graph/src/components/network_provider/chain_identifier_store.rs b/graph/src/components/network_provider/chain_identifier_validator.rs similarity index 68% rename from graph/src/components/network_provider/chain_identifier_store.rs rename to graph/src/components/network_provider/chain_identifier_validator.rs index e6a4f916206..2b784b55a45 100644 --- a/graph/src/components/network_provider/chain_identifier_store.rs +++ b/graph/src/components/network_provider/chain_identifier_validator.rs @@ -1,14 +1,14 @@ -use anyhow::anyhow; +use std::sync::Arc; + use thiserror::Error; use crate::blockchain::BlockHash; use crate::blockchain::ChainIdentifier; use crate::components::network_provider::ChainName; -use crate::components::store::BlockStore; -use crate::components::store::ChainStore; +use crate::components::store::ChainIdStore; /// Additional requirements for stores that are necessary for provider checks. -pub trait ChainIdentifierStore: Send + Sync + 'static { +pub trait ChainIdentifierValidator: Send + Sync + 'static { /// Verifies that the chain identifier returned by the network provider /// matches the previously stored value. /// @@ -17,7 +17,7 @@ pub trait ChainIdentifierStore: Send + Sync + 'static { &self, chain_name: &ChainName, chain_identifier: &ChainIdentifier, - ) -> Result<(), ChainIdentifierStoreError>; + ) -> Result<(), ChainIdentifierValidationError>; /// Saves the provided identifier that will be used as the source of truth /// for future validations. @@ -25,11 +25,11 @@ pub trait ChainIdentifierStore: Send + Sync + 'static { &self, chain_name: &ChainName, chain_identifier: &ChainIdentifier, - ) -> Result<(), ChainIdentifierStoreError>; + ) -> Result<(), ChainIdentifierValidationError>; } #[derive(Debug, Error)] -pub enum ChainIdentifierStoreError { +pub enum ChainIdentifierValidationError { #[error("identifier not set for chain '{0}'")] IdentifierNotSet(ChainName), @@ -51,28 +51,33 @@ pub enum ChainIdentifierStoreError { Store(#[source] anyhow::Error), } -impl ChainIdentifierStore for B -where - C: ChainStore, - B: BlockStore, -{ +pub fn chain_id_validator(store: Arc) -> Arc { + Arc::new(ChainIdentifierStore::new(store)) +} + +pub(crate) struct ChainIdentifierStore { + store: Arc, +} + +impl ChainIdentifierStore { + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +impl ChainIdentifierValidator for ChainIdentifierStore { fn validate_identifier( &self, chain_name: &ChainName, chain_identifier: &ChainIdentifier, - ) -> Result<(), ChainIdentifierStoreError> { - let chain_store = self.chain_store(&chain_name).ok_or_else(|| { - ChainIdentifierStoreError::Store(anyhow!( - "unable to get store for chain '{chain_name}'" - )) - })?; - - let store_identifier = chain_store - .chain_identifier() - .map_err(|err| ChainIdentifierStoreError::Store(err))?; + ) -> Result<(), ChainIdentifierValidationError> { + let store_identifier = self + .store + .chain_identifier(chain_name) + .map_err(|err| ChainIdentifierValidationError::Store(err))?; if store_identifier.is_default() { - return Err(ChainIdentifierStoreError::IdentifierNotSet( + return Err(ChainIdentifierValidationError::IdentifierNotSet( chain_name.clone(), )); } @@ -84,7 +89,7 @@ where // but it's possible that it will be created by Firehose. Firehose always returns "0" // for `net_version`, so we need to allow switching between the two. if store_identifier.net_version != "0" && chain_identifier.net_version != "0" { - return Err(ChainIdentifierStoreError::NetVersionMismatch { + return Err(ChainIdentifierValidationError::NetVersionMismatch { chain_name: chain_name.clone(), store_net_version: store_identifier.net_version, chain_net_version: chain_identifier.net_version.clone(), @@ -93,7 +98,7 @@ where } if store_identifier.genesis_block_hash != chain_identifier.genesis_block_hash { - return Err(ChainIdentifierStoreError::GenesisBlockHashMismatch { + return Err(ChainIdentifierValidationError::GenesisBlockHashMismatch { chain_name: chain_name.clone(), store_genesis_block_hash: store_identifier.genesis_block_hash, chain_genesis_block_hash: chain_identifier.genesis_block_hash.clone(), @@ -107,15 +112,9 @@ where &self, chain_name: &ChainName, chain_identifier: &ChainIdentifier, - ) -> Result<(), ChainIdentifierStoreError> { - let chain_store = self.chain_store(&chain_name).ok_or_else(|| { - ChainIdentifierStoreError::Store(anyhow!( - "unable to get store for chain '{chain_name}'" - )) - })?; - - chain_store - .set_chain_identifier(chain_identifier) - .map_err(|err| ChainIdentifierStoreError::Store(err)) + ) -> Result<(), ChainIdentifierValidationError> { + self.store + .set_chain_identifier(chain_name, chain_identifier) + .map_err(|err| ChainIdentifierValidationError::Store(err)) } } diff --git a/graph/src/components/network_provider/genesis_hash_check.rs b/graph/src/components/network_provider/genesis_hash_check.rs index a8d547e79c0..0cfd8c6d1b0 100644 --- a/graph/src/components/network_provider/genesis_hash_check.rs +++ b/graph/src/components/network_provider/genesis_hash_check.rs @@ -6,26 +6,34 @@ use slog::error; use slog::warn; use slog::Logger; -use crate::components::network_provider::ChainIdentifierStore; -use crate::components::network_provider::ChainIdentifierStoreError; +use crate::components::network_provider::chain_id_validator; +use crate::components::network_provider::ChainIdentifierValidationError; +use crate::components::network_provider::ChainIdentifierValidator; use crate::components::network_provider::ChainName; use crate::components::network_provider::NetworkDetails; use crate::components::network_provider::ProviderCheck; use crate::components::network_provider::ProviderCheckStatus; use crate::components::network_provider::ProviderName; +use crate::components::store::ChainIdStore; /// Requires providers to have the same network version and genesis hash as one /// previously stored in the database. pub struct GenesisHashCheck { - chain_identifier_store: Arc, + chain_identifier_store: Arc, } impl GenesisHashCheck { - pub fn new(chain_identifier_store: Arc) -> Self { + pub fn new(chain_identifier_store: Arc) -> Self { Self { chain_identifier_store, } } + + pub fn from_id_store(id_store: Arc) -> Self { + Self { + chain_identifier_store: chain_id_validator(id_store), + } + } } #[async_trait] @@ -62,7 +70,7 @@ impl ProviderCheck for GenesisHashCheck { .chain_identifier_store .validate_identifier(chain_name, &chain_identifier); - use ChainIdentifierStoreError::*; + use ChainIdentifierValidationError::*; match check_result { Ok(()) => ProviderCheckStatus::Valid, @@ -154,16 +162,16 @@ mod tests { #[derive(Default)] struct TestChainIdentifierStore { - validate_identifier_calls: Mutex>>, - update_identifier_calls: Mutex>>, + validate_identifier_calls: Mutex>>, + update_identifier_calls: Mutex>>, } impl TestChainIdentifierStore { - fn validate_identifier_call(&self, x: Result<(), ChainIdentifierStoreError>) { + fn validate_identifier_call(&self, x: Result<(), ChainIdentifierValidationError>) { self.validate_identifier_calls.lock().unwrap().push(x) } - fn update_identifier_call(&self, x: Result<(), ChainIdentifierStoreError>) { + fn update_identifier_call(&self, x: Result<(), ChainIdentifierValidationError>) { self.update_identifier_calls.lock().unwrap().push(x) } } @@ -181,12 +189,12 @@ mod tests { } #[async_trait] - impl ChainIdentifierStore for TestChainIdentifierStore { + impl ChainIdentifierValidator for TestChainIdentifierStore { fn validate_identifier( &self, _chain_name: &ChainName, _chain_identifier: &ChainIdentifier, - ) -> Result<(), ChainIdentifierStoreError> { + ) -> Result<(), ChainIdentifierValidationError> { self.validate_identifier_calls.lock().unwrap().remove(0) } @@ -194,7 +202,7 @@ mod tests { &self, _chain_name: &ChainName, _chain_identifier: &ChainIdentifier, - ) -> Result<(), ChainIdentifierStoreError> { + ) -> Result<(), ChainIdentifierValidationError> { self.update_identifier_calls.lock().unwrap().remove(0) } } @@ -288,10 +296,10 @@ mod tests { #[tokio::test] async fn check_temporary_failure_on_initial_chain_identifier_update_error() { let store = Arc::new(TestChainIdentifierStore::default()); - store.validate_identifier_call(Err(ChainIdentifierStoreError::IdentifierNotSet( + store.validate_identifier_call(Err(ChainIdentifierValidationError::IdentifierNotSet( "chain-1".into(), ))); - store.update_identifier_call(Err(ChainIdentifierStoreError::Store(anyhow!("error")))); + store.update_identifier_call(Err(ChainIdentifierValidationError::Store(anyhow!("error")))); let check = GenesisHashCheck::new(store); @@ -321,7 +329,7 @@ mod tests { #[tokio::test] async fn check_valid_on_initial_chain_identifier_update() { let store = Arc::new(TestChainIdentifierStore::default()); - store.validate_identifier_call(Err(ChainIdentifierStoreError::IdentifierNotSet( + store.validate_identifier_call(Err(ChainIdentifierValidationError::IdentifierNotSet( "chain-1".into(), ))); store.update_identifier_call(Ok(())); @@ -351,7 +359,7 @@ mod tests { #[tokio::test] async fn check_valid_when_stored_identifier_network_version_is_zero() { let store = Arc::new(TestChainIdentifierStore::default()); - store.validate_identifier_call(Err(ChainIdentifierStoreError::NetVersionMismatch { + store.validate_identifier_call(Err(ChainIdentifierValidationError::NetVersionMismatch { chain_name: "chain-1".into(), store_net_version: "0".to_owned(), chain_net_version: "1".to_owned(), @@ -382,7 +390,7 @@ mod tests { #[tokio::test] async fn check_fails_on_identifier_network_version_mismatch() { let store = Arc::new(TestChainIdentifierStore::default()); - store.validate_identifier_call(Err(ChainIdentifierStoreError::NetVersionMismatch { + store.validate_identifier_call(Err(ChainIdentifierValidationError::NetVersionMismatch { chain_name: "chain-1".into(), store_net_version: "2".to_owned(), chain_net_version: "1".to_owned(), @@ -413,11 +421,13 @@ mod tests { #[tokio::test] async fn check_fails_on_identifier_genesis_hash_mismatch() { let store = Arc::new(TestChainIdentifierStore::default()); - store.validate_identifier_call(Err(ChainIdentifierStoreError::GenesisBlockHashMismatch { - chain_name: "chain-1".into(), - store_genesis_block_hash: vec![2].into(), - chain_genesis_block_hash: vec![1].into(), - })); + store.validate_identifier_call(Err( + ChainIdentifierValidationError::GenesisBlockHashMismatch { + chain_name: "chain-1".into(), + store_genesis_block_hash: vec![2].into(), + chain_genesis_block_hash: vec![1].into(), + }, + )); let check = GenesisHashCheck::new(store); @@ -444,7 +454,8 @@ mod tests { #[tokio::test] async fn check_temporary_failure_on_store_errors() { let store = Arc::new(TestChainIdentifierStore::default()); - store.validate_identifier_call(Err(ChainIdentifierStoreError::Store(anyhow!("error")))); + store + .validate_identifier_call(Err(ChainIdentifierValidationError::Store(anyhow!("error")))); let check = GenesisHashCheck::new(store); diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index 6ca27bc86d3..d4023e4237d 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -1,12 +1,13 @@ -mod chain_identifier_store; +mod chain_identifier_validator; mod extended_blocks_check; mod genesis_hash_check; mod network_details; mod provider_check; mod provider_manager; -pub use self::chain_identifier_store::ChainIdentifierStore; -pub use self::chain_identifier_store::ChainIdentifierStoreError; +pub use self::chain_identifier_validator::chain_id_validator; +pub use self::chain_identifier_validator::ChainIdentifierValidationError; +pub use self::chain_identifier_validator::ChainIdentifierValidator; pub use self::extended_blocks_check::ExtendedBlocksCheck; pub use self::genesis_hash_check::GenesisHashCheck; pub use self::network_details::NetworkDetails; diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 7d6f2201879..5d98a2c249f 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -9,6 +9,7 @@ use super::*; use crate::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor}; use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr}; use crate::components::metrics::stopwatch::StopwatchMetrics; +use crate::components::network_provider::ChainName; use crate::components::subgraph::SubgraphVersionSwitchingMode; use crate::components::transaction_receipt; use crate::components::versions::ApiVersion; @@ -442,20 +443,55 @@ pub trait QueryStoreManager: Send + Sync + 'static { ) -> Result, QueryExecutionError>; } -pub trait BlockStore: Send + Sync + 'static { +pub trait BlockStore: ChainIdStore + Send + Sync + 'static { type ChainStore: ChainStore; - fn create_chain_store( - &self, - network: &str, - ident: ChainIdentifier, - ) -> anyhow::Result>; fn chain_store(&self, network: &str) -> Option>; } +/// An interface for tracking the chain head in the store used by most chain +/// implementations +#[async_trait] +pub trait ChainHeadStore: Send + Sync { + /// Get the current head block pointer for this chain. + /// Any changes to the head block pointer will be to a block with a larger block number, never + /// to a block with a smaller or equal block number. + /// + /// The head block pointer will be None on initial set up. + async fn chain_head_ptr(self: Arc) -> Result, Error>; + + /// Get the current head block cursor for this chain. + /// + /// The head block cursor will be None on initial set up. + fn chain_head_cursor(&self) -> Result, Error>; + + /// This method does actually three operations: + /// - Upserts received block into blocks table + /// - Update chain head block into networks table + /// - Update chain head cursor into networks table + async fn set_chain_head( + self: Arc, + block: Arc, + cursor: String, + ) -> Result<(), Error>; +} + +#[async_trait] +pub trait ChainIdStore: Send + Sync + 'static { + /// Return the chain identifier for this store. + fn chain_identifier(&self, chain_name: &ChainName) -> Result; + + /// Update the chain identifier for this store. + fn set_chain_identifier( + &self, + chain_name: &ChainName, + ident: &ChainIdentifier, + ) -> Result<(), Error>; +} + /// Common trait for blockchain store implementations. #[async_trait] -pub trait ChainStore: Send + Sync + 'static { +pub trait ChainStore: ChainHeadStore { /// Get a pointer to this blockchain's genesis block. fn genesis_block_ptr(&self) -> Result; @@ -485,28 +521,6 @@ pub trait ChainStore: Send + Sync + 'static { ancestor_count: BlockNumber, ) -> Result, Error>; - /// Get the current head block pointer for this chain. - /// Any changes to the head block pointer will be to a block with a larger block number, never - /// to a block with a smaller or equal block number. - /// - /// The head block pointer will be None on initial set up. - async fn chain_head_ptr(self: Arc) -> Result, Error>; - - /// Get the current head block cursor for this chain. - /// - /// The head block cursor will be None on initial set up. - fn chain_head_cursor(&self) -> Result, Error>; - - /// This method does actually three operations: - /// - Upserts received block into blocks table - /// - Update chain head block into networks table - /// - Update chain head cursor into networks table - async fn set_chain_head( - self: Arc, - block: Arc, - cursor: String, - ) -> Result<(), Error>; - /// Returns the blocks present in the store. async fn blocks( self: Arc, @@ -584,8 +598,9 @@ pub trait ChainStore: Send + Sync + 'static { /// Return the chain identifier for this store. fn chain_identifier(&self) -> Result; - /// Update the chain identifier for this store. - fn set_chain_identifier(&self, ident: &ChainIdentifier) -> Result<(), Error>; + /// Workaround for Rust issue #65991 that keeps us from using an + /// `Arc` as an `Arc` + fn as_head_store(self: Arc) -> Arc; } pub trait EthereumCallCache: Send + Sync + 'static { diff --git a/node/src/chain.rs b/node/src/chain.rs index e2325aa6c7a..624817717f0 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -16,8 +16,7 @@ use graph::blockchain::{ }; use graph::cheap_clone::CheapClone; use graph::components::network_provider::ChainName; -use graph::components::store::{BlockStore as _, ChainStore}; -use graph::data::store::NodeId; +use graph::components::store::{BlockStore as _, ChainHeadStore}; use graph::endpoint::EndpointMetrics; use graph::env::{EnvVars, ENV_VARS}; use graph::firehose::{FirehoseEndpoint, SubgraphLimit}; @@ -353,7 +352,6 @@ pub async fn create_ethereum_networks_for_chain( pub async fn networks_as_chains( config: &Arc, blockchain_map: &mut BlockchainMap, - node_id: &NodeId, logger: &Logger, networks: &Networks, store: Arc, @@ -407,7 +405,7 @@ pub async fn networks_as_chains( chain_id: ChainName, blockchain_map: &mut BlockchainMap, logger_factory: LoggerFactory, - chain_store: Arc, + chain_head_store: Arc, metrics_registry: Arc, ) { let substreams_endpoints = networks.substreams_endpoints(chain_id.clone()); @@ -421,7 +419,7 @@ pub async fn networks_as_chains( BasicBlockchainBuilder { logger_factory: logger_factory.clone(), name: chain_id.clone(), - chain_store, + chain_head_store, metrics_registry: metrics_registry.clone(), firehose_endpoints: substreams_endpoints, } @@ -441,7 +439,7 @@ pub async fn networks_as_chains( BasicBlockchainBuilder { logger_factory: logger_factory.clone(), name: chain_id.clone(), - chain_store: chain_store.cheap_clone(), + chain_head_store: chain_store.cheap_clone(), firehose_endpoints, metrics_registry: metrics_registry.clone(), } @@ -493,7 +491,6 @@ pub async fn networks_as_chains( let chain = ethereum::Chain::new( logger_factory.clone(), chain_id.clone(), - node_id.clone(), metrics_registry.clone(), chain_store.cheap_clone(), call_cache, @@ -531,7 +528,7 @@ pub async fn networks_as_chains( BasicBlockchainBuilder { logger_factory: logger_factory.clone(), name: chain_id.clone(), - chain_store: chain_store.cheap_clone(), + chain_head_store: chain_store.cheap_clone(), firehose_endpoints, metrics_registry: metrics_registry.clone(), } @@ -559,7 +556,7 @@ pub async fn networks_as_chains( BasicBlockchainBuilder { logger_factory: logger_factory.clone(), name: chain_id.clone(), - chain_store, + chain_head_store: chain_store, metrics_registry: metrics_registry.clone(), firehose_endpoints: substreams_endpoints, } diff --git a/node/src/main.rs b/node/src/main.rs index 6cd892079c1..0c5744513bb 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -290,7 +290,7 @@ async fn main_inner() { let mut provider_checks: Vec> = Vec::new(); if env_vars.genesis_validation_enabled { - provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new( + provider_checks.push(Arc::new(network_provider::GenesisHashCheck::from_id_store( block_store.clone(), ))); } @@ -315,7 +315,6 @@ async fn main_inner() { let blockchain_map = network_adapters .blockchain_map( &env_vars, - &node_id, &logger, block_store, &logger_factory, diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index e1f460a7581..905568a5637 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -7,15 +7,18 @@ use graph::blockchain::BlockHash; use graph::blockchain::BlockPtr; use graph::blockchain::ChainIdentifier; use graph::cheap_clone::CheapClone; -use graph::components::network_provider::ChainIdentifierStore; use graph::components::network_provider::ChainName; +use graph::components::store::ChainIdStore; use graph::components::store::StoreError; use graph::prelude::BlockNumber; use graph::prelude::ChainStore as _; use graph::prelude::LightEthereumBlockExt; use graph::prelude::{anyhow, anyhow::bail}; use graph::slog::Logger; -use graph::{components::store::BlockStore as _, prelude::anyhow::Error}; +use graph::{ + components::store::BlockStore as _, components::store::ChainHeadStore as _, + prelude::anyhow::Error, +}; use graph_chain_ethereum::chain::BlockFinality; use graph_chain_ethereum::EthereumAdapter; use graph_chain_ethereum::EthereumAdapterTrait as _; @@ -161,7 +164,7 @@ pub fn remove(primary: ConnectionPool, store: Arc, name: String) -> pub async fn update_chain_genesis( networks: &Networks, coord: Arc, - store: Arc, + store: Arc, logger: &Logger, chain_id: ChainName, genesis_hash: BlockHash, @@ -185,7 +188,7 @@ pub async fn update_chain_genesis( // Update the local shard's genesis, whether or not it is the primary. // The chains table is replicated from the primary and keeps another genesis hash. // To keep those in sync we need to update the primary and then refresh the shard tables. - store.update_identifier( + store.set_chain_identifier( &chain_id, &ChainIdentifier { net_version: ident.net_version.clone(), @@ -193,10 +196,6 @@ pub async fn update_chain_genesis( }, )?; - // Update the primary public.chains - println!("Updating primary public.chains"); - store.set_chain_identifier(chain_id, &ident)?; - // Refresh the new values println!("Refresh mappings"); crate::manager::commands::database::remap(&coord, None, None, false).await?; diff --git a/node/src/manager/commands/provider_checks.rs b/node/src/manager/commands/provider_checks.rs index 7f4feca928e..298e797e934 100644 --- a/node/src/manager/commands/provider_checks.rs +++ b/node/src/manager/commands/provider_checks.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use std::time::Duration; -use graph::components::network_provider::ChainIdentifierStore; +use graph::components::network_provider::chain_id_validator; +use graph::components::network_provider::ChainIdentifierValidator; use graph::components::network_provider::ChainName; use graph::components::network_provider::ExtendedBlocksCheck; use graph::components::network_provider::GenesisHashCheck; @@ -36,9 +37,10 @@ pub async fn execute( .providers_unchecked(chain_name) .unique_by(|x| x.provider_name()) { + let validator = chain_id_validator(store.clone()); match tokio::time::timeout( timeout, - run_checks(logger, chain_name, adapter, store.clone()), + run_checks(logger, chain_name, adapter, validator.clone()), ) .await { @@ -56,11 +58,9 @@ pub async fn execute( .providers_unchecked(chain_name) .unique_by(|x| x.provider_name()) { - match tokio::time::timeout( - timeout, - run_checks(logger, chain_name, adapter, store.clone()), - ) - .await + let validator = chain_id_validator(store.clone()); + match tokio::time::timeout(timeout, run_checks(logger, chain_name, adapter, validator)) + .await { Ok(result) => { errors.extend(result); @@ -76,9 +76,10 @@ pub async fn execute( .providers_unchecked(chain_name) .unique_by(|x| x.provider_name()) { + let validator = chain_id_validator(store.clone()); match tokio::time::timeout( timeout, - run_checks(logger, chain_name, adapter, store.clone()), + run_checks(logger, chain_name, adapter, validator.clone()), ) .await { @@ -107,7 +108,7 @@ async fn run_checks( logger: &Logger, chain_name: &ChainName, adapter: &dyn NetworkDetails, - store: Arc, + store: Arc, ) -> Vec { let provider_name = adapter.provider_name(); diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 2c6bfdcb148..f79a0497477 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -10,7 +10,7 @@ use crate::MetricsContext; use graph::anyhow::bail; use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; -use graph::components::network_provider::ChainIdentifierStore; +use graph::components::network_provider::chain_id_validator; use graph::components::store::DeploymentLocator; use graph::components::subgraph::Settings; use graph::endpoint::EndpointMetrics; @@ -98,8 +98,7 @@ pub async fn run( Vec::new(); if env_vars.genesis_validation_enabled { - let store: Arc = network_store.block_store(); - + let store = chain_id_validator(network_store.block_store()); provider_checks.push(Arc::new( graph::components::network_provider::GenesisHashCheck::new(store), )); @@ -130,7 +129,6 @@ pub async fn run( networks .blockchain_map( &env_vars, - &node_id, &logger, block_store, &logger_factory, diff --git a/node/src/network_setup.rs b/node/src/network_setup.rs index 55a4995eb6b..1a491a7bad1 100644 --- a/node/src/network_setup.rs +++ b/node/src/network_setup.rs @@ -20,7 +20,7 @@ use graph::{ log::factory::LoggerFactory, prelude::{ anyhow::{anyhow, Result}, - info, Logger, NodeId, + info, Logger, }, slog::{o, warn, Discard}, }; @@ -404,7 +404,6 @@ impl Networks { pub async fn blockchain_map( &self, config: &Arc, - node_id: &NodeId, logger: &Logger, store: Arc, logger_factory: &LoggerFactory, @@ -416,7 +415,6 @@ impl Networks { networks_as_chains( config, &mut bm, - node_id, logger, self, store, diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 7974afe41db..2a2f19ebb2e 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -279,10 +279,10 @@ impl IndexNodeResolver { ); return Ok(r::Value::Null); }; - let chain_store = chain.chain_store(); + // let chain_store = chain.chain_store(); let call_cache = chain.call_cache(); - let (block_number, timestamp) = match chain_store.block_number(&block_hash).await { + let (block_number, timestamp) = match chain.block_number(&block_hash).await { Ok(Some((_, n, timestamp, _))) => (n, timestamp), Ok(None) => { error!( diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index d34915248b3..c3754c399af 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -10,13 +10,16 @@ use diesel::{ r2d2::{ConnectionManager, PooledConnection}, sql_query, ExpressionMethods as _, PgConnection, RunQueryDsl, }; -use graph::components::network_provider::ChainName; use graph::{ blockchain::ChainIdentifier, components::store::{BlockStore as BlockStoreTrait, QueryPermit}, prelude::{error, info, BlockNumber, BlockPtr, Logger, ENV_VARS}, slog::o, }; +use graph::{ + components::{network_provider::ChainName, store::ChainIdStore}, + prelude::ChainStore as _, +}; use graph::{internal_error, prelude::CheapClone}; use graph::{prelude::StoreError, util::timed_cache::TimedCache}; @@ -567,20 +570,11 @@ impl BlockStore { Ok(()) } -} - -impl BlockStoreTrait for BlockStore { - type ChainStore = ChainStore; - - fn chain_store(&self, network: &str) -> Option> { - self.store(network) - } - - fn create_chain_store( + pub fn create_chain_store( &self, network: &str, ident: ChainIdentifier, - ) -> anyhow::Result> { + ) -> anyhow::Result> { match self.store(network) { Some(chain_store) => { return Ok(chain_store); @@ -605,3 +599,49 @@ impl BlockStoreTrait for BlockStore { .map_err(anyhow::Error::from) } } + +impl BlockStoreTrait for BlockStore { + type ChainStore = ChainStore; + + fn chain_store(&self, network: &str) -> Option> { + self.store(network) + } +} + +impl ChainIdStore for BlockStore { + fn chain_identifier(&self, chain_name: &ChainName) -> Result { + let chain_store = self + .chain_store(&chain_name) + .ok_or_else(|| anyhow!("unable to get store for chain '{chain_name}'"))?; + + chain_store.chain_identifier() + } + + fn set_chain_identifier( + &self, + chain_name: &ChainName, + ident: &ChainIdentifier, + ) -> Result<(), anyhow::Error> { + use primary::chains as c; + + // Update the block shard first since that contains a copy from the primary + let chain_store = self + .chain_store(&chain_name) + .ok_or_else(|| anyhow!("unable to get store for chain '{chain_name}'"))?; + + chain_store.set_chain_identifier(ident)?; + + // Update the master copy in the primary + let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap(); + let mut conn = primary_pool.get()?; + + diesel::update(c::table.filter(c::name.eq(chain_name.as_str()))) + .set(( + c::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), + c::net_version.eq(&ident.net_version), + )) + .execute(&mut conn)?; + + Ok(()) + } +} diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index a94c44a8870..18e694a37bd 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -4,6 +4,7 @@ use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::sql_types::Text; use diesel::{insert_into, update}; +use graph::components::store::ChainHeadStore; use graph::data::store::ethereum::call; use graph::derive::CheapClone; use graph::env::ENV_VARS; @@ -1849,6 +1850,26 @@ impl ChainStore { ) } + pub(crate) fn set_chain_identifier(&self, ident: &ChainIdentifier) -> Result<(), Error> { + use public::ethereum_networks as n; + + let mut conn = self.pool.get()?; + + diesel::update(n::table.filter(n::name.eq(&self.chain))) + .set(( + n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), + n::net_version.eq(&ident.net_version), + )) + .execute(&mut conn)?; + + Ok(()) + } + + #[cfg(debug_assertions)] + pub fn set_chain_identifier_for_tests(&self, ident: &ChainIdentifier) -> Result<(), Error> { + self.set_chain_identifier(ident) + } + /// Store the given chain as the blocks for the `network` set the /// network's genesis block to `genesis_hash`, and head block to /// `null` @@ -1972,6 +1993,100 @@ fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result) -> Result, Error> { + use public::ethereum_networks::dsl::*; + + Ok(self + .cheap_clone() + .pool + .with_conn(move |conn, _| { + ethereum_networks + .select((head_block_hash, head_block_number)) + .filter(name.eq(&self.chain)) + .load::<(Option, Option)>(conn) + .map(|rows| { + rows.first() + .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) { + (Some(hash), Some(number)) => Some( + ( + // FIXME: + // + // workaround for arweave + H256::from_slice(&hex::decode(hash).unwrap()[..32]), + *number, + ) + .into(), + ), + (None, None) => None, + _ => unreachable!(), + }) + .and_then(|opt: Option| opt) + }) + .map_err(|e| CancelableError::from(StoreError::from(e))) + }) + .await?) + } + + fn chain_head_cursor(&self) -> Result, Error> { + use public::ethereum_networks::dsl::*; + + ethereum_networks + .select(head_block_cursor) + .filter(name.eq(&self.chain)) + .load::>(&mut self.get_conn()?) + .map(|rows| { + rows.first() + .map(|cursor_opt| cursor_opt.as_ref().cloned()) + .and_then(|opt| opt) + }) + .map_err(Error::from) + } + + async fn set_chain_head( + self: Arc, + block: Arc, + cursor: String, + ) -> Result<(), Error> { + use public::ethereum_networks as n; + + let pool = self.pool.clone(); + let network = self.chain.clone(); + let storage = self.storage.clone(); + + let ptr = block.ptr(); + let hash = ptr.hash_hex(); + let number = ptr.number as i64; //block height + + //this will send an update via postgres, channel: chain_head_updates + self.chain_head_update_sender.send(&hash, number)?; + + pool.with_conn(move |conn, _| { + conn.transaction(|conn| -> Result<(), StoreError> { + storage + .upsert_block(conn, &network, block.as_ref(), true) + .map_err(CancelableError::from)?; + + update(n::table.filter(n::name.eq(&self.chain))) + .set(( + n::head_block_hash.eq(&hash), + n::head_block_number.eq(number), + n::head_block_cursor.eq(cursor), + )) + .execute(conn)?; + + Ok(()) + }) + .map_err(CancelableError::from) + }) + .await?; + + Ok(()) + } +} + #[async_trait] impl ChainStoreTrait for ChainStore { fn genesis_block_ptr(&self) -> Result { @@ -2075,96 +2190,6 @@ impl ChainStoreTrait for ChainStore { Ok(missing) } - async fn chain_head_ptr(self: Arc) -> Result, Error> { - use public::ethereum_networks::dsl::*; - - Ok(self - .cheap_clone() - .pool - .with_conn(move |conn, _| { - ethereum_networks - .select((head_block_hash, head_block_number)) - .filter(name.eq(&self.chain)) - .load::<(Option, Option)>(conn) - .map(|rows| { - rows.first() - .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) { - (Some(hash), Some(number)) => Some( - ( - // FIXME: - // - // workaround for arweave - H256::from_slice(&hex::decode(hash).unwrap()[..32]), - *number, - ) - .into(), - ), - (None, None) => None, - _ => unreachable!(), - }) - .and_then(|opt: Option| opt) - }) - .map_err(|e| CancelableError::from(StoreError::from(e))) - }) - .await?) - } - - fn chain_head_cursor(&self) -> Result, Error> { - use public::ethereum_networks::dsl::*; - - ethereum_networks - .select(head_block_cursor) - .filter(name.eq(&self.chain)) - .load::>(&mut self.get_conn()?) - .map(|rows| { - rows.first() - .map(|cursor_opt| cursor_opt.as_ref().cloned()) - .and_then(|opt| opt) - }) - .map_err(Error::from) - } - - async fn set_chain_head( - self: Arc, - block: Arc, - cursor: String, - ) -> Result<(), Error> { - use public::ethereum_networks as n; - - let pool = self.pool.clone(); - let network = self.chain.clone(); - let storage = self.storage.clone(); - - let ptr = block.ptr(); - let hash = ptr.hash_hex(); - let number = ptr.number as i64; //block height - - //this will send an update via postgres, channel: chain_head_updates - self.chain_head_update_sender.send(&hash, number)?; - - pool.with_conn(move |conn, _| { - conn.transaction(|conn| -> Result<(), StoreError> { - storage - .upsert_block(conn, &network, block.as_ref(), true) - .map_err(CancelableError::from)?; - - update(n::table.filter(n::name.eq(&self.chain))) - .set(( - n::head_block_hash.eq(&hash), - n::head_block_number.eq(number), - n::head_block_cursor.eq(cursor), - )) - .execute(conn)?; - - Ok(()) - }) - .map_err(CancelableError::from) - }) - .await?; - - Ok(()) - } - async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, @@ -2496,21 +2521,6 @@ impl ChainStoreTrait for ChainStore { .await } - fn set_chain_identifier(&self, ident: &ChainIdentifier) -> Result<(), Error> { - use public::ethereum_networks as n; - - let mut conn = self.pool.get()?; - - diesel::update(n::table.filter(n::name.eq(&self.chain))) - .set(( - n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), - n::net_version.eq(&ident.net_version), - )) - .execute(&mut conn)?; - - Ok(()) - } - fn chain_identifier(&self) -> Result { let mut conn = self.pool.get()?; use public::ethereum_networks as n; @@ -2524,6 +2534,10 @@ impl ChainStoreTrait for ChainStore { genesis_block_hash, }) } + + fn as_head_store(self: Arc) -> Arc { + self.clone() + } } mod recent_blocks_cache { diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index b191916a9b6..96da86a7b64 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -636,7 +636,7 @@ fn build_store() -> (Arc, ConnectionPool, Config, Arc { - cs.set_chain_identifier(&ChainIdentifier { + cs.set_chain_identifier_for_tests(&ChainIdentifier { net_version: NETWORK_VERSION.to_string(), genesis_block_hash: GENESIS_PTR.hash.clone(), }) diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index c93eb624e59..cf501f1438f 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -15,7 +15,10 @@ use graph::prelude::{serde_json as json, EthereumBlock}; use graph::prelude::{BlockNumber, QueryStoreManager, QueryTarget}; use graph::{cheap_clone::CheapClone, prelude::web3::types::H160}; use graph::{components::store::BlockStore as _, prelude::DeploymentHash}; -use graph::{components::store::ChainStore as _, prelude::EthereumCallCache as _}; +use graph::{ + components::store::ChainHeadStore as _, components::store::ChainStore as _, + prelude::EthereumCallCache as _, +}; use graph_store_postgres::Store as DieselStore; use graph_store_postgres::{layout_for_tests::FAKE_NETWORK_SHARED, ChainStore as DieselChainStore}; diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index d93ac25c235..ddf950bd273 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -40,7 +40,6 @@ pub async fn chain( mock_registry, chain_store, firehose_endpoints, - node_id, } = CommonChainConfig::new(test_name, stores).await; let client = Arc::new(ChainClient::::new_firehose(firehose_endpoints)); @@ -53,7 +52,6 @@ pub async fn chain( let chain = Chain::new( logger_factory, stores.network_name.clone(), - node_id, mock_registry, chain_store.cheap_clone(), chain_store, diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 217c7f705b6..cc99e406c1c 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -20,7 +20,7 @@ use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit}; use graph::components::metrics::MetricsRegistry; use graph::components::network_provider::ChainName; -use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache, SourceableStore}; +use graph::components::store::{DeploymentLocator, EthereumCallCache, SourceableStore}; use graph::components::subgraph::Settings; use graph::data::graphql::load_manager::LoadManager; use graph::data::query::{Query, QueryTarget}; @@ -87,7 +87,6 @@ struct CommonChainConfig { mock_registry: Arc, chain_store: Arc, firehose_endpoints: FirehoseEndpoints, - node_id: NodeId, } impl CommonChainConfig { @@ -96,7 +95,6 @@ impl CommonChainConfig { let mock_registry = Arc::new(MetricsRegistry::mock()); let logger_factory = LoggerFactory::new(logger.cheap_clone(), None, mock_registry.clone()); let chain_store = stores.chain_store.cheap_clone(); - let node_id = NodeId::new(NODE_ID).unwrap(); let firehose_endpoints = FirehoseEndpoints::for_testing(vec![Arc::new(FirehoseEndpoint::new( @@ -116,7 +114,6 @@ impl CommonChainConfig { mock_registry, chain_store, firehose_endpoints, - node_id, } } }