Skip to content

Commit c8c2134

Browse files
committed
Update service to use updated telemetry
Related Substrate PR: paritytech/substrate#8143
1 parent 927a405 commit c8c2134

4 files changed

Lines changed: 208 additions & 118 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/millau/node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ sc-finality-grandpa-rpc = { git = "https://github.com/paritytech/substrate", bra
3939
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
4040
sc-rpc = { git = "https://github.com/paritytech/substrate", branch = "master" }
4141
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
42+
sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "master" }
4243
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
4344
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
4445
sp-consensus-aura = { git = "https://github.com/paritytech/substrate", branch = "master" }

bin/millau/node/src/service.rs

Lines changed: 103 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@
3030

3131
use millau_runtime::{self, opaque::Block, RuntimeApi};
3232
use sc_client_api::{ExecutorProvider, RemoteBackend};
33+
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
3334
use sc_executor::native_executor_instance;
3435
pub use sc_executor::NativeExecutor;
3536
use sc_finality_grandpa::SharedVoterState;
3637
use sc_keystore::LocalKeystore;
3738
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
39+
use sc_telemetry::{Telemetry, TelemetryWorker};
3840
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
3941
use sp_inherents::InherentDataProviders;
4042
use std::sync::Arc;
@@ -70,19 +72,38 @@ pub fn new_partial(
7072
AuraPair,
7173
>,
7274
sc_finality_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
75+
Option<Telemetry>,
7376
),
7477
>,
7578
ServiceError,
7679
> {
7780
if config.keystore_remote.is_some() {
78-
return Err(ServiceError::Other("Remote Keystores are not supported.".to_string()));
81+
return Err(ServiceError::Other(format!("Remote Keystores are not supported.")));
7982
}
80-
let inherent_data_providers = sp_inherents::InherentDataProviders::new();
83+
let inherent_data_providers = InherentDataProviders::new();
84+
85+
let telemetry = config
86+
.telemetry_endpoints
87+
.clone()
88+
.filter(|x| !x.is_empty())
89+
.map(|endpoints| -> Result<_, sc_telemetry::Error> {
90+
let worker = TelemetryWorker::new(16)?;
91+
let telemetry = worker.handle().new_telemetry(endpoints);
92+
Ok((worker, telemetry))
93+
})
94+
.transpose()?;
8195

82-
let (client, backend, keystore_container, task_manager) =
83-
sc_service::new_full_parts::<Block, RuntimeApi, Executor>(&config)?;
96+
let (client, backend, keystore_container, task_manager) = sc_service::new_full_parts::<Block, RuntimeApi, Executor>(
97+
&config,
98+
telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
99+
)?;
84100
let client = Arc::new(client);
85101

102+
let telemetry = telemetry.map(|(worker, telemetry)| {
103+
task_manager.spawn_handle().spawn("telemetry", worker.run());
104+
telemetry
105+
});
106+
86107
let select_chain = sc_consensus::LongestChain::new(backend.clone());
87108

88109
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
@@ -93,22 +114,28 @@ pub fn new_partial(
93114
client.clone(),
94115
);
95116

96-
let (grandpa_block_import, grandpa_link) =
97-
sc_finality_grandpa::block_import(client.clone(), &(client.clone() as Arc<_>), select_chain.clone())?;
117+
let (grandpa_block_import, grandpa_link) = sc_finality_grandpa::block_import(
118+
client.clone(),
119+
&(client.clone() as Arc<_>),
120+
select_chain.clone(),
121+
telemetry.as_ref().map(|x| x.handle()),
122+
)?;
98123

99124
let aura_block_import =
100125
sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new(grandpa_block_import.clone(), client.clone());
101126

102-
let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _, _>(
103-
sc_consensus_aura::slot_duration(&*client)?,
104-
aura_block_import.clone(),
105-
Some(Box::new(grandpa_block_import)),
106-
client.clone(),
107-
inherent_data_providers.clone(),
108-
&task_manager.spawn_essential_handle(),
109-
config.prometheus_registry(),
110-
sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()),
111-
)?;
127+
let import_queue = sc_consensus_aura::import_queue::<AuraPair, _, _, _, _, _>(ImportQueueParams {
128+
block_import: aura_block_import.clone(),
129+
justification_import: Some(Box::new(grandpa_block_import.clone())),
130+
client: client.clone(),
131+
inherent_data_providers: inherent_data_providers.clone(),
132+
spawner: &task_manager.spawn_essential_handle(),
133+
can_author_with: sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()),
134+
slot_duration: sc_consensus_aura::slot_duration(&*client)?,
135+
registry: config.prometheus_registry(),
136+
check_for_equivocation: Default::default(),
137+
telemetry: telemetry.as_ref().map(|x| x.handle()),
138+
})?;
112139

