Graph Node Ethereum
- 5 minsLoadManager
graph LR
subgraph LoadManager
lm_logger["Logger"]
lm_effort["HashMap<String, ShardEffort>"]
lm_blocked_queries["HashSet<u64>"]
lm_jailed_queries["RwLock<HashSet<QueryRef>>"]
lm_kill_state["HashMap<String, RwLock<KillState>>"]
lm_effort_gauge["Box<GaugeVec>"]
lm_query_counters["HashMap<CacheStatus, Counter>"]
lm_kill_rate_gauge["Box<GaugeVec>"]
end
subgraph Shard
s_effort["ShardEffort"]
s_effort_inner["ShardEffortInner"]
s_kill_state["KillState"]
end
LoadManager --> Shard
lm_effort --> s_effort
lm_kill_state --> s_kill_state
s_effort --> s_effort_inner
s_effort_inner --> s_effort
style LoadManager fill:#f9f,stroke:#333,stroke-width:4px
style Shard fill:#ccf,stroke:#333,stroke-width:2px
Storebuilder
Architechture Graph
graph LR
subgraph StoreBuilder
sb_config["Config"]
sb_node_id["NodeId"]
sb_fork_base["ForkBase"]
sb_metrics["MetricsRegistry"]
sb_logger["Logger"]
end
subgraph NetworkStore
ns_block_store["BlockStore"]
ns_subgraph_store["SubgraphStore"]
end
subgraph Subscription
sm["SubscriptionManager"]
chul["ChainHeadUpdateListener"]
end
subgraph Database
pp["PrimaryPool"]
end
subgraph Blockchain
bm["BlockchainMap"]
end
StoreBuilder --> NetworkStore
StoreBuilder --> Subscription
StoreBuilder --> Database
StoreBuilder --> Blockchain
NetworkStore --> ns_block_store
NetworkStore --> ns_subgraph_store
Subscription --> sm
Subscription --> chul
Database --> pp
Blockchain --> bm
style StoreBuilder fill:#f9f,stroke:#333,stroke-width:4px
style NetworkStore fill:#ccf,stroke:#333,stroke-width:2px
style Subscription fill:#cfc,stroke:#333,stroke-width:2px
style Database fill:#fcc,stroke:#333,stroke-width:2px
style Blockchain fill:#ccf,stroke:#333,stroke-width:2px
Sequence Diagram
sequenceDiagram
participant Main
participant StoreBuilder
participant NetworkStore
participant BlockStore
participant SubscriptionManager
participant ChainHeadUpdateListener
participant PrimaryPool
Main->>StoreBuilder: new(logger, node_id, config, fork_base, metrics_registry)
StoreBuilder->>NetworkStore: new(config.chain_ids())
StoreBuilder->>BlockStore: network_store.block_store()
StoreBuilder->>SubscriptionManager: subscription_manager()
StoreBuilder->>ChainHeadUpdateListener: chain_head_update_listener()
StoreBuilder->>PrimaryPool: primary_pool()
Main->>StoreBuilder: blockchain_map(env_vars, node_id, logger, block_store, logger_factory, metrics_registry, chain_head_update_listener)
StoreBuilder->>NetworkStore: block_store.cleanup_ethereum_shallow_blocks(eth_network_names)
Main->>StoreBuilder: graphql_runner(logger, network_store, subscription_manager, load_manager, graphql_metrics_registry)
Main->>StoreBuilder: subgraph_instance_manager(logger_factory, env_vars, network_store.subgraph_store(), blockchain_map, sg_count, metrics_registry, link_resolver, ipfs_service, arweave_service, static_filters)
Main->>StoreBuilder: subgraph_provider(logger_factory, link_resolver, subgraph_instance_manager, sg_count)
Main->>StoreBuilder: subgraph_registrar(logger_factory, link_resolver, subgraph_provider, network_store.subgraph_store(), subscription_manager, blockchain_map, node_id, version_switching_mode, subgraph_settings)
Main->>JsonRpcServer: serve(json_rpc_port, http_port, ws_port, subgraph_registrar, node_id, logger)
Main->>GraphQLQueryServer: start(http_port, ws_port)
Main->>GraphQLSubscriptionServer: serve(ws_port)
Main->>IndexNodeServer: start(index_node_port)
Main->>PrometheusMetricsServer: start(metrics_port)
main
flowchart TD
start["Start"]
env_vars["Load environment variables"]
opt["Parse command-line options"]
logger["Initialize logger"]
version["Log version information"]
poi_protection["Check POI protection"]
config["Load configuration"]
subgraph_settings["Load subgraph settings"]
check_config["Check configuration"]
node_id["Create node ID"]
subgraph_["Obtain subgraph-related arguments"]
ports["Obtain server ports"]
fork_base["Obtain fork base URL"]
elastic_config["Obtain Elasticsearch logging configuration"]
prometheus["Set up Prometheus registry"]
logger_factory["Create logger factory"]
ipfs_clients["Create IPFS clients"]
ipfs_service["Create IPFS service"]
arweave_resolver["Create Arweave resolver"]
arweave_service["Create Arweave service"]
link_resolver["Create link resolver"]
metrics_server["Create metrics server"]
endpoint_metrics["Create endpoint metrics"]
graphql_metrics_registry["Create GraphQL metrics registry"]
contention_logger["Create contention logger"]
expensive_queries["Read expensive queries"]
store_builder["Create store builder"]
launch_services["Launch services"]
end_["END"]
start --> env_vars
env_vars --> opt
opt --> logger
logger --> version
version --> poi_protection
poi_protection --> config
config --> subgraph_settings
subgraph_settings --> check_config
check_config --> node_id
node_id --> subgraph_
subgraph_ --> ports
ports --> fork_base
fork_base --> elastic_config
elastic_config --> prometheus
prometheus --> logger_factory
logger_factory --> ipfs_clients
ipfs_clients --> ipfs_service
ipfs_service --> arweave_resolver
arweave_resolver --> arweave_service
arweave_service --> link_resolver
link_resolver --> metrics_server
metrics_server --> endpoint_metrics
endpoint_metrics --> graphql_metrics_registry
graphql_metrics_registry --> contention_logger
contention_logger --> expensive_queries
expensive_queries --> store_builder
store_builder --> launch_services
launch_services --> end_
launch services
flowchart TD
start["Start"]
subscription_manager["Get subscription manager"]
chain_head_update_listener["Get chain head update listener"]
primary_pool["Get primary pool"]
network_store["Get network store"]
block_store["Get block store"]
validator["Create validator"]
network_adapters["Create network adapters"]
blockchain_map["Create blockchain map"]
cleanup_ethereum_shallow_blocks["Clean up Ethereum shallow blocks"]
blockchain_map_arc["Create Arc<blockchain_map>"]
shards["Get shards"]
load_manager["Create load manager"]
graphql_runner["Create GraphQL runner"]
graphql_server["Create GraphQL server"]
subscription_server["Create subscription server"]
index_node_server["Create index node server"]
block_ingestors["Start block ingestors"]
job_runner["Create job runner"]
register_store_jobs["Register store jobs"]
static_filters["Get static filters"]
subgraph_count["Create subgraph count metric"]
subgraph_instance_manager["Create subgraph instance manager"]
subgraph_provider["Create subgraph provider"]
version_switching_mode["Get version switching mode"]
subgraph_registrar["Create subgraph registrar"]
spawn_subgraph_registrar["Spawn subgraph registrar"]
json_rpc_server["Start JSON-RPC server"]
add_cli_subgraph["Add CLI subgraph"]
spawn_graphql_server["Spawn GraphQL server"]
spawn_subscription_server["Spawn subscription server"]
spawn_index_node_server["Spawn index node server"]
spawn_metrics_server["Spawn metrics server"]
start --> subscription_manager
subscription_manager --> chain_head_update_listener
chain_head_update_listener --> primary_pool
primary_pool --> network_store
network_store --> block_store
block_store --> validator
validator --> network_adapters
network_adapters --> blockchain_map
blockchain_map --> cleanup_ethereum_shallow_blocks
cleanup_ethereum_shallow_blocks --> blockchain_map_arc
blockchain_map_arc --> shards
shards --> load_manager
load_manager --> graphql_runner
graphql_runner --> graphql_server
graphql_runner --> subscription_server
subscription_server --> index_node_server
blockchain_map_arc --> block_ingestors
block_ingestors --> job_runner
job_runner --> register_store_jobs
register_store_jobs --> static_filters
static_filters --> subgraph_count
subgraph_count --> subgraph_instance_manager
subgraph_instance_manager --> subgraph_provider
subgraph_provider --> version_switching_mode
version_switching_mode --> subgraph_registrar
subgraph_registrar --> spawn_subgraph_registrar
spawn_subgraph_registrar --> json_rpc_server
json_rpc_server --> add_cli_subgraph
add_cli_subgraph --> spawn_graphql_server
spawn_graphql_server --> spawn_subscription_server
spawn_subscription_server --> spawn_index_node_server
spawn_index_node_server --> spawn_metrics_server
do_poll
flowchart TB
A["Start"] --> B["do_poll(eth_adapter)"]
B --> C["get head block hash and number from store"]
C --> D["Fetch latest_block_header from RPC"]
D --> node_1
node_1["eth_getBlockByNumber(latest)"]
node_1 --"one block with tx hash(no receipt)"--> Elatest block #lt;= head_block
E --"YES"--> K["return OK"]
E --"NO"-->P
P["Ingest latest block"]
P --> Q["Fetch missing parent blocks"]
Q --> RIs missing block hash present?
R -- Yes --> S["Ingest missing block"]
S --> Q
ingest_block.eth_adapter
flowchart TB
node_7["ingest_block(block_hash)"]
node_1["eth_adapter.load_full_block(block)"]
node_4["fetch_receipts_with_retry(hashes, block_hash)"]
node_5supports_block_receipts
node_8["fetch_block_receipts_with_retry"]
node_3["transaction_receipt"]
node_10["eth_getTransactionReceipt"]
node_6["eth_getBlockReceipts(blockHash)"]
node_9["eth_adapter.block_by_hash"]
node_14["block_with_txs(block_hash)"]
node_13["eth_getBlockByHash(hash)"]
node_11["return Block#lt;Transcation#gt; without logs"]
node_2["fetch_individual_receipts_with_retry(hashes,block_hash)"]
node_12["fetch_transaction_receipt_with_retry(tx_hash,block_hash)"]
node_15["return ethereum block"]
node_16["mark as non-final"]
node_17["chain_store.upsert_block"]
node_18["chain_store.attempt_chain_head_update"]
node_7 --> node_9
subgraph block_by_hash
node_9 --> node_14
node_14 --> node_13
node_13 --> node_11
end
subgraph load_full_block
node_1 --"block.transactions"--> node_4
node_4 --> node_5
node_5 --"true(no iterate)"--> node_8
node_5 --"false(need iterate)"--> node_2
node_8 --> node_6
node_2 --> node_12
subgraph loop_txs
node_12 --> node_3
node_3 --> node_10
end
end
node_10 --> node_15
node_11 --> node_1
node_6 --> node_15
node_15 --> node_16
node_16 --> node_17
node_17 --> node_18
block pooling stream
sequenceDiagram
participant BlockStream as PollingBlockStream
participant ChainStore as 区块链存储
participant Adapter as 适配器
participant Subgraph as 子图
BlockStream->>ChainStore: 查询最新区块
ChainStore-->>BlockStream: 返回最新区块
BlockStream->>Adapter: 处理区块
Adapter-->>BlockStream: 返回处理结果
alt 子图指针落后于链头指针
BlockStream->>Subgraph: 检查子图状态
Subgraph-->>BlockStream: 子图指针状态
alt 子图指针不在主链上
BlockStream->>Subgraph: 获取父区块指针
Subgraph-->>BlockStream: 返回父区块指针
BlockStream->>Subgraph: 回退到父区块
Subgraph-->>BlockStream: 更新子图状态
else 子图指针在主链上
BlockStream->>BlockStream: 继续处理
end
else 子图指针在主链上
BlockStream->>BlockStream: 继续处理
end
flowchart TD
A[查询最新区块] --> B
B --|是|--> C[检查子图状态]
C --> D
D --|否|--> E[获取父区块指针]
E --> F[回退到父区块]
F --> G[更新子图状态]
D --|是|--> H[继续处理]
B --|否|--> H[继续处理]
PollingBlockStream
Reconciliation:
在数据流或数据库的上下文中,reconciliation 意味着确保不同数据源之间的信息一致。例如,合并来自不同来源的数据,以消除差异和不一致。
yield:
在这里,yield 表示从一个过程(区块流的对齐)中逐步产生输出(即区块),让系统处理这些区块。这个过程通常在异步编程中使用,以允许在计算中有序地发出结果,而不是一次性返回所有结果。
stateDiagram-v2
[*] --> BeginReconciliation
BeginReconciliation --> Reconciliation : start/restart
Reconciliation --> YieldingBlocks : poll ready ok, NextBlocks Blocks
YieldingBlocks --> YieldingBlocks : 消化每一个区块
Reconciliation --> Idle : poll ready ok,NextBlocks Done, completed
Reconciliation --> BeginReconciliation : poll ready ok,next blocks revert
Reconciliation --> RetryAfterDelay : Poll Ready error
YieldingBlocks --> BeginReconciliation : need reconciliation
RetryAfterDelay --> BeginReconciliation : Poll Ready
RetryAfterDelay --> RetryAfterDelay : poll pending
Idle --> BeginReconciliation : chain head update