Skip to content

Commit fdd4e04

Browse files
committed
Implement subscribe-then-hydrate for pool profilers
1 parent 112fcd9 commit fdd4e04

File tree

24 files changed

+779
-179
lines changed

24 files changed

+779
-179
lines changed

crates/adapters/blockchain/src/cache/database.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@ use std::pin::Pin;
1717

1818
use alloy::primitives::{Address, U256};
1919
use futures_util::{Stream, StreamExt};
20-
use nautilus_model::defi::{
21-
Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
22-
data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
23-
pool_analysis::{
24-
position::PoolPosition,
25-
snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
20+
use nautilus_model::{
21+
defi::{
22+
Block, Chain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex, Token,
23+
data::{DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
24+
pool_analysis::{
25+
position::PoolPosition,
26+
snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
27+
},
28+
tick_map::tick::PoolTick,
29+
validation::validate_address,
2630
},
27-
tick_map::tick::PoolTick,
28-
validation::validate_address,
31+
identifiers::InstrumentId,
2932
};
3033
use sqlx::{PgPool, Row, postgres::PgConnectOptions};
3134

@@ -1577,7 +1580,8 @@ impl BlockchainCacheDatabase {
15771580
fee_growth_global_0::TEXT, fee_growth_global_1::TEXT,
15781581
total_amount0_deposited::TEXT, total_amount1_deposited::TEXT,
15791582
total_amount0_collected::TEXT, total_amount1_collected::TEXT,
1580-
total_swaps, total_mints, total_burns, total_fee_collects, total_flashes
1583+
total_swaps, total_mints, total_burns, total_fee_collects, total_flashes,
1584+
(SELECT dex_name FROM pool WHERE chain_id = $1 AND address = $2) as dex_name
15811585
FROM pool_snapshot
15821586
WHERE chain_id = $1 AND pool_address = $2 AND is_valid = TRUE
15831587
ORDER BY block DESC, transaction_index DESC, log_index DESC
@@ -1656,7 +1660,23 @@ impl BlockchainCacheDatabase {
16561660
)
16571661
.await?;
16581662

1663+
let dex_name: String = row.get("dex_name");
1664+
let chain = nautilus_model::defi::Chain::from_chain_id(chain_id)
1665+
.ok_or_else(|| anyhow::anyhow!("Unknown chain_id: {}", chain_id))?;
1666+
1667+
let dex_type = nautilus_model::defi::DexType::from_dex_name(&dex_name)
1668+
.ok_or_else(|| anyhow::anyhow!("Unknown dex_name: {}", dex_name))?;
1669+
1670+
let dex_extended = crate::exchanges::get_dex_extended(chain.name, &dex_type)
1671+
.ok_or_else(|| {
1672+
anyhow::anyhow!("No DEX extended found for {} on {}", dex_name, chain.name)
1673+
})?;
1674+
1675+
let instrument_id =
1676+
Pool::create_instrument_id(chain.name, &dex_extended.dex, pool_address);
1677+
16591678
Ok(Some(PoolSnapshot::new(
1679+
instrument_id,
16601680
state,
16611681
positions,
16621682
ticks,
@@ -1843,6 +1863,7 @@ impl BlockchainCacheDatabase {
18431863
&'a self,
18441864
chain: SharedChain,
18451865
dex: SharedDex,
1866+
instrument_id: InstrumentId,
18461867
pool_address: &Address,
18471868
from_position: Option<BlockPosition>,
18481869
) -> Pin<Box<dyn Stream<Item = Result<DexPoolData, anyhow::Error>> + Send + 'a>> {
@@ -2130,8 +2151,10 @@ impl BlockchainCacheDatabase {
21302151

21312152
// Transform rows to events
21322153
let stream = query.map(move |row_result| match row_result {
2133-
Ok(row) => transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone())
2134-
.map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e)),
2154+
Ok(row) => {
2155+
transform_row_to_dex_pool_data(&row, chain.clone(), dex.clone(), instrument_id)
2156+
.map_err(|e| anyhow::anyhow!("Steam pool event transform error: {}", e))
2157+
}
21352158
Err(e) => Err(anyhow::anyhow!("Stream pool events database error: {}", e)),
21362159
});
21372160

crates/adapters/blockchain/src/cache/rows.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ use std::str::FromStr;
1717

1818
use alloy::primitives::{Address, I256, U160, U256};
1919
use nautilus_core::UnixNanos;
20-
use nautilus_model::defi::{
21-
PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap,
22-
data::{DexPoolData, PoolFeeCollect, PoolFlash},
23-
validation::validate_address,
20+
use nautilus_model::{
21+
defi::{
22+
PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap,
23+
data::{DexPoolData, PoolFeeCollect, PoolFlash},
24+
validation::validate_address,
25+
},
26+
identifiers::InstrumentId,
2427
};
2528
use sqlx::{FromRow, Row, postgres::PgRow};
2629

@@ -128,6 +131,7 @@ pub fn transform_row_to_dex_pool_data(
128131
row: &PgRow,
129132
chain: nautilus_model::defi::SharedChain,
130133
dex: nautilus_model::defi::SharedDex,
134+
instrument_id: InstrumentId,
131135
) -> Result<DexPoolData, sqlx::Error> {
132136
let event_type = row.try_get::<String, _>("event_type")?;
133137
let pool_address_str = row.try_get::<String, _>("pool_address")?;
@@ -191,6 +195,7 @@ pub fn transform_row_to_dex_pool_data(
191195
let pool_swap = PoolSwap::new(
192196
chain,
193197
dex,
198+
instrument_id,
194199
pool_address,
195200
block,
196201
transaction_hash,
@@ -278,6 +283,7 @@ pub fn transform_row_to_dex_pool_data(
278283
let pool_liquidity_update = PoolLiquidityUpdate::new(
279284
chain,
280285
dex,
286+
instrument_id,
281287
pool_address,
282288
kind,
283289
block,
@@ -329,6 +335,7 @@ pub fn transform_row_to_dex_pool_data(
329335
let pool_fee_collect = PoolFeeCollect::new(
330336
chain,
331337
dex,
338+
instrument_id,
332339
pool_address,
333340
block,
334341
transaction_hash,
@@ -389,6 +396,7 @@ pub fn transform_row_to_dex_pool_data(
389396
let pool_flash = PoolFlash::new(
390397
chain,
391398
dex,
399+
instrument_id,
392400
pool_address,
393401
block,
394402
transaction_hash,

crates/adapters/blockchain/src/contracts/uniswap_v3_pool.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ use alloy::{
2020
sol,
2121
sol_types::{SolCall, private::primitives::aliases::I24},
2222
};
23-
use nautilus_model::defi::{
24-
data::block::BlockPosition,
25-
pool_analysis::{
26-
position::PoolPosition,
27-
snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
23+
use nautilus_model::{
24+
defi::{
25+
data::block::BlockPosition,
26+
pool_analysis::{
27+
position::PoolPosition,
28+
snapshot::{PoolAnalytics, PoolSnapshot, PoolState},
29+
},
30+
tick_map::tick::PoolTick,
2831
},
29-
tick_map::tick::PoolTick,
32+
identifiers::InstrumentId,
3033
};
3134
use thiserror::Error;
3235

@@ -436,6 +439,7 @@ impl UniswapV3PoolContract {
436439
pub async fn fetch_snapshot(
437440
&self,
438441
pool_address: &Address,
442+
instrument_id: InstrumentId,
439443
tick_values: &[i32],
440444
position_keys: &[(Address, i32, i32)],
441445
block: Option<u64>,
@@ -474,6 +478,7 @@ impl UniswapV3PoolContract {
474478
};
475479

476480
Ok(PoolSnapshot::new(
481+
instrument_id,
477482
pool_state,
478483
positions,
479484
ticks,

crates/adapters/blockchain/src/data/client.rs

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,55 @@ impl BlockchainDataClient {
378378

379379
Ok(())
380380
}
381-
DefiSubscribeCommand::Pool(_cmd) => {
382-
tracing::info!("Processing subscribe pool command");
383-
// Pool subscriptions are typically handled at the application level
384-
// as they involve specific pool addresses and don't require blockchain streaming
385-
tracing::warn!("Pool subscriptions are handled at application level");
381+
DefiSubscribeCommand::Pool(cmd) => {
382+
tracing::info!(
383+
"Processing subscribe pool command for {}",
384+
cmd.instrument_id
385+
);
386+
387+
if let Some(ref mut _rpc) = core_client.rpc_client {
388+
tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
389+
}
390+
391+
if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
392+
let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
393+
.map_err(|e| {
394+
anyhow::anyhow!(
395+
"Invalid pool address '{}' failed with error: {:?}",
396+
cmd.instrument_id,
397+
e
398+
)
399+
})?;
400+
401+
// Subscribe to all pool event types
402+
core_client
403+
.subscription_manager
404+
.subscribe_swaps(dex, pool_address);
405+
core_client
406+
.subscription_manager
407+
.subscribe_burns(dex, pool_address);
408+
core_client
409+
.subscription_manager
410+
.subscribe_mints(dex, pool_address);
411+
core_client
412+
.subscription_manager
413+
.subscribe_collects(dex, pool_address);
414+
core_client
415+
.subscription_manager
416+
.subscribe_flashes(dex, pool_address);
417+
418+
tracing::info!(
419+
"Subscribed to all pool events for {} at address {}",
420+
cmd.instrument_id,
421+
pool_address
422+
);
423+
} else {
424+
anyhow::bail!(
425+
"Invalid venue {}, expected Blockchain DEX format",
426+
cmd.instrument_id.venue
427+
)
428+
}
429+
386430
Ok(())
387431
}
388432
DefiSubscribeCommand::PoolSwaps(cmd) => {
@@ -537,10 +581,47 @@ impl BlockchainDataClient {
537581

538582
Ok(())
539583
}
540-
DefiUnsubscribeCommand::Pool(_cmd) => {
541-
tracing::info!("Processing unsubscribe pool command");
542-
// Pool unsubscriptions are typically handled at the application level
543-
tracing::warn!("Pool unsubscriptions are handled at application level");
584+
DefiUnsubscribeCommand::Pool(cmd) => {
585+
tracing::info!(
586+
"Processing unsubscribe pool command for {}",
587+
cmd.instrument_id
588+
);
589+
590+
if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
591+
let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
592+
.map_err(|_| {
593+
anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
594+
})?;
595+
596+
// Unsubscribe from all pool event types
597+
core_client
598+
.subscription_manager
599+
.unsubscribe_swaps(dex, pool_address);
600+
core_client
601+
.subscription_manager
602+
.unsubscribe_burns(dex, pool_address);
603+
core_client
604+
.subscription_manager
605+
.unsubscribe_mints(dex, pool_address);
606+
core_client
607+
.subscription_manager
608+
.unsubscribe_collects(dex, pool_address);
609+
core_client
610+
.subscription_manager
611+
.unsubscribe_flashes(dex, pool_address);
612+
613+
tracing::info!(
614+
"Unsubscribed from all pool events for {} at address {}",
615+
cmd.instrument_id,
616+
pool_address
617+
);
618+
} else {
619+
anyhow::bail!(
620+
"Invalid venue {}, expected Blockchain DEX format",
621+
cmd.instrument_id.venue
622+
)
623+
}
624+
544625
Ok(())
545626
}
546627
DefiUnsubscribeCommand::PoolSwaps(cmd) => {

crates/adapters/blockchain/src/data/core.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ impl BlockchainDataClientCore {
670670
dex_extended.convert_to_trade_data(&pool.token0, &pool.token1, swap_event)?;
671671
let swap = swap_event.to_pool_swap(
672672
self.chain.clone(),
673+
pool.instrument_id,
673674
pool.address,
674675
Some(side),
675676
Some(size),
@@ -702,6 +703,7 @@ impl BlockchainDataClientCore {
702703
let liquidity_update = mint_event.to_pool_liquidity_update(
703704
self.chain.clone(),
704705
dex_extended.dex.clone(),
706+
pool.instrument_id,
705707
pool.address,
706708
timestamp,
707709
);
@@ -731,6 +733,7 @@ impl BlockchainDataClientCore {
731733
let liquidity_update = burn_event.to_pool_liquidity_update(
732734
self.chain.clone(),
733735
dex_extended.dex.clone(),
736+
pool.instrument_id,
734737
pool.address,
735738
timestamp,
736739
);
@@ -759,6 +762,7 @@ impl BlockchainDataClientCore {
759762
let fee_collect = collect_event.to_pool_fee_collect(
760763
self.chain.clone(),
761764
dex_extended.dex.clone(),
765+
pool.instrument_id,
762766
pool.address,
763767
timestamp,
764768
);
@@ -781,7 +785,12 @@ impl BlockchainDataClientCore {
781785
.get_block_timestamp(flash_event.block_number)
782786
.copied();
783787

784-
let flash = flash_event.to_pool_flash(self.chain.clone(), pool.address, timestamp);
788+
let flash = flash_event.to_pool_flash(
789+
self.chain.clone(),
790+
pool.instrument_id,
791+
pool.address,
792+
timestamp,
793+
);
785794

786795
Ok(flash)
787796
}
@@ -1110,8 +1119,13 @@ impl BlockchainDataClientCore {
11101119
pool.instrument_id
11111120
);
11121121

1113-
let mut event_stream =
1114-
database.stream_pool_events(self.chain.clone(), dex.clone(), &pool.address, None);
1122+
let mut event_stream = database.stream_pool_events(
1123+
self.chain.clone(),
1124+
dex.clone(),
1125+
pool.instrument_id,
1126+
&pool.address,
1127+
None,
1128+
);
11151129
let mut event_count = 0;
11161130

11171131
while let Some(event_result) = event_stream.next().await {

crates/adapters/blockchain/src/events/burn.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515

1616
use alloy::primitives::{Address, U256};
1717
use nautilus_core::UnixNanos;
18-
use nautilus_model::defi::{PoolLiquidityUpdate, PoolLiquidityUpdateType, SharedChain, SharedDex};
18+
use nautilus_model::{
19+
defi::{PoolLiquidityUpdate, PoolLiquidityUpdateType, SharedChain, SharedDex},
20+
identifiers::InstrumentId,
21+
};
1922

2023
/// Represents a burn event that occurs when liquidity is removed from a position in a liquidity pool.
2124
#[derive(Debug, Clone)]
@@ -87,12 +90,14 @@ impl BurnEvent {
8790
&self,
8891
chain: SharedChain,
8992
dex: SharedDex,
93+
instrument_id: InstrumentId,
9094
pool_address: Address,
9195
timestamp: Option<UnixNanos>,
9296
) -> PoolLiquidityUpdate {
9397
PoolLiquidityUpdate::new(
9498
chain,
9599
dex,
100+
instrument_id,
96101
pool_address,
97102
PoolLiquidityUpdateType::Burn,
98103
self.block_number,

0 commit comments

Comments
 (0)