113140
Ok(sc_service::PartialComponents {
114141
client,
@@ -119,11 +146,11 @@ pub fn new_partial(
119146
select_chain,
120147
transaction_pool,
121148
inherent_data_providers,
122-
other: (aura_block_import, grandpa_link),
149+
other: (aura_block_import, grandpa_link, telemetry),
123150
})
124151
}
125152

126-
fn remote_keystore(_url: &str) -> Result<Arc<LocalKeystore>, &'static str> {
153+
fn remote_keystore(_url: &String) -> Result<Arc<LocalKeystore>, &'static str> {
127154
// FIXME: here would the concrete keystore be built,
128155
// must return a concrete type (NOT `LocalKeystore`) that
129156
// implements `CryptoStore` and `SyncCryptoStore`
@@ -141,7 +168,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
141168
select_chain,
142169
transaction_pool,
143170
inherent_data_providers,
144-
other: (block_import, grandpa_link),
171+
other: (block_import, grandpa_link, mut telemetry),
145172
} = new_partial(&config)?;
146173

147174
if let Some(url) = &config.keystore_remote {
@@ -173,13 +200,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
173200
})?;
174201

175202
if config.offchain_worker.enabled {
176-
sc_service::build_offchain_workers(
177-
&config,
178-
backend.clone(),
179-
task_manager.spawn_handle(),
180-
client.clone(),
181-
network.clone(),
182-
);
203+
sc_service::build_offchain_workers(&config, task_manager.spawn_handle(), client.clone(), network.clone());
183204
}
184205

185206
let role = config.role.clone();
@@ -265,7 +286,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
265286
})
266287
};
267288

268-
let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
289+
let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
269290
network: network.clone(),
270291
client: client.clone(),
271292
keystore: keystore_container.sync_keystore(),
@@ -278,32 +299,35 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
278299
network_status_sinks,
279300
system_rpc_tx,
280301
config,
281-
telemetry_span: None,
302+
telemetry: telemetry.as_mut(),
282303
})?;
283304

284305
if role.is_authority() {
285-
let proposer = sc_basic_authorship::ProposerFactory::new(
306+
let proposer_factory = sc_basic_authorship::ProposerFactory::new(
286307
task_manager.spawn_handle(),
287308
client.clone(),
288309
transaction_pool,
289310
prometheus_registry.as_ref(),
311+
telemetry.as_ref().map(|x| x.handle()),
290312
);
291313

292314
let can_author_with = sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone());
293315

294-
let aura = sc_consensus_aura::start_aura::<_, _, _, _, _, AuraPair, _, _, _, _>(
295-
sc_consensus_aura::slot_duration(&*client)?,
296-
client.clone(),
316+
let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _>(StartAuraParams {
317+
slot_duration: sc_consensus_aura::slot_duration(&*client)?,
318+
client: client.clone(),
297319
select_chain,
298320
block_import,
299-
proposer,
300-
network.clone(),
301-
inherent_data_providers,
321+
proposer_factory,
322+
inherent_data_providers: inherent_data_providers.clone(),
302323
force_authoring,
303324
backoff_authoring_blocks,
304-
keystore_container.sync_keystore(),
325+
keystore: keystore_container.sync_keystore(),
305326
can_author_with,
306-
)?;
327+
sync_oracle: network.clone(),
328+
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
329+
telemetry: telemetry.as_ref().map(|x| x.handle()),
330+
})?;
307331

308332
// the AURA authoring task is considered essential, i.e. if it
309333
// fails we take down the service with it.
@@ -326,6 +350,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
326350
observer_enabled: false,
327351
keystore,
328352
is_authority: role.is_authority(),
353+
telemetry: telemetry.as_ref().map(|x| x.handle()),
329354
};
330355

