Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/AElf.OS.Core/AElf.OS.Core.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props"/>
<Import Project="..\..\common.props" />
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>AElf.OS</RootNamespace>
Expand All @@ -8,8 +8,8 @@
<Description>Core module for the OS layer.</Description>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\AElf.Kernel.Node\AElf.Kernel.Node.csproj"/>
<ProjectReference Include="..\AElf.Kernel.Token\AElf.Kernel.Token.csproj"/>
<ProjectReference Include="..\AElf.Kernel.Node\AElf.Kernel.Node.csproj" />
<ProjectReference Include="..\AElf.Kernel.Token\AElf.Kernel.Token.csproj" />
</ItemGroup>
<ItemGroup>
<CommonMessage Include="..\..\protobuf\network_types.proto">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ namespace AElf.OS.Network.Events;

public class StreamMessageReceivedEvent
{
public StreamMessageReceivedEvent(ByteString message, string clientPubkey)
public StreamMessageReceivedEvent(ByteString message, string clientPubkey, string requestId)
{
Message = message;
ClientPubkey = clientPubkey;
RequestId = requestId;
}

public ByteString Message { get; }

public string ClientPubkey { get; }

public string RequestId { get; }
}
16 changes: 16 additions & 0 deletions src/AElf.OS.Core/Network/Events/StreamPeerExceptionEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using AElf.OS.Network.Application;
using AElf.OS.Network.Infrastructure;

namespace AElf.OS.Network.Events;

public class StreamPeerExceptionEvent
{
public NetworkException Exception { get; }
public IPeer Peer { get; }

public StreamPeerExceptionEvent(NetworkException exception, IPeer peer)
{
Exception = exception;
Peer = peer;
}
}
47 changes: 33 additions & 14 deletions src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public class PeerDialer : IPeerDialer
{
private readonly IAccountService _accountService;
private readonly IHandshakeProvider _handshakeProvider;
private KeyCertificatePair _clientKeyCertificatePair;
private IStreamTaskResourcePool _streamTaskResourcePool;
private readonly IStreamTaskResourcePool _streamTaskResourcePool;
public ILocalEventBus EventBus { get; set; }

public PeerDialer(IAccountService accountService,
Expand All @@ -46,8 +45,6 @@ public PeerDialer(IAccountService accountService,
EventBus = NullLocalEventBus.Instance;

Logger = NullLogger<PeerDialer>.Instance;

CreateClientKeyCertificatePair();
}

private NetworkOptions NetworkOptions => NetworkOptionsSnapshot.Value;
Expand Down Expand Up @@ -142,7 +139,17 @@ public async Task<GrpcPeer> DialBackPeerByStreamAsync(DnsEndPoint remoteEndpoint
};
Logger.LogWarning("DialBackPeerByStreamAsync meta={meta}", meta);
var peer = new GrpcStreamBackPeer(remoteEndpoint, info, responseStream, _streamTaskResourcePool, meta);

peer.SetStreamSendCallBack(async (ex, streamMessage, callTimes) =>
{
if (ex == null)
Logger.LogDebug("streamRequest write success {times}-{requestId}-{messageType}-{this}-{latency}", callTimes, streamMessage.RequestId, streamMessage.MessageType, peer,
CommonHelper.GetRequestLatency(streamMessage.RequestId));
else
{
Logger.LogError(ex, "streamRequest write fail, {requestId}-{messageType}-{this}", streamMessage.RequestId, streamMessage.MessageType, peer);
await EventBus.PublishAsync(new StreamPeerExceptionEvent(ex, peer), false);
}
});
peer.UpdateLastReceivedHandshake(handshake);

return peer;
Expand Down Expand Up @@ -191,11 +198,6 @@ public async Task<GrpcPeer> DialBackPeerAsync(DnsEndPoint remoteEndpoint, Handsh
return peer;
}

private void CreateClientKeyCertificatePair()
{
_clientKeyCertificatePair = TlsHelper.GenerateKeyCertificatePair();
}

/// <summary>
/// Calls the server side DoHandshake RPC method, in order to establish a 2-way connection.
/// </summary>
Expand Down Expand Up @@ -245,6 +247,17 @@ private async Task<GrpcStreamPeer> DailStreamPeerAsync(GrpcClient client, DnsEnd
{ GrpcConstants.PubkeyMetadataKey, nodePubkey },
{ GrpcConstants.PeerInfoMetadataKey, connectionInfo.ToString() }
});
streamPeer.SetStreamSendCallBack(async (ex, streamMessage, callTimes) =>
{
if (ex == null)
Logger.LogDebug("streamRequest write success {times}-{requestId}-{messageType}-{this}-{latency}", callTimes, streamMessage.RequestId, streamMessage.MessageType, streamPeer,
CommonHelper.GetRequestLatency(streamMessage.RequestId));
else
{
Logger.LogError(ex, "streamRequest write fail, {requestId}-{messageType}-{this}", streamMessage.RequestId, streamMessage.MessageType, streamPeer);
await EventBus.PublishAsync(new StreamPeerExceptionEvent(ex, streamPeer), false);
}
});
var success = await BuildStreamForPeerAsync(streamPeer, call);
return success ? streamPeer : null;
}
Expand All @@ -266,12 +279,17 @@ public async Task<bool> BuildStreamForPeerAsync(GrpcStreamPeer streamPeer, Async
{
try
{
await call.ResponseStream.ForEachAsync(async req => await
EventBus.PublishAsync(new StreamMessageReceivedEvent(req.ToByteString(), streamPeer.Info.Pubkey), false));
await call.ResponseStream.ForEachAsync(async req =>
{
Logger.LogDebug("listenReceive request={requestId} {streamType}-{messageType} latency={latency}", req.RequestId, req.StreamType, req.MessageType, CommonHelper.GetRequestLatency(req.RequestId));
await EventBus.PublishAsync(new StreamMessageReceivedEvent(req.ToByteString(), streamPeer.Info.Pubkey, req.RequestId), false);
});
Logger.LogWarning("listen end and complete {remoteEndPoint}", streamPeer.RemoteEndpoint.ToString());
}
catch (Exception e)
{
if (e is RpcException exception)
await EventBus.PublishAsync(new StreamPeerExceptionEvent(streamPeer.HandleRpcException(exception, "listen err {remoteEndPoint}"), streamPeer));
Logger.LogError(e, "listen err {remoteEndPoint}", streamPeer.RemoteEndpoint.ToString());
}
}, tokenSource.Token);
Expand Down Expand Up @@ -329,8 +347,9 @@ private async Task<GrpcClient> CreateClientAsync(DnsEndPoint remoteEndpoint)
return null;

