Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
parameters:
parts: 3
n: 2
codecoverage: true
codecoverage: false
- template: templates/build-template-window.yml
parameters:
parts: 3
Expand Down
61 changes: 47 additions & 14 deletions protobuf/peer_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@ import "aelf/core.proto";

service PeerService {

rpc Ping (PingRequest) returns (PongReply) {}
rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {}

rpc RequestBlock (BlockRequest) returns (BlockReply) {}
rpc RequestBlocks (BlocksRequest) returns (BlockList) {}
rpc Ping (PingRequest) returns (PongReply) {}
rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {}

rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {}

rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {}
rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {}
rpc RequestBlock (BlockRequest) returns (BlockReply) {}
rpc RequestBlocks (BlocksRequest) returns (BlockList) {}

rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {}
rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {}

rpc GetNodes (NodesRequest) returns (NodeList) {}
rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {}
rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {}

rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {}
rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {}
rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {}

rpc Disconnect (DisconnectReason) returns (VoidReply) {}
rpc RequestByStream (stream StreamMessage) returns (stream StreamMessage) {}

rpc GetNodes (NodesRequest) returns (NodeList) {}

rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {}
rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {}

rpc Disconnect (DisconnectReason) returns (VoidReply) {}
}

// **** No reply *****
Expand All @@ -45,3 +47,34 @@ message HealthCheckRequest {
message HealthCheckReply {
}

message StreamMessage {
StreamType stream_type = 1;
MessageType message_type = 2;
string request_id = 3;
bytes message = 4;
map<string, string> meta = 5;
}

enum StreamType {
UNKNOWN = 0;
REQUEST = 1;
REPLY = 2;
}

enum MessageType {
ANY = 0;

HAND_SHAKE = 1;
PING = 2;
CONFIRM_HAND_SHAKE = 3;
HEALTH_CHECK = 4;
REQUEST_BLOCK = 5;
REQUEST_BLOCKS = 6;
GET_NODES = 7;

BLOCK_BROADCAST = 8;
TRANSACTION_BROADCAST = 9;
ANNOUNCEMENT_BROADCAST = 10;
LIB_ANNOUNCEMENT_BROADCAST = 11;
DISCONNECT = 12;
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ await _transactionBlockIndexService.ValidateTransactionBlockIndexExistsInBranchA
block.Header.PreviousBlockHash);
if (!blockIndexExists)
continue;
Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId);
Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId.ToHex());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public async Task MergeBlockStateAsync(long lastIrreversibleBlockHeight, Hash la

Logger.LogDebug(
"Start merge lib height: {LastIrreversibleBlockHeight}, lib block hash: {LastIrreversibleBlockHash}, merge count: {BlockIndexesCount}",
lastIrreversibleBlockHeight, lastIrreversibleBlockHash, blockIndexes.Count);
lastIrreversibleBlockHeight, lastIrreversibleBlockHash.ToHex(), blockIndexes.Count);

foreach (var blockIndex in blockIndexes)
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ await _transactionResultService.GetTransactionResultAsync(transactionId, blockHa
== null)
{
Logger.LogWarning(
$"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}");
"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}", blockHash.ToHex(), transactionId.ToHex());

return null;
}
Expand Down
2 changes: 1 addition & 1 deletion src/AElf.Kernel.Types/Block/BlockHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public byte[] GetHashBytes()
private byte[] GetSignatureData()
{
if (!VerifyFields())
throw new InvalidOperationException($"Invalid block header: {this}.");
throw new InvalidOperationException($"Invalid block header: PreviousBlockHash={PreviousBlockHash?.ToHex()}, mtr={MerkleTreeRootOfTransactions?.ToHex()}, ChainId={ChainId}, Height={Height}, Time={Time}.");

if (Signature.IsEmpty)
return this.ToByteArray();
Expand Down
2 changes: 2 additions & 0 deletions src/AElf.Kernel.Types/KernelConstants.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using Google.Protobuf.WellKnownTypes;

namespace AElf.Kernel;
Expand All @@ -16,4 +17,5 @@ public static class KernelConstants
public const string SignaturePlaceholder = "SignaturePlaceholder";
public const string BlockExecutedDataKey = "BlockExecutedData";
public static Duration AllowedFutureBlockTimeSpan = new() { Seconds = 4 };
public static string SupportStreamMinVersion = "1.4.0.0";
}
10 changes: 5 additions & 5 deletions src/AElf.Launcher/AElf.Launcher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
<ServerGarbageCollection>true</ServerGarbageCollection>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\AElf.Blockchains.MainChain\AElf.Blockchains.MainChain.csproj"/>
<ProjectReference Include="..\AElf.Blockchains.SideChain\AElf.Blockchains.SideChain.csproj"/>
<ProjectReference Include="..\AElf.Blockchains.MainChain\AElf.Blockchains.MainChain.csproj" />
<ProjectReference Include="..\AElf.Blockchains.SideChain\AElf.Blockchains.SideChain.csproj" />
<None Update="Dockerfile">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand All @@ -15,9 +15,9 @@
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="5.2.2"/>
<PackageReference Include="Volo.Abp.Autofac" Version="5.2.2"/>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="5.2.2" />
<PackageReference Include="Volo.Abp.Autofac" Version="5.2.2" />
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<Content Include="appsettings.Development.json">
Expand Down
2 changes: 2 additions & 0 deletions src/AElf.OS.Core/Network/Application/INetworkService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using AElf.Kernel;
using AElf.OS.Network.Infrastructure;
using AElf.OS.Network.Types;
using AElf.Types;