331356
if enable_grandpa {
@@ -339,10 +364,10 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
339364
config: grandpa_config,
340365
link: grandpa_link,
341366
network,
342-
telemetry_on_connect: telemetry_connection_notifier.map(|x| x.on_connect_stream()),
343367
voting_rule: sc_finality_grandpa::VotingRulesBuilder::default().build(),
344368
prometheus_registry,
345369
shared_voter_state: SharedVoterState::empty(),
370+
telemetry: telemetry.as_ref().map(|x| x.handle()),
346371
};
347372

348373
// the GRANDPA voter task is considered infallible, i.e.
@@ -358,8 +383,27 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
358383

359384
/// Builds a new service for a light client.
360385
pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError> {
386+
let telemetry = config
387+
.telemetry_endpoints
388+
.clone()
389+
.filter(|x| !x.is_empty())
390+
.map(|endpoints| -> Result<_, sc_telemetry::Error> {
391+
let worker = TelemetryWorker::new(16)?;
392+
let telemetry = worker.handle().new_telemetry(endpoints);
393+
Ok((worker, telemetry))
394+
})
395+
.transpose()?;
396+
361397
let (client, backend, keystore_container, mut task_manager, on_demand) =
362-
sc_service::new_light_parts::<Block, RuntimeApi, Executor>(&config)?;
398+
sc_service::new_light_parts::<Block, RuntimeApi, Executor>(
399+
&config,
400+
telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
401+
)?;
402+
403+
let mut telemetry = telemetry.map(|(worker, telemetry)| {
404+
task_manager.spawn_handle().spawn("telemetry", worker.run());
405+
telemetry
406+
});
363407

364408
config
365409
.network
@@ -376,22 +420,28 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
376420
on_demand.clone(),
377421
));
378422

379-
let (grandpa_block_import, _) =
380-
sc_finality_grandpa::block_import(client.clone(), &(client.clone() as Arc<_>), select_chain)?;
423+
let (grandpa_block_import, _) = sc_finality_grandpa::block_import(
424+
client.clone(),
425+
&(client.clone() as Arc<_>),
426+
select_chain.clone(),
427+
telemetry.as_ref().map(|x| x.handle()),
428+
)?;
381429

382430
let aura_block_import =
383431
sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new(grandpa_block_import.clone(), client.clone());
384432

385-
let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _, _>(
386-
sc_consensus_aura::slot_duration(&*client)?,
387-
aura_block_import,
388-
Some(Box::new(grandpa_block_import)),
389-
client.clone(),
390-
InherentDataProviders::new(),
391-
&task_manager.spawn_essential_handle(),
392-
config.prometheus_registry(),
393-
sp_consensus::NeverCanAuthor,
394-
)?;
433+
let import_queue = sc_consensus_aura::import_queue::<AuraPair, _, _, _, _, _>(ImportQueueParams {
434+
block_import: aura_block_import.clone(),
435+
justification_import: Some(Box::new(grandpa_block_import.clone())),
436+
client: client.clone(),
437+
inherent_data_providers: InherentDataProviders::new(),
438+
spawner: &task_manager.spawn_essential_handle(),
439+
can_author_with: sp_consensus::NeverCanAuthor,
440+
slot_duration: sc_consensus_aura::slot_duration(&*client)?,
441+
registry: config.prometheus_registry(),
442+
check_for_equivocation: Default::default(),
443+
telemetry: telemetry.as_ref().map(|x| x.handle()),
444+
})?;
395445

396446
let (network, network_status_sinks, system_rpc_tx, network_starter) =
397447
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -405,13 +455,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
405455
})?;
406456

407457
if config.offchain_worker.enabled {
408-
sc_service::build_offchain_workers(
409-
&config,
410-
backend.clone(),
411-
task_manager.spawn_handle(),
412-
client.clone(),
413-
network.clone(),
414-
);
458+
sc_service::build_offchain_workers(&config, task_manager.spawn_handle(), client.clone(), network.clone());
415459
}
416460

417461
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
@@ -427,7 +471,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
427471
network,
428472
network_status_sinks,
429473
system_rpc_tx,
430-
telemetry_span: None,
474+
telemetry: telemetry.as_mut(),
431475
})?;
432476

433477
network_starter.start_network();

0 commit comments

Comments
 (0)