Logger.LogDebug($"Upgrading connection to TLS: {certificate}.");
var clientKeyCertificatePair = TlsHelper.GenerateKeyCertificatePair();
ChannelCredentials credentials =
new SslCredentials(TlsHelper.ObjectToPem(certificate), _clientKeyCertificatePair);
new SslCredentials(TlsHelper.ObjectToPem(certificate), clientKeyCertificatePair);

var channel = new Channel(remoteEndpoint.ToString(), credentials, new List<ChannelOption>
{
Expand All @@ -340,7 +359,7 @@ private async Task<GrpcClient> CreateClientAsync(DnsEndPoint remoteEndpoint)
new(GrpcConstants.GrpcArgKeepalivePermitWithoutCalls, GrpcConstants.GrpcArgKeepalivePermitWithoutCallsOpen),
new(GrpcConstants.GrpcArgHttp2MaxPingsWithoutData, GrpcConstants.GrpcArgHttp2MaxPingsWithoutDataVal),
new(GrpcConstants.GrpcArgKeepaliveTimeoutMs, GrpcConstants.GrpcArgKeepaliveTimeoutMsVal),
new(GrpcConstants.GrpcArgKeepaliveTimeMs, GrpcConstants.GrpcArgKeepaliveTimeMsVal)
new(GrpcConstants.GrpcArgKeepaliveTimeMs, GrpcConstants.GrpcArgKeepaliveTimeMsVal),
});

var nodePubkey = AsyncHelper.RunSync(() => _accountService.GetPublicKeyAsync()).ToHex();
Expand Down
2 changes: 2 additions & 0 deletions src/AElf.OS.Network.Grpc/GrpcConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ public static class GrpcConstants
public const string GrpcArgHttp2MaxPingsWithoutData = "grpc.http2_max_pings_without_data";
public const string GrpcArgKeepaliveTimeoutMs = "grpc.keepalive_timeout_ms";
public const string GrpcArgKeepaliveTimeMs = "grpc.keepalive_time_ms";
// public const string GrpcArgHttp2WriteBufferSize = "grpc.http2.write_buffer_size";

public const int GrpcArgKeepalivePermitWithoutCallsOpen = 1;
public const int GrpcArgHttp2MaxPingsWithoutDataVal = 0;
public const int GrpcArgKeepaliveTimeoutMsVal = 60 * 1000;
public const int GrpcArgKeepaliveTimeMsVal = 2 * 60 * 60 * 1000;
// public const int GrpcArgHttp2WriteBufferSizeVal = 6 * 1024;

public const string GrpcGzipConst = "gzip";

Expand Down
2 changes: 1 addition & 1 deletion src/AElf.OS.Network.Grpc/GrpcNetworkModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public override void ConfigureServices(ServiceConfigurationContext context)
context.Services.AddSingleton<PeerService.PeerServiceBase, GrpcServerService>();

// Internal dependencies
context.Services.AddTransient<IPeerDialer, PeerDialer>();
context.Services.AddSingleton<IPeerDialer, PeerDialer>();
context.Services.AddSingleton<GrpcServerService>();

