Skip to content

Commit f3dad51

Browse files
authored
Merge pull request #991 from adamhathcock/async-reader-methods
Add more Async tests and complete Zip tests
2 parents aa1c0d0 + f518408 commit f3dad51

14 files changed

Lines changed: 1331 additions & 82 deletions

src/SharpCompress/Common/Zip/StreamingZipHeaderFactory.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ internal IEnumerable<ZipHeader> ReadStreamHeader(Stream stream)
3636
throw new ArgumentException("Stream must be a SharpCompressStream", nameof(stream));
3737
}
3838
}
39-
SharpCompressStream rewindableStream = (SharpCompressStream)stream;
39+
var rewindableStream = (SharpCompressStream)stream;
4040

4141
while (true)
4242
{
43-
ZipHeader? header;
4443
var reader = new BinaryReader(rewindableStream);
4544
uint headerBytes = 0;
4645
if (
@@ -155,7 +154,7 @@ internal IEnumerable<ZipHeader> ReadStreamHeader(Stream stream)
155154
}
156155

157156
_lastEntryHeader = null;
158-
header = ReadHeader(headerBytes, reader);
157+
var header = ReadHeader(headerBytes, reader);
159158
if (header is null)
160159
{
161160
yield break;

src/SharpCompress/Compressors/Deflate64/Deflate64Stream.cs

Lines changed: 110 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System;
86
using System.Diagnostics;
97
using System.IO;
108
using System.Runtime.CompilerServices;
9+
using System.Threading;
10+
using System.Threading.Tasks;
1111
using SharpCompress.Common;
1212
using SharpCompress.Common.Zip;
1313
using SharpCompress.IO;
@@ -39,7 +39,6 @@ void IStreamStack.SetPosition(long position) { }
3939
private const int DEFAULT_BUFFER_SIZE = 8192;
4040

4141
private Stream _stream;
42-
private CompressionMode _mode;
4342
private InflaterManaged _inflater;
4443
private byte[] _buffer;
4544

@@ -62,61 +61,23 @@ public Deflate64Stream(Stream stream, CompressionMode mode)
6261
throw new ArgumentException("Deflate64: input stream is not readable", nameof(stream));
6362
}
6463

65-
InitializeInflater(stream, ZipCompressionMethod.Deflate64);
66-
#if DEBUG_STREAMS
67-
this.DebugConstruct(typeof(Deflate64Stream));
68-
#endif
69-
}
70-
71-
/// <summary>
72-
/// Sets up this DeflateManagedStream to be used for Inflation/Decompression
73-
/// </summary>
74-
private void InitializeInflater(
75-
Stream stream,
76-
ZipCompressionMethod method = ZipCompressionMethod.Deflate
77-
)
78-
{
79-
Debug.Assert(stream != null);
80-
Debug.Assert(
81-
method == ZipCompressionMethod.Deflate || method == ZipCompressionMethod.Deflate64
82-
);
8364
if (!stream.CanRead)
8465
{
8566
throw new ArgumentException("Deflate64: input stream is not readable", nameof(stream));
8667
}
8768

88-
_inflater = new InflaterManaged(method == ZipCompressionMethod.Deflate64);
69+
_inflater = new InflaterManaged(true);
8970

9071
_stream = stream;
91-
_mode = CompressionMode.Decompress;
9272
_buffer = new byte[DEFAULT_BUFFER_SIZE];
73+
#if DEBUG_STREAMS
74+
this.DebugConstruct(typeof(Deflate64Stream));
75+
#endif
9376
}
9477

95-
public override bool CanRead
96-
{
97-
get
98-
{
99-
if (_stream is null)
100-
{
101-
return false;
102-
}
103-
104-
return (_mode == CompressionMode.Decompress && _stream.CanRead);
105-
}
106-
}
107-
108-
public override bool CanWrite
109-
{
110-
get
111-
{
112-
if (_stream is null)
113-
{
114-
return false;
115-
}
78+
public override bool CanRead => _stream.CanRead;
11679

117-
return (_mode == CompressionMode.Compress && _stream.CanWrite);
118-
}
119-
}
80+
public override bool CanWrite => false;
12081

12182
public override bool CanSeek => false;
12283

@@ -138,7 +99,6 @@ public override void SetLength(long value) =>
13899

139100
public override int Read(byte[] array, int offset, int count)
140101
{
141-
EnsureDecompressionMode();
142102
ValidateParameters(array, offset, count);
143103
EnsureNotDisposed();
144104

@@ -185,6 +145,106 @@ public override int Read(byte[] array, int offset, int count)
185145
return count - remainingCount;
186146
}
187147