Expand Down Expand Up @@ -28,4 +29,5 @@ Task<bool> RemovePeerByPubkeyAsync(string peerPubkey,
Task CheckPeersHealthAsync();
void CheckNtpDrift();
bool IsPeerPoolFull();
Task<List<NodeInfo>> GetNodesAsync(IPeer peer);
}
38 changes: 29 additions & 9 deletions src/AElf.OS.Core/Network/Application/NetworkService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Task BroadcastAnnounceAsync(BlockHeader blockHeader)
Logger.LogInformation(ex, $"Could not broadcast announcement to {peer} " +
$"- status {peer.ConnectionStatus}.");

await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -171,7 +171,7 @@ public Task BroadcastTransactionAsync(Transaction transaction)
Logger.LogWarning(ex, $"Could not broadcast transaction to {peer} " +
$"- status {peer.ConnectionStatus}.");

await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -201,7 +201,7 @@ public Task BroadcastLibAnnounceAsync(Hash libHash, long libHeight)
{
Logger.LogWarning(ex, $"Could not broadcast lib announcement to {peer} " +
$"- status {peer.ConnectionStatus}.");
await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -256,7 +256,6 @@ public async Task<Response<List<BlockWithTransactions>>> GetBlocksAsync(Hash pre

if (peer == null)
throw new InvalidOperationException($"Could not find peer {peerPubkey}.");

var response = await Request(peer, p => p.GetBlocksAsync(previousBlock, count));

if (response.Success && response.Payload != null
Expand Down Expand Up @@ -343,7 +342,7 @@ private void EnqueueBlock(IPeer peer, BlockWithTransactions blockWithTransaction
if (ex != null)
{
Logger.LogWarning(ex, $"Could not broadcast block to {peer} - status {peer.ConnectionStatus}.");
await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -372,13 +371,33 @@ private async Task<Response<T>> Request<T>(IPeer peer, Func<IPeer, Task<T>> func
if (ex.ExceptionType == NetworkExceptionType.HandlerException)
return new Response<T>(default);

await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}

return new Response<T>();
}

private async Task HandleNetworkException(IPeer peer, NetworkException exception)
public async Task<List<NodeInfo>> GetNodesAsync(IPeer peer)
{
try
{
var nodeList = await peer.GetNodesAsync();

if (nodeList?.Nodes == null)
return new List<NodeInfo>();

Logger.LogDebug("get nodes: {nodeList} from peer: {peer}.", nodeList, peer);
return nodeList.Nodes.ToList();
}
catch (Exception e)
{
if (e is NetworkException exception) await HandleNetworkExceptionAsync(peer, exception);
Logger.LogWarning(e, "get nodes failed. peer={peer}", peer);
return new List<NodeInfo>();
}
}

private async Task HandleNetworkExceptionAsync(IPeer peer, NetworkException exception)
{
if (exception.ExceptionType == NetworkExceptionType.Unrecoverable)
{
Expand All @@ -398,8 +417,9 @@ private async Task RecoverPeerAsync(IPeer peer)
return;

var success = await peer.TryRecoverAsync();

if (!success)
if (success)
await _networkServer.BuildStreamForPeerAsync(peer);
else
await _networkServer.TrySchedulePeerReconnectionAsync(peer);
}

Expand Down
16 changes: 16 additions & 0 deletions src/AElf.OS.Core/Network/Events/StreamMessageReceivedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Google.Protobuf;

namespace AElf.OS.Network.Events;

public class StreamMessageReceivedEvent
{
public StreamMessageReceivedEvent(ByteString message, string clientPubkey)
{
Message = message;
ClientPubkey = clientPubkey;
}

public ByteString Message { get; }

public string ClientPubkey { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ public interface IAElfNetworkServer
Task StopAsync(bool gracefulDisconnect = true);
void CheckNtpDrift();
Task<bool> CheckEndpointAvailableAsync(DnsEndPoint endpoint);
Task<bool> BuildStreamForPeerAsync(IPeer peer);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using AElf.Kernel.Account.Application;
using AElf.OS.Network.Application;
using AElf.OS.Network.Domain;
using AElf.OS.Network.Extensions;
using Microsoft.Extensions.Logging;
Expand All @@ -28,18 +28,20 @@ public class PeerDiscoveryJobProcessor : IPeerDiscoveryJobProcessor, ISingletonD
private readonly IDiscoveredNodeCacheProvider _discoveredNodeCacheProvider;
private readonly IAElfNetworkServer _networkServer;
private readonly INodeManager _nodeManager;
private readonly INetworkService _networkService;

private TransformManyBlock<IPeer, NodeInfo> _discoverNodesDataflow;
private ActionBlock<NodeInfo> _processNodeDataflow;

public PeerDiscoveryJobProcessor(INodeManager nodeManager,
IDiscoveredNodeCacheProvider discoveredNodeCacheProvider, IAElfNetworkServer networkServer,
IAccountService accountService)
IAccountService accountService, INetworkService networkService)
{
_nodeManager = nodeManager;
_discoveredNodeCacheProvider = discoveredNodeCacheProvider;
_networkServer = networkServer;
_accountService = accountService;
_networkService = networkService;
CreatePeerDiscoveryDataflow();

Logger = NullLogger<PeerDiscoveryJobProcessor>.Instance;
Expand Down Expand Up @@ -80,21 +82,7 @@ private void CreatePeerDiscoveryDataflow()

private async Task<List<NodeInfo>> DiscoverNodesAsync(IPeer peer)
{
try
{
var nodeList = await peer.GetNodesAsync();

if (nodeList?.Nodes == null)
return new List<NodeInfo>();

Logger.LogDebug($"Discover nodes: {nodeList} from peer: {peer}.");
return nodeList.Nodes.ToList();
}
catch (Exception e)
{
Logger.LogWarning(e, "Discover nodes failed.");
return new List<NodeInfo>();
}
return await _networkService.GetNodesAsync(peer);
}

private async Task ProcessNodeAsync(NodeInfo node)
Expand Down
1 change: 1 addition & 0 deletions src/AElf.OS.Core/Network/NetworkConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static class NetworkConstants
public const int DefaultMaxBufferedTransactionCount = 100;
public const int DefaultMaxBufferedBlockCount = 50;
public const int DefaultMaxBufferedAnnouncementCount = 200;
public const int DefaultMaxBufferedStreamCount = 200;

public const int DefaultPeerReconnectionPeriod = 60_000; // 1 min
public const int DefaultMaximumReconnectionTime = 60_000 * 60 * 24; // 1 day
Expand Down
3 changes: 3 additions & 0 deletions src/AElf.OS.Core/Network/NetworkOptions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using AElf.Kernel;

namespace AElf.OS.Network;

Expand Down Expand Up @@ -75,6 +76,8 @@ public class NetworkOptions
public int PeerInvalidTransactionTimeout { get; set; } = NetworkConstants.DefaultPeerInvalidTransactionTimeout;

public int PeerInvalidTransactionLimit { get; set; } = NetworkConstants.DefaultPeerInvalidTransactionLimit;

public string SupportStreamMinVersion { get; set; } = KernelConstants.SupportStreamMinVersion;
}

[Flags]
Expand Down
4 changes: 2 additions & 2 deletions src/AElf.OS.Core/Network/Protocol/HandshakeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ public async Task<HandshakeValidationResult> ValidateHandshakeAsync(Handshake ha
var chainId = _blockchainService.GetChainId();
if (handshake.HandshakeData.ChainId != chainId)
{
Logger.LogDebug($"Chain is is incorrect: {handshake.HandshakeData.ChainId}.");
Logger.LogDebug($"Chain is incorrect: {handshake.HandshakeData.ChainId}.");
return HandshakeValidationResult.InvalidChainId;
}

if (handshake.HandshakeData.Version != KernelConstants.ProtocolVersion)
{
Logger.LogDebug($"Version is is incorrect: {handshake.HandshakeData.Version}.");
Logger.LogDebug($"Version is incorrect: {handshake.HandshakeData.Version}.");
return HandshakeValidationResult.InvalidVersion;
}

Expand Down
5 changes: 5 additions & 0 deletions src/AElf.OS.Core/Network/Protocol/Types/PeerConnectionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public class PeerConnectionInfo
public bool IsInbound { get; set; }
public byte[] SessionId { get; set; }
public string NodeVersion { get; set; }

public override string ToString()
{
return $"key: {Pubkey.Substring(0, 45)}...";
}
}
Loading