context.Services.AddSingleton<AuthInterceptor>();
Expand Down
7 changes: 7 additions & 0 deletions src/AElf.OS.Network.Grpc/Helpers/CommonHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ public static string GenerateRequestId()
return timeMs.ToString() + '_' + guid;
}

public static long GetRequestLatency(string requestId)
{
var sp = requestId.Split("_");
if (sp.Length != 2) return -1;
return long.TryParse(sp[0], out var start) ? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start : -1;
}

public static bool GreaterThanSupportStreamMinVersion(this string version, string minVersion)
{
return Version.Parse(version).CompareTo(Version.Parse(minVersion)) >= 0;
Expand Down
8 changes: 4 additions & 4 deletions src/AElf.OS.Network.Grpc/Peer/GrpcPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ namespace AElf.OS.Network.Grpc;
public class GrpcPeer : IPeer
{
private const int MaxMetricsPerMethod = 100;
protected const int BlockRequestTimeout = 700;
protected const int CheckHealthTimeout = 1000;
protected const int BlockRequestTimeout = 2000;
protected const int CheckHealthTimeout = 2000;
protected const int BlocksRequestTimeout = 5000;
protected const int GetNodesTimeout = 500;
protected const int GetNodesTimeout = 2000;
protected const int UpdateHandshakeTimeout = 3000;
protected const int StreamRecoveryWaitTime = 500;

Expand Down Expand Up @@ -394,7 +394,7 @@ protected virtual void RecordMetric(GrpcRequest grpcRequest, Timestamp requestSt
/// This method handles the case where the peer is potentially down. If the Rpc call
/// put the channel in TransientFailure or Connecting, we give the connection a certain time to recover.
/// </summary>
protected virtual NetworkException HandleRpcException(RpcException exception, string errorMessage)
public virtual NetworkException HandleRpcException(RpcException exception, string errorMessage)
{
var message = $"Failed request to {this}: {errorMessage}";
var type = NetworkExceptionType.Rpc;
Expand Down
33 changes: 29 additions & 4 deletions src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using AElf.OS.Network.Application;
using AElf.OS.Network.Grpc.Helpers;
using AElf.OS.Network.Protocol.Types;
using AElf.Types;
using Grpc.Core;

namespace AElf.OS.Network.Grpc;
Expand All @@ -22,13 +25,35 @@ public GrpcStreamBackPeer(DnsEndPoint remoteEndpoint, PeerConnectionInfo peerCon

public override async Task CheckHealthAsync()
{
var request = new GrpcRequest { ErrorMessage = "Check health failed." };
var requestId = CommonHelper.GenerateRequestId();
var request = new GrpcRequest { ErrorMessage = $"Check health failed.requestId={requestId}" };

var data = new Metadata
{
{ GrpcConstants.TimeoutMetadataKey, CheckHealthTimeout.ToString() },
};
await RequestAsync(() => StreamRequestAsync(MessageType.HealthCheck, new HealthCheckRequest(), data), request);
await RequestAsync(() => StreamRequestAsync(MessageType.HealthCheck, new HealthCheckRequest(), data, requestId), request);
}

public override async Task<List<BlockWithTransactions>> GetBlocksAsync(Hash firstHash, int count)
{
var blockRequest = new BlocksRequest { PreviousBlockHash = firstHash, Count = count };
var blockInfo = $"{{ first: {firstHash}, count: {count} }}";

var requestId = CommonHelper.GenerateRequestId();
var request = new GrpcRequest
{
ErrorMessage = $"Get blocks for {blockInfo} failed.requestId={requestId}",
MetricName = nameof(MetricNames.GetBlocks),
MetricInfo = $"Get blocks for {blockInfo}"
};

var data = new Metadata
{
{ GrpcConstants.TimeoutMetadataKey, BlocksRequestTimeout.ToString() },
};
var listMessage = await RequestAsync(() => StreamRequestAsync(MessageType.RequestBlocks, blockRequest, data, requestId), request);
return listMessage != null ? BlockList.Parser.ParseFrom(listMessage.Message).Blocks.ToList() : new List<BlockWithTransactions>();
}

public override async Task DisconnectAsync(bool gracefulDisconnect)
Expand Down Expand Up @@ -57,10 +82,10 @@ public override Task<bool> TryRecoverAsync()
}


protected override NetworkException HandleRpcException(RpcException exception, string errorMessage)
public override NetworkException HandleRpcException(RpcException exception, string errorMessage)
{
var message = $"Failed request to {this}: {errorMessage}";
var type = NetworkExceptionType.Rpc;
var type = NetworkExceptionType.Rpc;
if (exception.StatusCode ==
// there was an exception, not related to connectivity.
StatusCode.Cancelled)
Expand Down
Loading