Various simplifications around chain handling #6008

Open · wants to merge 13 commits into base: master
14 changes: 7 additions & 7 deletions chain/arweave/src/chain.rs
@@ -7,7 +7,7 @@ use graph::blockchain::{
};
use graph::cheap_clone::CheapClone;
use graph::components::network_provider::ChainName;
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::components::store::{ChainHeadStore, DeploymentCursorTracker, SourceableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoint;
@@ -24,7 +24,7 @@ use graph::{
},
components::store::DeploymentLocator,
firehose::{self as firehose, ForkStep},
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
prelude::{async_trait, o, BlockNumber, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::BTreeSet;
@@ -46,7 +46,7 @@ pub struct Chain {
logger_factory: LoggerFactory,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
chain_head_store: Arc<dyn ChainHeadStore>,
metrics_registry: Arc<MetricsRegistry>,
}

@@ -63,7 +63,7 @@ impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
logger_factory: self.logger_factory,
name: self.name,
client: Arc::new(ChainClient::<Chain>::new_firehose(self.firehose_endpoints)),
chain_store: self.chain_store,
chain_head_store: self.chain_head_store,
metrics_registry: self.metrics_registry,
}
}
@@ -155,8 +155,8 @@ impl Blockchain for Chain {
)))
}

fn chain_store(&self) -> Arc<dyn ChainStore> {
self.chain_store.clone()
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
self.chain_head_store.cheap_clone().chain_head_ptr().await
}

async fn block_pointer_from_number(
Expand All @@ -182,7 +182,7 @@ impl Blockchain for Chain {

async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_head_store.cheap_clone(),
self.chain_client(),
self.logger_factory
.component_logger("ArweaveFirehoseBlockIngestor", None),
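Note on the Arweave change above: the chain now holds only an `Arc<dyn ChainHeadStore>` and answers `chain_head_ptr()` itself, instead of handing out the whole `ChainStore`. Below is a minimal, self-contained sketch of that shape. The types are stand-ins, not the real graph-node traits, and it assumes `anyhow`, `async-trait`, and `tokio` as dependencies.

```rust
use std::sync::Arc;

use anyhow::Error;
use async_trait::async_trait;

#[derive(Clone, Debug)]
struct BlockPtr {
    number: i32,
}

/// Assumed shape of the narrower store trait this PR threads through the chain:
/// just enough to read the chain head, nothing else.
#[async_trait]
trait ChainHeadStore: Send + Sync {
    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
}

/// Stand-in for chain/arweave's `Chain`.
struct Chain {
    chain_head_store: Arc<dyn ChainHeadStore>,
}

impl Chain {
    /// Replaces the old `fn chain_store(&self) -> Arc<dyn ChainStore>` accessor:
    /// callers ask the chain for the head pointer instead of borrowing the store.
    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
        self.chain_head_store.chain_head_ptr().await
    }
}

/// Trivial in-memory implementation so the sketch runs end to end.
struct InMemoryHead(Option<BlockPtr>);

#[async_trait]
impl ChainHeadStore for InMemoryHead {
    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
        Ok(self.0.clone())
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let chain = Chain {
        chain_head_store: Arc::new(InMemoryHead(Some(BlockPtr { number: 42 }))),
    };
    println!("chain head: {:?}", chain.chain_head_ptr().await?);
    Ok(())
}
```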
27 changes: 14 additions & 13 deletions chain/ethereum/src/chain.rs
@@ -13,7 +13,7 @@ use graph::firehose::{FirehoseEndpoint, ForkStep};
use graph::futures03::TryStreamExt;
use graph::prelude::{
retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, StoreError,
};
use graph::schema::InputSchema;
use graph::slog::{debug, error, trace, warn};
@@ -25,7 +25,6 @@ use graph::{
FirehoseMapper as FirehoseMapperTrait, TriggersAdapter as TriggersAdapterTrait,
},
firehose_block_stream::FirehoseBlockStream,
polling_block_stream::PollingBlockStream,
Block, BlockPtr, Blockchain, ChainHeadUpdateListener, IngestorError,
RuntimeAdapter as RuntimeAdapterTrait, TriggerFilter as _,
},
@@ -34,7 +33,7 @@ use graph::{
firehose,
prelude::{
async_trait, o, serde_json as json, BlockNumber, ChainStore, EthereumBlockWithCalls,
Logger, LoggerFactory, NodeId,
Logger, LoggerFactory,
},
};
use prost::Message;
@@ -49,6 +48,7 @@ use crate::data_source::DataSourceTemplate;
use crate::data_source::UnresolvedDataSourceTemplate;
use crate::ingestor::PollingBlockIngestor;
use crate::network::EthereumNetworkAdapters;
use crate::polling_block_stream::PollingBlockStream;
use crate::runtime::runtime_adapter::eth_call_gas;
use crate::{
adapter::EthereumAdapter as _,
@@ -175,7 +175,6 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "BlockStream"));
let chain_store = chain.chain_store();
let chain_head_update_stream = chain
.chain_head_update_listener
.subscribe(chain.name.to_string(), logger.clone());
@@ -213,10 +212,8 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
};

Ok(Box::new(PollingBlockStream::new(
chain_store,
chain_head_update_stream,
Arc::new(adapter),
chain.node_id.clone(),
deployment.hash,
filter,
start_blocks,
@@ -336,7 +333,6 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder {
pub struct Chain {
logger_factory: LoggerFactory,
pub name: ChainName,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
@@ -363,7 +359,6 @@ impl Chain {
pub fn new(
logger_factory: LoggerFactory,
name: ChainName,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
call_cache: Arc<dyn EthereumCallCache>,
@@ -381,7 +376,6 @@
Chain {
logger_factory,
name,
node_id,
registry,
client,
chain_store,
@@ -403,6 +397,13 @@ impl Chain {
self.call_cache.clone()
}

pub async fn block_number(
&self,
hash: &BlockHash,
) -> Result<Option<(String, BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError> {
self.chain_store.block_number(hash).await
}

// TODO: This is only used to build the block stream which could prolly
// be moved to the chain itself and return a block stream future that the
// caller can spawn.
@@ -507,8 +508,8 @@ impl Blockchain for Chain {
}
}

fn chain_store(&self) -> Arc<dyn ChainStore> {
self.chain_store.clone()
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
self.chain_store.cheap_clone().chain_head_ptr().await
}

async fn block_pointer_from_number(
@@ -578,7 +579,7 @@ impl Blockchain for Chain {
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
ChainClient::Firehose(_) => {
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
self.chain_store.cheap_clone(),
self.chain_store.cheap_clone().as_head_store(),
self.chain_client(),
self.logger_factory
.component_logger("EthereumFirehoseBlockIngestor", None),
@@ -615,7 +616,7 @@ impl Blockchain for Chain {
logger,
graph::env::ENV_VARS.reorg_threshold(),
self.chain_client(),
self.chain_store().cheap_clone(),
self.chain_store.cheap_clone(),
self.polling_ingestor_interval,
self.name.clone(),
)?)
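For Ethereum, the chain keeps its full `Arc<dyn ChainStore>` (it still needs block lookups and the polling ingestor), but the Firehose ingestor now receives only the head-store view via `chain_store.as_head_store()`. The definition of `as_head_store` is not part of this diff; the sketch below is one plausible way a wide store can hand out the narrower capability, with stand-in types and simplified signatures rather than the actual graph-node API.

```rust
use std::sync::Arc;

use anyhow::Error;
use async_trait::async_trait;

#[derive(Clone, Debug)]
pub struct BlockPtr {
    pub number: i32,
}

/// Narrow capability: read (and, for ingestors, advance) the chain head.
#[async_trait]
pub trait ChainHeadStore: Send + Sync {
    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
}

/// Wide store. Everything the ingestor never needed stays behind this trait.
#[async_trait]
pub trait ChainStore: ChainHeadStore {
    /// Hand the store out under its narrower capability. An `Arc<Self>` receiver
    /// keeps this callable on an `Arc<dyn ChainStore>`, matching the call site
    /// `self.chain_store.cheap_clone().as_head_store()` in the diff.
    fn as_head_store(self: Arc<Self>) -> Arc<dyn ChainHeadStore>;

    /// Example of a wide-only method, mirroring the new `Chain::block_number`
    /// pass-through above (return type simplified here).
    async fn block_number(&self, hash: &str) -> Result<Option<i32>, Error>;
}

/// Stand-in for the Firehose ingestor: it only ever asks for the narrow view,
/// so Arweave can pass its `ChainHeadStore` directly while Ethereum converts
/// its `ChainStore` on the way in.
pub struct HeadOnlyIngestor {
    pub chain_head_store: Arc<dyn ChainHeadStore>,
}
```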
1 change: 1 addition & 0 deletions chain/ethereum/src/lib.rs
@@ -6,6 +6,7 @@ mod data_source;
mod env;
mod ethereum_adapter;
mod ingestor;
mod polling_block_stream;
pub mod runtime;
mod transport;

chain/ethereum/src/polling_block_stream.rs (moved from graph/src/blockchain/polling_block_stream.rs)
@@ -1,29 +1,30 @@
use anyhow::Error;
use futures03::{stream::Stream, Future, FutureExt};
use anyhow::{anyhow, Error};
use graph::tokio;
use std::cmp;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use super::block_stream::{
use graph::blockchain::block_stream::{
BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream,
FirehoseCursor, TriggersAdapterWrapper, BUFFERED_BLOCK_STREAM_SIZE,
};
use super::{Block, BlockPtr, Blockchain, TriggerFilterWrapper};
use graph::blockchain::{Block, BlockPtr, TriggerFilterWrapper};
use graph::futures03::{stream::Stream, Future, FutureExt};
use graph::prelude::{DeploymentHash, BLOCK_NUMBER_MAX};
use graph::slog::{debug, info, trace, warn, Logger};

use crate::components::store::BlockNumber;
use crate::data::subgraph::UnifiedMappingApiVersion;
use crate::prelude::*;
use graph::components::store::BlockNumber;
use graph::data::subgraph::UnifiedMappingApiVersion;

use crate::Chain;

// A high number here forces a slow start.
const STARTING_PREVIOUS_TRIGGERS_PER_BLOCK: f64 = 1_000_000.0;

enum BlockStreamState<C>
where
C: Blockchain,
{
enum BlockStreamState {
/// Starting or restarting reconciliation.
///
/// Valid next states: Reconciliation
@@ -32,13 +33,13 @@
/// The BlockStream is reconciling the subgraph store state with the chain store state.
///
/// Valid next states: YieldingBlocks, Idle, BeginReconciliation (in case of revert)
Reconciliation(Pin<Box<dyn Future<Output = Result<NextBlocks<C>, Error>> + Send>>),
Reconciliation(Pin<Box<dyn Future<Output = Result<NextBlocks, Error>> + Send>>),

/// The BlockStream is emitting blocks that must be processed in order to bring the subgraph
/// store up to date with the chain store.
///
/// Valid next states: BeginReconciliation
YieldingBlocks(Box<VecDeque<BlockWithTriggers<C>>>),
YieldingBlocks(Box<VecDeque<BlockWithTriggers<Chain>>>),

/// The BlockStream experienced an error and is pausing before attempting to produce
/// blocks again.
@@ -55,16 +56,13 @@

/// A single next step to take in reconciling the state of the subgraph store with the state of the
/// chain store.
enum ReconciliationStep<C>
where
C: Blockchain,
{
enum ReconciliationStep {
/// Revert(to) the block the subgraph should be reverted to, so it becomes the new subgraph
/// head.
Revert(BlockPtr),

/// Move forwards, processing one or more blocks. Second element is the block range size.
ProcessDescendantBlocks(Vec<BlockWithTriggers<C>>, BlockNumber),
ProcessDescendantBlocks(Vec<BlockWithTriggers<Chain>>, BlockNumber),

/// This step is a no-op, but we need to check again for a next step.
Retry,
@@ -74,18 +72,13 @@
Done,
}

struct PollingBlockStreamContext<C>
where
C: Blockchain,
{
chain_store: Arc<dyn ChainStore>,
adapter: Arc<TriggersAdapterWrapper<C>>,
node_id: NodeId,
struct PollingBlockStreamContext {
adapter: Arc<TriggersAdapterWrapper<Chain>>,
subgraph_id: DeploymentHash,
// This is not really a block number, but the (unsigned) difference
// between two block numbers
reorg_threshold: BlockNumber,
filter: Arc<TriggerFilterWrapper<C>>,
filter: Arc<TriggerFilterWrapper<Chain>>,
start_blocks: Vec<BlockNumber>,
logger: Logger,
previous_triggers_per_block: f64,
@@ -98,12 +91,10 @@
current_block: Option<BlockPtr>,
}

impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
impl Clone for PollingBlockStreamContext {
fn clone(&self) -> Self {
Self {
chain_store: self.chain_store.cheap_clone(),
adapter: self.adapter.clone(),
node_id: self.node_id.clone(),
subgraph_id: self.subgraph_id.clone(),
reorg_threshold: self.reorg_threshold,
filter: self.filter.clone(),
@@ -119,37 +110,29 @@ impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
}
}

pub struct PollingBlockStream<C: Blockchain> {
state: BlockStreamState<C>,
pub struct PollingBlockStream {
state: BlockStreamState,
consecutive_err_count: u32,
chain_head_update_stream: ChainHeadUpdateStream,
ctx: PollingBlockStreamContext<C>,
ctx: PollingBlockStreamContext,
}

// This is the same as `ReconciliationStep` but without retries.
enum NextBlocks<C>
where
C: Blockchain,
{
enum NextBlocks {
/// Blocks and range size
Blocks(VecDeque<BlockWithTriggers<C>>, BlockNumber),
Blocks(VecDeque<BlockWithTriggers<Chain>>, BlockNumber),

// The payload is block the subgraph should be reverted to, so it becomes the new subgraph head.
Revert(BlockPtr),
Done,
}

impl<C> PollingBlockStream<C>
where
C: Blockchain,
{
impl PollingBlockStream {
pub fn new(
chain_store: Arc<dyn ChainStore>,
chain_head_update_stream: ChainHeadUpdateStream,
adapter: Arc<TriggersAdapterWrapper<C>>,
node_id: NodeId,
adapter: Arc<TriggersAdapterWrapper<Chain>>,
subgraph_id: DeploymentHash,
filter: Arc<TriggerFilterWrapper<C>>,
filter: Arc<TriggerFilterWrapper<Chain>>,
start_blocks: Vec<BlockNumber>,
reorg_threshold: BlockNumber,
logger: Logger,
@@ -164,9 +147,7 @@
chain_head_update_stream,
ctx: PollingBlockStreamContext {
current_block: start_block,
chain_store,
adapter,
node_id,
subgraph_id,
reorg_threshold,
logger,
@@ -182,12 +163,9 @@
}
}

impl<C> PollingBlockStreamContext<C>
where
C: Blockchain,
{
impl PollingBlockStreamContext {
/// Perform reconciliation steps until there are blocks to yield or we are up-to-date.
async fn next_blocks(&self) -> Result<NextBlocks<C>, Error> {
async fn next_blocks(&self) -> Result<NextBlocks, Error> {
let ctx = self.clone();

loop {
@@ -212,7 +190,7 @@
}

/// Determine the next reconciliation step. Does not modify Store or ChainStore.
async fn get_next_step(&self) -> Result<ReconciliationStep<C>, Error> {
async fn get_next_step(&self) -> Result<ReconciliationStep, Error> {
let ctx = self.clone();
let start_blocks = self.start_blocks.clone();
let max_block_range_size = self.max_block_range_size;
@@ -498,14 +476,14 @@
}
}

impl<C: Blockchain> BlockStream<C> for PollingBlockStream<C> {
impl BlockStream<Chain> for PollingBlockStream {
fn buffer_size_hint(&self) -> usize {
BUFFERED_BLOCK_STREAM_SIZE
}
}

impl<C: Blockchain> Stream for PollingBlockStream<C> {
type Item = Result<BlockStreamEvent<C>, BlockStreamError>;
impl Stream for PollingBlockStream {
type Item = Result<BlockStreamEvent<Chain>, BlockStreamError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let result = loop {
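The moved `PollingBlockStream` also drops its `C: Blockchain` type parameter and is hard-wired to the Ethereum `Chain`, with the unused `chain_store` and `node_id` plumbing removed. The `poll_next` skeleton it keeps is the usual hand-rolled `Stream` state machine: buffered blocks are drained one at a time, and an empty buffer sends the stream back to reconciliation or idling. The toy, runnable reduction of that pattern below uses stand-in types and the `futures` crate that graph-node re-exports as `futures03`.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};

/// Toy stream holding prepared blocks, standing in for the YieldingBlocks state
/// (`Box<VecDeque<BlockWithTriggers<Chain>>>`) of the real PollingBlockStream.
struct ToyBlockStream {
    queued: VecDeque<u64>,
}

impl Stream for ToyBlockStream {
    type Item = u64;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.queued.pop_front() {
            // Emit the next buffered block, like the YieldingBlocks arm of poll_next.
            Some(block) => Poll::Ready(Some(block)),
            // The real stream would flip back to reconciliation here (or go Idle and
            // wait for a chain head update); the toy version simply ends.
            None => Poll::Ready(None),
        }
    }
}

fn main() {
    let mut stream = ToyBlockStream {
        queued: (1..=3).collect(),
    };
    // Drive the stream on a blocking executor; graph-node would poll it from tokio.
    while let Some(block) = futures::executor::block_on(stream.next()) {
        println!("processing block {block}");
    }
}
```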