148+
public override async Task<int> ReadAsync(
149+
byte[] array,
150+
int offset,
151+
int count,
152+
CancellationToken cancellationToken
153+
)
154+
{
155+
ValidateParameters(array, offset, count);
156+
EnsureNotDisposed();
157+
158+
int bytesRead;
159+
var currentOffset = offset;
160+
var remainingCount = count;
161+
162+
while (true)
163+
{
164+
bytesRead = _inflater.Inflate(array, currentOffset, remainingCount);
165+
currentOffset += bytesRead;
166+
remainingCount -= bytesRead;
167+
168+
if (remainingCount == 0)
169+
{
170+
break;
171+
}
172+
173+
if (_inflater.Finished())
174+
{
175+
// if we finished decompressing, we can't have anything left in the outputwindow.
176+
Debug.Assert(
177+
_inflater.AvailableOutput == 0,
178+
"We should have copied all stuff out!"
179+
);
180+
break;
181+
}
182+
183+
var bytes = await _stream
184+
.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken)
185+
.ConfigureAwait(false);
186+
if (bytes <= 0)
187+
{
188+
break;
189+
}
190+
else if (bytes > _buffer.Length)
191+
{
192+
// The stream is either malicious or poorly implemented and returned a number of
193+
// bytes larger than the buffer supplied to it.
194+
throw new InvalidFormatException("Deflate64: invalid data");
195+
}
196+
197+
_inflater.SetInput(_buffer, 0, bytes);
198+
}
199+
200+
return count - remainingCount;
201+
}
202+
203+
#if !NETFRAMEWORK && !NETSTANDARD2_0
204+
public override async ValueTask<int> ReadAsync(
205+
Memory<byte> buffer,
206+
CancellationToken cancellationToken = default
207+
)
208+
{
209+
EnsureNotDisposed();
210+
211+
// InflaterManaged doesn't have a Span-based Inflate method, so we need to work with arrays
212+
// For large buffers, we could rent from ArrayPool, but for simplicity we'll use the buffer's array if available
213+
if (
214+
System.Runtime.InteropServices.MemoryMarshal.TryGetArray<byte>(
215+
buffer,
216+
out var arraySegment
217+
)
218+
)
219+
{
220+
// Fast path: the Memory<byte> is backed by an array
221+
return await ReadAsync(
222+
arraySegment.Array!,
223+
arraySegment.Offset,
224+
arraySegment.Count,
225+
cancellationToken
226+
)
227+
.ConfigureAwait(false);
228+
}
229+
else
230+
{
231+
// Slow path: rent a temporary array
232+
var tempBuffer = System.Buffers.ArrayPool<byte>.Shared.Rent(buffer.Length);
233+
try
234+
{
235+
var bytesRead = await ReadAsync(tempBuffer, 0, buffer.Length, cancellationToken)
236+
.ConfigureAwait(false);
237+
tempBuffer.AsMemory(0, bytesRead).CopyTo(buffer);
238+
return bytesRead;
239+
}
240+
finally
241+
{
242+
System.Buffers.ArrayPool<byte>.Shared.Return(tempBuffer);
243+
}
244+
}
245+
}
246+
#endif
247+
188248
private void ValidateParameters(byte[] array, int offset, int count)
189249
{
190250
if (array is null)
@@ -220,26 +280,6 @@ private void EnsureNotDisposed()
220280
private static void ThrowStreamClosedException() =>
221281
throw new ObjectDisposedException(null, "Deflate64: stream has been disposed");
222282

223-
private void EnsureDecompressionMode()
224-
{
225-
if (_mode != CompressionMode.Decompress)
226-
{
227-
ThrowCannotReadFromDeflateManagedStreamException();
228-
}
229-
}
230-
231-
[MethodImpl(MethodImplOptions.NoInlining)]
232-
private static void ThrowCannotReadFromDeflateManagedStreamException() =>
233-
throw new InvalidOperationException("Deflate64: cannot read from this stream");
234-
235-
private void EnsureCompressionMode()
236-
{
237-
if (_mode != CompressionMode.Compress)
238-
{
239-
ThrowCannotWriteToDeflateManagedStreamException();
240-
}
241-
}
242-
243283
[MethodImpl(MethodImplOptions.NoInlining)]
244284
private static void ThrowCannotWriteToDeflateManagedStreamException() =>
245285
throw new InvalidOperationException("Deflate64: cannot write to this stream");
@@ -281,20 +321,17 @@ protected override void Dispose(bool disposing)
281321
#endif
282322
if (disposing)
283323
{
284-
_stream?.Dispose();
324+
_stream.Dispose();
285325
}
286326
}
287327
finally
288328
{
289-
_stream = null;
290-
291329
try
292330
{
293-
_inflater?.Dispose();
331+
_inflater.Dispose();
294332
}
295333
finally
296334
{
297-
_inflater = null;
298335
base.Dispose(disposing);
299336
}
300337
}

src/SharpCompress/IO/ReadOnlySubStream.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Diagnostics;
33
using System.IO;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46

57
namespace SharpCompress.IO;
68

@@ -93,6 +95,47 @@ public override int Read(Span<byte> buffer)
9395
}
9496
#endif
9597

98+
public override async Task<int> ReadAsync(
99+
byte[] buffer,
100+
int offset,
101+
int count,
102+
CancellationToken cancellationToken
103+
)
104+
{
105+
if (BytesLeftToRead < count)
106+
{
107+
count = (int)BytesLeftToRead;
108+
}
109+
var read = await Stream
110+
.ReadAsync(buffer, offset, count, cancellationToken)
111+
.ConfigureAwait(false);
112+
if (read > 0)
113+
{
114+
BytesLeftToRead -= read;
115+
_position += read;
116+
}
117+
return read;
118+
}
119+
120+
#if !NETFRAMEWORK && !NETSTANDARD2_0
121+
public override async ValueTask<int> ReadAsync(
122+
Memory<byte> buffer,
123+
CancellationToken cancellationToken = default
124+
)
125+
{
126+
var sliceLen = BytesLeftToRead < buffer.Length ? BytesLeftToRead : buffer.Length;
127+
var read = await Stream
128+
.ReadAsync(buffer.Slice(0, (int)sliceLen), cancellationToken)
129+
.ConfigureAwait(false);
130+
if (read > 0)
131+
{
132+
BytesLeftToRead -= read;
133+
_position += read;
134+
}
135+
return read;
136+
}
137+
#endif
138+
96139
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
97140

98141
public override void SetLength(long value) => throw new NotSupportedException();

0 commit comments

Comments
 (0)