diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Aio.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Aio.cs new file mode 100644 index 00000000000000..61b75e5a1cdbc4 --- /dev/null +++ b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Aio.cs @@ -0,0 +1,129 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Runtime.InteropServices; + +internal static partial class Interop +{ + internal static partial class Sys + { + [StructLayout(LayoutKind.Sequential)] + internal struct IoEvent + { + internal ulong Data; + internal ulong Obj; + internal long Res; + internal long Res2; + } + + [StructLayout(LayoutKind.Sequential)] + internal struct IoControlBlock + { + internal ulong AioData; + + // these fields swap for big endian + // https://github.com/torvalds/linux/blob/0a679e13ea30f85a1aef0669ee0c5a9fd7860b34/include/uapi/linux/aio_abi.h#L77-L83 + private uint _swappedField_1; + private uint _swappedField_2; + + internal ushort AioLioOpcode; + internal short AioReqprio; + internal uint AioFildes; + + internal ulong AioBuf; + internal ulong AioNbytes; + internal long AioOffset; + + internal ulong AioReserved2; + + internal uint AioFlags; + internal uint AioResfd; + + internal uint AioKey + { + get => BitConverter.IsLittleEndian ? _swappedField_1 : _swappedField_2; + set + { + if (BitConverter.IsLittleEndian) + { + _swappedField_1 = value; + } + else + { + _swappedField_2 = value; + } + } + } + internal int AioRwFlags + { + get => BitConverter.IsLittleEndian ? (int)_swappedField_2 : (int)_swappedField_1; + set + { + if (BitConverter.IsLittleEndian) + { + _swappedField_2 = (uint)value; + } + else + { + _swappedField_1 = (uint)value; + } + } + } + } + + [StructLayout(LayoutKind.Sequential)] + internal struct AioRing + { + internal uint Id; + internal uint Nr; + internal uint Head; + internal uint Tail; + internal uint Magic; + internal uint CompatFeatures; + internal uint IncompatFeatures; + internal uint HeaderLength; + + internal static unsafe IoEvent* IoEvents(AioRing* ring, int idx) + { + IoEvent* ev = (IoEvent*)(ring + 1); + return ev + idx; + } + } + + [StructLayout(LayoutKind.Sequential)] + internal struct AioContext + { + internal unsafe AioRing* Ring; + } + + internal static class IoControlBlockFlags + { + internal const int IOCB_CMD_PREAD = 0; + internal const int IOCB_CMD_PWRITE = 1; + internal const int IOCB_CMD_FSYNC = 2; + internal const int IOCB_CMD_FDSYNC = 3; + // 4 was the experimental IOCB_CMD_PREADX + internal const int IOCB_CMD_POLL = 5; + internal const int IOCB_CMD_NOOP = 6; + internal const int IOCB_CMD_PREADV = 7; + internal const int IOCB_CMD_PWRITEV = 8; + } + + [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_IsAioSupported")] + internal static extern bool IsAioSupported(); + + [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_IoSetup", SetLastError = true)] + internal static extern unsafe int IoSetup(uint eventsCount, AioContext* context); + + [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_IoDestroy", SetLastError = true)] + internal static extern unsafe int IoDestroy(AioRing* ring); + + [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_IoSubmit", SetLastError = true)] + internal static extern unsafe int IoSubmit(AioRing* ring, long controlBlocksCount, IoControlBlock** ioControlBlocks); + + [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_IoGetEvents", SetLastError = true)] + internal static extern unsafe int IoGetEvents(AioRing* ring, long minimumEventsCount, long maximumEventsCount, IoEvent* ioEvents); + } +} diff --git a/src/libraries/Native/Unix/Common/pal_config.h.in b/src/libraries/Native/Unix/Common/pal_config.h.in index 701c2c5dc1a65c..7ded10c4b7d088 100644 --- a/src/libraries/Native/Unix/Common/pal_config.h.in +++ b/src/libraries/Native/Unix/Common/pal_config.h.in @@ -96,6 +96,7 @@ #cmakedefine01 HAVE_TCP_H_TCP_KEEPALIVE #cmakedefine01 HAVE_BUILTIN_MUL_OVERFLOW #cmakedefine01 HAVE_DISCONNECTX +#cmakedefine01 HAVE_LINUX_AIO // Mac OS X has stat64, but it is deprecated since plain stat now // provides the same 64-bit aware struct when targeting OS X > 10.5 diff --git a/src/libraries/Native/Unix/System.Native/pal_networking.c b/src/libraries/Native/Unix/System.Native/pal_networking.c index 444aa1ad09a09f..f9bd81c8b6b09d 100644 --- a/src/libraries/Native/Unix/System.Native/pal_networking.c +++ b/src/libraries/Native/Unix/System.Native/pal_networking.c @@ -56,6 +56,10 @@ #if HAVE_LINUX_CAN_H #include #endif +#if HAVE_LINUX_AIO +#include +#include +#endif #if HAVE_KQUEUE #if KEVENT_HAS_VOID_UDATA static void* GetKeventUdata(uintptr_t udata) @@ -2879,3 +2883,52 @@ uint32_t SystemNative_InterfaceNameToIndex(char* interfaceName) interfaceName++; return if_nametoindex(interfaceName); } + +int32_t SystemNative_IsAioSupported(void) +{ +#if HAVE_LINUX_AIO + return true; +#else + return false; +#endif +} + +int32_t SystemNative_IoSetup(uint32_t eventsCount, AioContext* context) +{ +#if HAVE_LINUX_AIO + return (int32_t)syscall(__NR_io_setup, eventsCount, context); +#else + errno = ENOTSUP; + return -1; +#endif +} + +int32_t SystemNative_IoDestroy(AioRing* ring) +{ +#if HAVE_LINUX_AIO + return (int32_t)syscall(__NR_io_destroy, ring); +#else + errno = ENOTSUP; + return -1; +#endif +} + +int32_t SystemNative_IoSubmit(AioRing* ring, int64_t controlBlocksCount, IoControlBlock** ioControlBlocks) +{ +#if HAVE_LINUX_AIO + return (int32_t)syscall(__NR_io_submit, ring, controlBlocksCount, ioControlBlocks); +#else + errno = ENOTSUP; + return -1; +#endif +} + +int32_t SystemNative_IoGetEvents(AioRing* ring, int64_t minEventsCount, int64_t maxEventsCount, IoEvent* ioEvents) +{ +#if HAVE_LINUX_AIO + return (int32_t)syscall(__NR_io_getevents, ring, minEventsCount, maxEventsCount, ioEvents, NULL); // NULL is the timeout +#else + errno = ENOTSUP; + return -1; +#endif +} diff --git a/src/libraries/Native/Unix/System.Native/pal_networking.h b/src/libraries/Native/Unix/System.Native/pal_networking.h index e92f39a0f07f5e..77f22711220aec 100644 --- a/src/libraries/Native/Unix/System.Native/pal_networking.h +++ b/src/libraries/Native/Unix/System.Native/pal_networking.h @@ -310,6 +310,47 @@ typedef struct uint32_t Padding; // Pad out to 8-byte alignment } SocketEvent; +typedef struct +{ + uint64_t Data; + uint64_t Obj; + int64_t Res; + int64_t Res2; +} IoEvent; + +typedef struct +{ + uint64_t AioData; + uint32_t AioKey; + int32_t AioRwFlags; + uint16_t AioLioOpcode; + int16_t AioReqprio; + uint32_t AioFildes; + uint64_t AioBuf; + uint64_t AioNbytes; + int64_t AioOffset; + uint64_t AioReserved2; + uint32_t AioFlags; + uint32_t AioResfd; +} IoControlBlock; + +typedef struct +{ + uint32_t Id; + uint32_t Nr; + uint32_t Head; + uint32_t Tail; + uint32_t Magic; + uint32_t CompatFeatures; + uint32_t IncompatFeatures; + uint32_t HeaderLength; +} AioRing; + +typedef struct +{ + AioRing* Ring; +} AioContext; + DLLEXPORT int32_t SystemNative_GetHostEntryForName(const uint8_t* address, HostEntry* entry); DLLEXPORT void SystemNative_FreeHostEntry(HostEntry* entry); @@ -424,3 +465,13 @@ DLLEXPORT int32_t SystemNative_SendFile(intptr_t out_fd, intptr_t in_fd, int64_t DLLEXPORT int32_t SystemNative_Disconnect(intptr_t socket); DLLEXPORT uint32_t SystemNative_InterfaceNameToIndex(char* interfaceName); + +DLLEXPORT int32_t SystemNative_IsAioSupported(void); + +DLLEXPORT int32_t SystemNative_IoSetup(uint32_t eventsCount, AioContext* context); + +DLLEXPORT int32_t SystemNative_IoDestroy(AioRing* ring); + +DLLEXPORT int32_t SystemNative_IoSubmit(AioRing* ring, int64_t controlBlocksCount, IoControlBlock** ioControlBlocks); + +DLLEXPORT int32_t SystemNative_IoGetEvents(AioRing* ring, int64_t minEventsCount, int64_t maxEventsCount, IoEvent* ioEvents); diff --git a/src/libraries/Native/Unix/configure.cmake b/src/libraries/Native/Unix/configure.cmake index bf323f6bae16b7..be759b24ba3278 100644 --- a/src/libraries/Native/Unix/configure.cmake +++ b/src/libraries/Native/Unix/configure.cmake @@ -839,6 +839,17 @@ check_c_source_compiles( " HAVE_BUILTIN_MUL_OVERFLOW) +check_c_source_compiles( + " + #include + int main(void) + { + int x = IOCB_CMD_PREAD; + return x; + } + " + HAVE_LINUX_AIO) + configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/Common/pal_config.h.in ${CMAKE_CURRENT_BINARY_DIR}/Common/pal_config.h) diff --git a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj index f105c12ef91467..a1d92c729eba68 100644 --- a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj +++ b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj @@ -287,6 +287,9 @@ Common\Interop\Unix\System.Native\Interop.Accept.cs + + Common\Interop\Unix\System.Native\Interop.Aio.cs + Common\Interop\Unix\System.Native\Interop.Bind.cs @@ -403,7 +406,4 @@ - - - \ No newline at end of file diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 690119761c2717..28e2c7f5f5fb5d 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using Microsoft.Win32.SafeHandles; +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; @@ -56,6 +57,7 @@ private void ReturnOperation(BufferMemoryReceiveOperation operation) { operation.Reset(); operation.Buffer = default; + operation.PinHandle = default; operation.Callback = null; operation.SocketAddress = null; Volatile.Write(ref _cachedBufferMemoryReceiveOperation, operation); // benign race condition @@ -74,6 +76,7 @@ private void ReturnOperation(BufferMemorySendOperation operation) { operation.Reset(); operation.Buffer = default; + operation.PinHandle = default; operation.Callback = null; operation.SocketAddress = null; Volatile.Write(ref _cachedBufferMemorySendOperation, operation); // benign race condition @@ -108,7 +111,7 @@ private BufferListSendOperation RentBufferListSendOperation() => Interlocked.Exchange(ref _cachedBufferListSendOperation, null) ?? new BufferListSendOperation(this); - private abstract class AsyncOperation : IThreadPoolWorkItem + internal abstract class AsyncOperation : IThreadPoolWorkItem { private enum State { @@ -138,6 +141,14 @@ public ManualResetEventSlim Event set { CallbackOrEvent = value; } } + public bool IsSync => CallbackOrEvent is ManualResetEventSlim; + + public bool IsCompleted => Volatile.Read(ref _state) == (int)State.Complete; + + public bool IsCancelled => Volatile.Read(ref _state) == (int)State.Cancelled; + + public bool IsWaiting => Volatile.Read(ref _state) == (int)State.Waiting; + public AsyncOperation(SocketAsyncContext context) { AssociatedContext = context; @@ -165,18 +176,7 @@ public bool TryComplete(SocketAsyncContext context) } public bool TrySetRunning() - { - State oldState = (State)Interlocked.CompareExchange(ref _state, (int)State.Running, (int)State.Waiting); - if (oldState == State.Cancelled) - { - // This operation has already been cancelled, and had its completion processed. - // Simply return false to indicate no further processing is needed. - return false; - } - - Debug.Assert(oldState == (int)State.Waiting); - return true; - } + => (State)Interlocked.CompareExchange(ref _state, (int)State.Running, (int)State.Waiting) == State.Waiting; public void SetComplete() { @@ -310,6 +310,110 @@ public void DoAbort() public abstract void InvokeCallback(bool allowPooling); + internal virtual bool TryBatch(SocketAsyncContext context, int id, ref Interop.Sys.IoControlBlock ioControlBlock) => false; + + internal virtual void HandleBatchEvent(in Interop.Sys.IoEvent ioControlBlock) + { + Debug.Fail("Expected to be implemented only by types overriding TryAsBatch"); + throw new InvalidOperationException(); + } + + protected bool CanRetry(SocketError error) => error == SocketError.TryAgain || error == SocketError.WouldBlock; + + // todo: move this method somewhere else + protected static SocketError Map(Interop.Error error) + { + switch (error) + { + case Interop.Error.EACCES: + return SocketError.AccessDenied; + case Interop.Error.EADDRINUSE: + return SocketError.AddressAlreadyInUse; + case Interop.Error.EADDRNOTAVAIL: + return SocketError.AddressNotAvailable; + case Interop.Error.EAFNOSUPPORT: + return SocketError.AddressFamilyNotSupported; + case (Interop.Error)11: + case Interop.Error.EAGAIN: + return SocketError.WouldBlock; + case Interop.Error.EALREADY: + return SocketError.AlreadyInProgress; + case Interop.Error.EBADF: + case Interop.Error.ECANCELED: + return SocketError.OperationAborted; + case Interop.Error.ECONNABORTED: + return SocketError.ConnectionAborted; + case Interop.Error.ECONNREFUSED: + return SocketError.ConnectionRefused; + case Interop.Error.ECONNRESET: + return SocketError.ConnectionReset; + case Interop.Error.EDESTADDRREQ: + return SocketError.DestinationAddressRequired; + case Interop.Error.EFAULT: + return SocketError.Fault; + case Interop.Error.EHOSTDOWN: + return SocketError.HostDown; + case Interop.Error.ENXIO: + return SocketError.HostNotFound; + case Interop.Error.EHOSTUNREACH: + return SocketError.HostUnreachable; + case Interop.Error.EINPROGRESS: + return SocketError.InProgress; + case Interop.Error.EINTR: + return SocketError.Interrupted; + case Interop.Error.EINVAL: + return SocketError.InvalidArgument; + case Interop.Error.EISCONN: + return SocketError.IsConnected; + case Interop.Error.EMFILE: + return SocketError.TooManyOpenSockets; + case Interop.Error.EMSGSIZE: + return SocketError.MessageSize; + case Interop.Error.ENETDOWN: + return SocketError.NetworkDown; + case Interop.Error.ENETRESET: + return SocketError.NetworkReset; + case Interop.Error.ENETUNREACH: + return SocketError.NetworkUnreachable; + case Interop.Error.ENFILE: + return SocketError.TooManyOpenSockets; + case Interop.Error.ENOBUFS: + return SocketError.NoBufferSpaceAvailable; + case Interop.Error.ENODATA: + return SocketError.NoData; + case Interop.Error.ENOENT: + return SocketError.AddressNotAvailable; + case Interop.Error.ENOPROTOOPT: + return SocketError.ProtocolOption; + case Interop.Error.ENOTCONN: + return SocketError.NotConnected; + case Interop.Error.ENOTSOCK: + return SocketError.NotSocket; + case Interop.Error.ENOTSUP: + return SocketError.OperationNotSupported; + case Interop.Error.EPERM: + return SocketError.AccessDenied; + case Interop.Error.EPIPE: + return SocketError.Shutdown; + case Interop.Error.EPFNOSUPPORT: + return SocketError.ProtocolFamilyNotSupported; + case Interop.Error.EPROTONOSUPPORT: + return SocketError.ProtocolNotSupported; + case Interop.Error.EPROTOTYPE: + return SocketError.ProtocolType; + case Interop.Error.ESOCKTNOSUPPORT: + return SocketError.SocketNotSupported; + case Interop.Error.ESHUTDOWN: + return SocketError.Disconnecting; + case Interop.Error.SUCCESS: + return SocketError.Success; + case Interop.Error.ETIMEDOUT: + return SocketError.TimedOut; + default: + throw new IndexOutOfRangeException($"Unexpected error: {error}"); + } + } + [Conditional("SOCKETASYNCCONTEXT_TRACE")] public void Trace(string message, [CallerMemberName] string memberName = null) { @@ -329,14 +433,36 @@ private abstract class ReadOperation : AsyncOperation, IThreadPoolWorkItem { public ReadOperation(SocketAsyncContext context) : base(context) { } - void IThreadPoolWorkItem.Execute() => AssociatedContext.ProcessAsyncReadOperation(this); + void IThreadPoolWorkItem.Execute() + { + if (IsCompleted) + { + CancellationRegistration.Dispose(); + InvokeCallback(allowPooling: true); + } + else + { + AssociatedContext.ProcessAsyncReadOperation(this); + } + } } private abstract class WriteOperation : AsyncOperation, IThreadPoolWorkItem { public WriteOperation(SocketAsyncContext context) : base(context) { } - void IThreadPoolWorkItem.Execute() => AssociatedContext.ProcessAsyncWriteOperation(this); + void IThreadPoolWorkItem.Execute() + { + if (IsCompleted) + { + CancellationRegistration.Dispose(); + InvokeCallback(allowPooling: true); + } + else + { + AssociatedContext.ProcessAsyncWriteOperation(this); + } + } } private abstract class SendOperation : WriteOperation @@ -362,6 +488,7 @@ public override void InvokeCallback(bool allowPooling) => private sealed class BufferMemorySendOperation : SendOperation { public Memory Buffer; + internal MemoryHandle PinHandle; public BufferMemorySendOperation(SocketAsyncContext context) : base(context) { } @@ -386,6 +513,55 @@ public override void InvokeCallback(bool allowPooling) cb(bt, sa, sal, SocketFlags.None, ec); } + + internal override unsafe bool TryBatch(SocketAsyncContext context, int id, ref Interop.Sys.IoControlBlock ioControlBlock) + { + if (IsSync || !TrySetRunning()) + { + return false; + } + + PinHandle = Buffer.Pin(); + + ioControlBlock.AioData = (ulong)id; + ioControlBlock.AioLioOpcode = Interop.Sys.IoControlBlockFlags.IOCB_CMD_PWRITE; + ioControlBlock.AioFildes = (uint)context._socket.DangerousGetHandle().ToInt32(); + ioControlBlock.AioBuf = (ulong)PinHandle.Pointer; + ioControlBlock.AioNbytes = (ulong)Count; + ioControlBlock.AioOffset = Offset; + + return true; + } + + internal override void HandleBatchEvent(in Interop.Sys.IoEvent ioControlBlock) + { + PinHandle.Dispose(); + + if (ioControlBlock.Res < 0) + { + ErrorCode = Map((Interop.Error)(-ioControlBlock.Res)); + + if (CanRetry(ErrorCode)) + { + SetWaiting(); + } + else + { + SetComplete(); + } + } + else + { + BytesTransferred = (int)ioControlBlock.Res; + Offset += (int)ioControlBlock.Res; + Count -= (int)ioControlBlock.Res; + ErrorCode = SocketError.Success; + + SetComplete(); + } + + AssociatedContext.ProcessBatchWriteOperation(this); + } } private sealed class BufferListSendOperation : SendOperation @@ -428,6 +604,51 @@ protected override bool DoTryComplete(SocketAsyncContext context) int bufferIndex = 0; return SocketPal.TryCompleteSendTo(context._socket, new ReadOnlySpan(BufferPtr, Offset + Count), null, ref bufferIndex, ref Offset, ref Count, Flags, SocketAddress, SocketAddressLen, ref BytesTransferred, out ErrorCode); } + + internal override bool TryBatch(SocketAsyncContext context, int id, ref Interop.Sys.IoControlBlock ioControlBlock) + { + if (IsSync || !TrySetRunning()) + { + return false; + } + + ioControlBlock.AioData = (ulong)id; + ioControlBlock.AioLioOpcode = Interop.Sys.IoControlBlockFlags.IOCB_CMD_PWRITE; + ioControlBlock.AioFildes = (uint)context._socket.DangerousGetHandle().ToInt32(); + ioControlBlock.AioBuf = (ulong)BufferPtr; + ioControlBlock.AioNbytes = (ulong)Count; + ioControlBlock.AioOffset = Offset; + + return true; + } + + internal override void HandleBatchEvent(in Interop.Sys.IoEvent ioControlBlock) + { + if (ioControlBlock.Res < 0) + { + ErrorCode = Map((Interop.Error)(-ioControlBlock.Res)); + + if (CanRetry(ErrorCode)) + { + SetWaiting(); + } + else + { + SetComplete(); + } + } + else + { + BytesTransferred = (int)ioControlBlock.Res; + Offset += (int)ioControlBlock.Res; + Count -= (int)ioControlBlock.Res; + ErrorCode = SocketError.Success; + + SetComplete(); + } + + AssociatedContext.ProcessBatchWriteOperation(this); + } } private abstract class ReceiveOperation : ReadOperation @@ -453,6 +674,7 @@ public override void InvokeCallback(bool allowPooling) => private sealed class BufferMemoryReceiveOperation : ReceiveOperation { public Memory Buffer; + internal MemoryHandle PinHandle; public BufferMemoryReceiveOperation(SocketAsyncContext context) : base(context) { } @@ -489,6 +711,53 @@ public override void InvokeCallback(bool allowPooling) cb(bt, sa, sal, rf, ec); } + + internal override unsafe bool TryBatch(SocketAsyncContext context, int id, ref Interop.Sys.IoControlBlock ioControlBlock) + { + if (Buffer.Length == 0 || IsSync || !TrySetRunning()) + { + return false; + } + + PinHandle = Buffer.Pin(); + + ioControlBlock.AioData = (ulong)id; + ioControlBlock.AioLioOpcode = (ushort)Interop.Sys.IoControlBlockFlags.IOCB_CMD_PREAD; + ioControlBlock.AioFildes = (uint)context._socket.DangerousGetHandle().ToInt32(); + ioControlBlock.AioBuf = (ulong)PinHandle.Pointer; + ioControlBlock.AioNbytes = (ulong)Buffer.Length; + ioControlBlock.AioOffset = 0; + + return true; + } + + internal override void HandleBatchEvent(in Interop.Sys.IoEvent ioControlBlock) + { + PinHandle.Dispose(); + + if (ioControlBlock.Res < 0) + { + ErrorCode = Map((Interop.Error)(-ioControlBlock.Res)); + + if (CanRetry(ErrorCode)) + { + SetWaiting(); + } + else + { + SetComplete(); + } + } + else + { + BytesTransferred = (int)ioControlBlock.Res; + ErrorCode = SocketError.Success; + + SetComplete(); + } + + AssociatedContext.ProcessBatchReadOperation(this); + } } private sealed class BufferListReceiveOperation : ReceiveOperation @@ -527,6 +796,49 @@ public BufferPtrReceiveOperation(SocketAsyncContext context) : base(context) { } protected override bool DoTryComplete(SocketAsyncContext context) => SocketPal.TryCompleteReceiveFrom(context._socket, new Span(BufferPtr, Length), null, Flags, SocketAddress, ref SocketAddressLen, out BytesTransferred, out ReceivedFlags, out ErrorCode); + + internal override bool TryBatch(SocketAsyncContext context, int id, ref Interop.Sys.IoControlBlock ioControlBlock) + { + if (IsSync || !TrySetRunning()) + { + return false; + } + + ioControlBlock.AioData = (ulong)id; + ioControlBlock.AioLioOpcode = Interop.Sys.IoControlBlockFlags.IOCB_CMD_PREAD; + ioControlBlock.AioFildes = (uint)context._socket.DangerousGetHandle().ToInt32(); + ioControlBlock.AioBuf = (ulong)BufferPtr; + ioControlBlock.AioNbytes = (ulong)Length; + ioControlBlock.AioOffset = 0; + + return true; + } + + internal override void HandleBatchEvent(in Interop.Sys.IoEvent ioControlBlock) + { + if (ioControlBlock.Res < 0) + { + ErrorCode = Map((Interop.Error)(-ioControlBlock.Res)); + + if (CanRetry(ErrorCode)) + { + SetWaiting(); + } + else + { + SetComplete(); + } + } + else + { + BytesTransferred = (int)ioControlBlock.Res; + ErrorCode = SocketError.Success; + + SetComplete(); + } + + AssociatedContext.ProcessBatchReadOperation(this); + } } private sealed class ReceiveMessageFromOperation : ReadOperation @@ -712,6 +1024,7 @@ private enum QueueState : byte // If this happens, we MUST retry the operation, otherwise we risk // "losing" the notification and causing the operation to pend indefinitely. private AsyncOperation _tail; // Queue of pending IO operations to process when data becomes available. + private int _batchedOperationsCount; // The _queueLock is used to ensure atomic access to the queue state above. // The lock is only ever held briefly, to read and/or update queue state, and @@ -727,6 +1040,7 @@ public void Init() _state = QueueState.Ready; _sequenceNumber = 0; + _batchedOperationsCount = 0; } // IsReady returns the current _sequenceNumber, which must be passed to StartAsyncOperation below. @@ -869,6 +1183,59 @@ public void HandleEvent(SocketAsyncContext context) op.Dispatch(); } + internal void BatchOrDispatch(SocketAsyncContext context, Span ioControlBlocks, Span batchedOperations, ref int batchedCount) + { + AsyncOperation nextOperation; + using (Lock()) + { + switch (_state) + { + case QueueState.Ready: + Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); + _sequenceNumber++; + return; + + case QueueState.Waiting: + Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); + _state = QueueState.Processing; + nextOperation = _tail.Next; + + while (batchedCount < ioControlBlocks.Length && nextOperation.TryBatch(context, batchedCount, ref ioControlBlocks[batchedCount])) + { + batchedOperations[batchedCount++] = nextOperation; + _batchedOperationsCount++; + + if (nextOperation == nextOperation.Next) + { + // we have batched some operations and reached the end of the queue + return; + } + + nextOperation = nextOperation.Next; + } + break; + + case QueueState.Processing: + Debug.Assert(_tail != null, "State == Processing but queue is empty!"); + _sequenceNumber++; + return; + + case QueueState.Stopped: + Debug.Assert(_tail == null); + return; + + default: + Environment.FailFast("unexpected queue state"); + return; + } + + // we can't batch more (batchedCount < ioControlBlocks.Length) + // or nextOperation.TryAsBatch failed + // so we dispatch outside of lock + nextOperation?.Dispatch(); + } + } + internal void ProcessAsyncOperation(TOperation op) { OperationResult result = ProcessQueuedOperation(op); @@ -1008,6 +1375,47 @@ public OperationResult ProcessQueuedOperation(TOperation op) return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } + public void ProcessBatchedOperation(TOperation op) + { + using (Lock()) + { + _batchedOperationsCount--; + + if (_state == QueueState.Stopped) + { + Debug.Assert(_tail == null); + return; + } + + if (op.IsWaiting) + { + _state = QueueState.Waiting; + } + else if (op == _tail) + { + // No more operations to process + _tail = null; + _state = QueueState.Ready; + _sequenceNumber++; + } + else if (_tail.Next == op) + { + // Pop current operation and advance to next + _tail.Next = op.Next; + } + else + { + throw new Exception($"Something went wrong, {_batchedOperationsCount}"); + } + } + + // todo: probably this is not the best place to call this stuff + if (op.IsCompleted) + { + ThreadPool.UnsafeQueueUserWorkItem(op, preferLocal: false); + } + } + public void CancelAndContinueProcessing(TOperation op) { // Note, only sync operations use this method. @@ -1296,6 +1704,10 @@ private bool ShouldRetrySyncOperation(out SocketError errorCode) private void ProcessAsyncWriteOperation(WriteOperation op) => _sendQueue.ProcessAsyncOperation(op); + private void ProcessBatchReadOperation(ReadOperation op) => _receiveQueue.ProcessBatchedOperation(op); + + private void ProcessBatchWriteOperation(WriteOperation op) => _sendQueue.ProcessBatchedOperation(op); + public SocketError Accept(byte[] socketAddress, ref int socketAddressLen, out IntPtr acceptedFd) { Debug.Assert(socketAddress != null, "Expected non-null socketAddress"); @@ -1946,7 +2358,7 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co return SocketError.IOPending; } - public unsafe void HandleEvents(Interop.Sys.SocketEvents events) + public void HandleEvents(Interop.Sys.SocketEvents events) { if ((events & Interop.Sys.SocketEvents.Error) != 0) { @@ -1966,6 +2378,21 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) } } + public void AddWaitingOperationsToBatch(Interop.Sys.SocketEvents events, in Span ioControlBlocks, in Span batchedOperations, ref int batchedCount) + { + Debug.Assert((events & Interop.Sys.SocketEvents.Error) == 0, "This method must not be used for handling errors!"); + + if ((events & Interop.Sys.SocketEvents.Read) != 0) + { + _receiveQueue.BatchOrDispatch(this, ioControlBlocks, batchedOperations, ref batchedCount); + } + + if ((events & Interop.Sys.SocketEvents.Write) != 0) + { + _sendQueue.BatchOrDispatch(this, ioControlBlocks, batchedOperations, ref batchedCount); + } + } + // // Tracing stuff // diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 84f733c314cc32..23ce3285f56ff2 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -75,6 +75,10 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) private readonly IntPtr _port; private readonly Interop.Sys.SocketEvent* _buffer; + private readonly Interop.Sys.AioContext _aioContext; + private readonly Interop.Sys.IoEvent* _aioEvents; + private readonly Interop.Sys.IoControlBlock* _aioBlocks; + private readonly Interop.Sys.IoControlBlock** _aioBlocksPointers; // // The read and write ends of a native pipe, used to signal that this instance's event loop should stop @@ -136,6 +140,8 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private bool IsFull { get { return _nextHandle == MaxHandles; } } + private bool IsAio => _aioContext.Ring != null; + // True if we've don't have sufficient active sockets to allow allocating a new engine. private bool HasLowNumberOfSockets { @@ -277,6 +283,29 @@ private SocketAsyncEngine() throw new InternalException(err); } + Interop.Sys.AioContext aioContext = default; + if (Interop.Sys.IoSetup(EventBufferCount, &aioContext) == 0) + { + _aioEvents = (Interop.Sys.IoEvent*)Marshal.AllocHGlobal(sizeof(Interop.Sys.IoEvent) * EventBufferCount); + _aioBlocks = (Interop.Sys.IoControlBlock*)Marshal.AllocHGlobal(sizeof(Interop.Sys.IoControlBlock) * EventBufferCount); + _aioBlocksPointers = (Interop.Sys.IoControlBlock**)Marshal.AllocHGlobal(sizeof(Interop.Sys.IoControlBlock*) * EventBufferCount); + + // Marshal.AllocHGlobal allocated memory might contain some garbage + new Span(_aioEvents, EventBufferCount).Clear(); + new Span(_aioBlocks, EventBufferCount).Clear(); + + for (int i = 0; i < EventBufferCount; i++) + { + _aioBlocksPointers[i] = &_aioBlocks[i]; + } + + _aioContext = aioContext; + } + else + { + Debug.Assert(Interop.Sys.IsAioSupported() == false, "When AIO is supported IoSetup should always succeed"); + } + // // Start the event loop on its own thread. // @@ -307,44 +336,118 @@ private void EventLoop() { try { - bool shutdown = false; - while (!shutdown) + if (IsAio) + { + EventLoopBodyAio(); + } + else { - int numEvents = EventBufferCount; - Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents); - if (err != Interop.Error.SUCCESS) + EventLoopBody(); + } + + FreeNativeResources(); + } + catch (Exception e) + { + Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + e.ToString(), e); + } + } + + private void EventLoopBody() + { + bool shutdown = false; + while (!shutdown) + { + int numEvents = EventBufferCount; + Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents); + if (err != Interop.Error.SUCCESS) + { + throw new InternalException(err); + } + + // The native shim is responsible for ensuring this condition. + Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); + + for (int i = 0; i < numEvents; i++) + { + IntPtr handle = _buffer[i].Data; + if (handle == ShutdownHandle) { - throw new InternalException(err); + shutdown = true; } + else + { + Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); + _handleToContextMap.TryGetValue(handle, out SocketAsyncContext context); + if (context != null) + { + context.HandleEvents(_buffer[i].Events); + context = null; + } + } + } + } + } - // The native shim is responsible for ensuring this condition. - Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); + private void EventLoopBodyAio() + { + bool shutdown = false; + SocketAsyncContext.AsyncOperation[] batchedOperations = new SocketAsyncContext.AsyncOperation[EventBufferCount]; + Span ioControlBlocks = new Span(_aioBlocks, EventBufferCount); + while (!shutdown) + { + int numEvents = EventBufferCount; + Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents); + if (err != Interop.Error.SUCCESS) + { + throw new InternalException(err); + } - for (int i = 0; i < numEvents; i++) + var socketEvents = new ReadOnlySpan(_buffer, numEvents); + int batchedCount = 0; + foreach (var socketEvent in socketEvents) + { + if (socketEvent.Data == ShutdownHandle) + { + shutdown = true; + } + else if (_handleToContextMap.TryGetValue(socketEvent.Data, out SocketAsyncContext context)) { - IntPtr handle = _buffer[i].Data; - if (handle == ShutdownHandle) + if ((socketEvent.Events & Interop.Sys.SocketEvents.Error) != 0) { - shutdown = true; + // there was an error, we use the non-buffered execution path (this should be very rare) + context.HandleEvents(socketEvent.Events); } else { - Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); - _handleToContextMap.TryGetValue(handle, out SocketAsyncContext context); - if (context != null) - { - context.HandleEvents(_buffer[i].Events); - context = null; - } + context.AddWaitingOperationsToBatch(socketEvent.Events, ioControlBlocks, batchedOperations, ref batchedCount); } } } - FreeNativeResources(); - } - catch (Exception e) - { - Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + e.ToString(), e); + if (batchedCount > 0) + { + int result = Interop.Sys.IoSubmit(_aioContext.Ring, batchedCount, _aioBlocksPointers); + if (result != batchedCount) + { + Interop.Error lastError = Interop.Sys.GetLastError(); + throw new InternalException($"IoSubmit has failed with {lastError}, returned {result} instead {batchedCount}"); // todo: implement a while loop + } + + // todo: perf: avoid the syscall by using a well known pattern that reads from the ring + result = Interop.Sys.IoGetEvents(_aioContext.Ring, batchedCount, batchedCount, _aioEvents); + if (result != batchedCount) + { + Interop.Error lastError = Interop.Sys.GetLastError(); + throw new InternalException($"IoGetEvents has failed with {lastError}, returned {result} instead {batchedCount}"); + } + + ReadOnlySpan events = new ReadOnlySpan(_aioEvents, batchedCount); + for (int i = 0; i < events.Length; i++) + { + batchedOperations[events[i].Data].HandleBatchEvent(in events[i]); + } + } } } @@ -379,6 +482,13 @@ private void FreeNativeResources() { Interop.Sys.CloseSocketEventPort(_port); } + if (_aioContext.Ring != null) + { + Interop.Sys.IoDestroy(_aioContext.Ring); + Marshal.FreeHGlobal(new IntPtr((void*)_aioEvents)); + Marshal.FreeHGlobal(new IntPtr((void*)_aioBlocksPointers)); + Marshal.FreeHGlobal(new IntPtr((void*)_aioBlocks)); + } } private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Error error)