From 9e4fdad1166b931bf03e314b9d10651a6ead53d3 Mon Sep 17 00:00:00 2001 From: FaFre Date: Sun, 1 Aug 2021 13:04:34 +0700 Subject: [PATCH 1/3] implementation of cancellation token --- lib/http.dart | 74 ++++++++++++++------ lib/retry.dart | 3 + lib/src/base_client.dart | 62 +++++++++++------ lib/src/base_request.dart | 5 +- lib/src/browser_client.dart | 17 +++-- lib/src/cancelation_token.dart | 120 +++++++++++++++++++++++++++++++++ lib/src/client.dart | 33 ++++++--- lib/src/exception.dart | 15 +++++ lib/src/io_client.dart | 12 +++- lib/src/mock_client.dart | 25 ++++--- lib/src/multipart_request.dart | 5 +- lib/src/request.dart | 5 +- lib/src/streamed_request.dart | 6 +- test/http_retry_test.dart | 50 ++++++++++---- 14 files changed, 350 insertions(+), 82 deletions(-) create mode 100644 lib/src/cancelation_token.dart diff --git a/lib/http.dart b/lib/http.dart index 1ea751eab1..f2f70a20af 100644 --- a/lib/http.dart +++ b/lib/http.dart @@ -6,6 +6,7 @@ import 'dart:convert'; import 'dart:typed_data'; +import 'src/cancelation_token.dart'; import 'src/client.dart'; import 'src/exception.dart'; import 'src/request.dart'; @@ -16,6 +17,7 @@ export 'src/base_client.dart'; export 'src/base_request.dart'; export 'src/base_response.dart'; export 'src/byte_stream.dart'; +export 'src/cancelation_token.dart'; export 'src/client.dart'; export 'src/exception.dart'; export 'src/multipart_file.dart'; @@ -32,8 +34,10 @@ export 'src/streamed_response.dart'; /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request, use [Request] instead. -Future head(Uri url, {Map? headers}) => - _withClient((client) => client.head(url, headers: headers)); +Future head(Uri url, + {Map? headers, CancellationToken? cancellationToken}) => + _withClient((client) => client.head(url, + headers: headers, cancellationToken: cancellationToken)); /// Sends an HTTP GET request with the given headers to the given URL. /// @@ -42,8 +46,10 @@ Future head(Uri url, {Map? headers}) => /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request, use [Request] instead. -Future get(Uri url, {Map? headers}) => - _withClient((client) => client.get(url, headers: headers)); +Future get(Uri url, + {Map? headers, CancellationToken? cancellationToken}) => + _withClient((client) => client.get(url, + headers: headers, cancellationToken: cancellationToken)); /// Sends an HTTP POST request with the given headers and body to the given URL. /// @@ -64,9 +70,15 @@ Future get(Uri url, {Map? headers}) => /// For more fine-grained control over the request, use [Request] or /// [StreamedRequest] instead. Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.post(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}) => + _withClient((client) => client.post(url, + headers: headers, + cancellationToken: cancellationToken, + body: body, + encoding: encoding)); /// Sends an HTTP PUT request with the given headers and body to the given URL. /// @@ -87,9 +99,15 @@ Future post(Uri url, /// For more fine-grained control over the request, use [Request] or /// [StreamedRequest] instead. Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.put(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}) => + _withClient((client) => client.put(url, + headers: headers, + cancellationToken: cancellationToken, + body: body, + encoding: encoding)); /// Sends an HTTP PATCH request with the given headers and body to the given /// URL. @@ -111,9 +129,15 @@ Future put(Uri url, /// For more fine-grained control over the request, use [Request] or /// [StreamedRequest] instead. Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.patch(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}) => + _withClient((client) => client.patch(url, + headers: headers, + cancellationToken: cancellationToken, + body: body, + encoding: encoding)); /// Sends an HTTP DELETE request with the given headers to the given URL. /// @@ -123,9 +147,15 @@ Future patch(Uri url, /// /// For more fine-grained control over the request, use [Request] instead. Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.delete(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}) => + _withClient((client) => client.delete(url, + headers: headers, + cancellationToken: cancellationToken, + body: body, + encoding: encoding)); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a [String]. @@ -139,8 +169,10 @@ Future delete(Uri url, /// /// For more fine-grained control over the request and response, use [Request] /// instead. -Future read(Uri url, {Map? headers}) => - _withClient((client) => client.read(url, headers: headers)); +Future read(Uri url, + {Map? headers, CancellationToken? cancellationToken}) => + _withClient((client) => client.read(url, + headers: headers, cancellationToken: cancellationToken)); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a list of @@ -155,8 +187,10 @@ Future read(Uri url, {Map? headers}) => /// /// For more fine-grained control over the request and response, use [Request] /// instead. -Future readBytes(Uri url, {Map? headers}) => - _withClient((client) => client.readBytes(url, headers: headers)); +Future readBytes(Uri url, + {Map? headers, CancellationToken? cancellationToken}) => + _withClient((client) => client.readBytes(url, + headers: headers, cancellationToken: cancellationToken)); Future _withClient(Future Function(Client) fn) async { var client = Client(); diff --git a/lib/retry.dart b/lib/retry.dart index 4d34a99c07..3e9faed5a0 100644 --- a/lib/retry.dart +++ b/lib/retry.dart @@ -100,6 +100,9 @@ class RetryClient extends BaseClient { @override Future send(BaseRequest request) async { + assert(request.cancellationToken?.autoDispose == false, + 'Auto-dispose Tokens are not allowed for retrys'); + final splitter = StreamSplitter(request.finalize()); var i = 0; diff --git a/lib/src/base_client.dart b/lib/src/base_client.dart index efb065f70e..8d38c26383 100644 --- a/lib/src/base_client.dart +++ b/lib/src/base_client.dart @@ -7,6 +7,7 @@ import 'dart:typed_data'; import 'base_request.dart'; import 'byte_stream.dart'; +import 'cancelation_token.dart'; import 'client.dart'; import 'exception.dart'; import 'request.dart'; @@ -19,43 +20,66 @@ import 'streamed_response.dart'; /// maybe [close], and then they get various convenience methods for free. abstract class BaseClient implements Client { @override - Future head(Uri url, {Map? headers}) => - _sendUnstreamed('HEAD', url, headers); + Future head(Uri url, + {Map? headers, + CancellationToken? cancellationToken}) => + _sendUnstreamed('HEAD', url, headers, cancellationToken); @override - Future get(Uri url, {Map? headers}) => - _sendUnstreamed('GET', url, headers); + Future get(Uri url, + {Map? headers, + CancellationToken? cancellationToken}) => + _sendUnstreamed('GET', url, headers, cancellationToken); @override Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('POST', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + CancellationToken? cancellationToken}) => + _sendUnstreamed('POST', url, headers, cancellationToken, body, encoding); @override Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('PUT', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + CancellationToken? cancellationToken}) => + _sendUnstreamed('PUT', url, headers, cancellationToken, body, encoding); @override Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('PATCH', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + CancellationToken? cancellationToken}) => + _sendUnstreamed('PATCH', url, headers, cancellationToken, body, encoding); @override Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('DELETE', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + CancellationToken? cancellationToken}) => + _sendUnstreamed( + 'DELETE', url, headers, cancellationToken, body, encoding); @override - Future read(Uri url, {Map? headers}) async { - final response = await get(url, headers: headers); + Future read(Uri url, + {Map? headers, + CancellationToken? cancellationToken}) async { + final response = + await get(url, headers: headers, cancellationToken: cancellationToken); _checkResponseSuccess(url, response); return response.body; } @override - Future readBytes(Uri url, {Map? headers}) async { - final response = await get(url, headers: headers); + Future readBytes(Uri url, + {Map? headers, + CancellationToken? cancellationToken}) async { + final response = + await get(url, headers: headers, cancellationToken: cancellationToken); _checkResponseSuccess(url, response); return response.bodyBytes; } @@ -71,10 +95,10 @@ abstract class BaseClient implements Client { Future send(BaseRequest request); /// Sends a non-streaming [Request] and returns a non-streaming [Response]. - Future _sendUnstreamed( - String method, Uri url, Map? headers, + Future _sendUnstreamed(String method, Uri url, + Map? headers, CancellationToken? cancellationToken, [body, Encoding? encoding]) async { - var request = Request(method, url); + var request = Request(method, url, cancellationToken: cancellationToken); if (headers != null) request.headers.addAll(headers); if (encoding != null) request.encoding = encoding; diff --git a/lib/src/base_request.dart b/lib/src/base_request.dart index fd18bad332..af94219614 100644 --- a/lib/src/base_request.dart +++ b/lib/src/base_request.dart @@ -10,6 +10,7 @@ import '../http.dart' show get; import 'base_client.dart'; import 'base_response.dart'; import 'byte_stream.dart'; +import 'cancelation_token.dart'; import 'client.dart'; import 'streamed_response.dart'; import 'utils.dart'; @@ -30,6 +31,8 @@ abstract class BaseRequest { /// The URL to which the request will be sent. final Uri url; + final CancellationToken? cancellationToken; + /// The size of the request body, in bytes. /// /// This defaults to `null`, which indicates that the size of the request is @@ -96,7 +99,7 @@ abstract class BaseRequest { return method; } - BaseRequest(String method, this.url) + BaseRequest(String method, this.url, {this.cancellationToken}) : method = _validateMethod(method), headers = LinkedHashMap( equals: (key1, key2) => key1.toLowerCase() == key2.toLowerCase(), diff --git a/lib/src/browser_client.dart b/lib/src/browser_client.dart index f41d0ba15b..0ac20ef8be 100644 --- a/lib/src/browser_client.dart +++ b/lib/src/browser_client.dart @@ -9,6 +9,7 @@ import 'dart:typed_data'; import 'base_client.dart'; import 'base_request.dart'; import 'byte_stream.dart'; +import 'cancelation_token.dart'; import 'exception.dart'; import 'streamed_response.dart'; import 'utils.dart' show unawaited; @@ -30,7 +31,7 @@ class BrowserClient extends BaseClient { /// The currently active XHRs. /// /// These are aborted if the client is closed. - final _xhrs = {}; + final _xhrs = {}; /// Whether to send credentials such as cookies or authorization headers for /// cross-site requests. @@ -43,7 +44,14 @@ class BrowserClient extends BaseClient { Future send(BaseRequest request) async { var bytes = await request.finalize().toBytes(); var xhr = HttpRequest(); - _xhrs.add(xhr); + + final cancellationToken = + request.cancellationToken ?? CancellationToken(autoDispose: true); + + await cancellationToken.registerRequest(xhr); + + _xhrs.add(cancellationToken); + xhr ..open(request.method, '${request.url}', async: true) ..responseType = 'arraybuffer' @@ -75,7 +83,8 @@ class BrowserClient extends BaseClient { try { return await completer.future; } finally { - _xhrs.remove(xhr); + await cancellationToken.completeRequest(request); + _xhrs.remove(cancellationToken); } } @@ -85,7 +94,7 @@ class BrowserClient extends BaseClient { @override void close() { for (var xhr in _xhrs) { - xhr.abort(); + xhr.disposeToken(); } } } diff --git a/lib/src/cancelation_token.dart b/lib/src/cancelation_token.dart new file mode 100644 index 0000000000..d43c1a28ab --- /dev/null +++ b/lib/src/cancelation_token.dart @@ -0,0 +1,120 @@ +import 'dart:async'; + +import 'exception.dart'; + +typedef _AbortFunc = void Function(); + +class CancellationToken { + /// Will automatically dispose the token after a request has been completed. + /// + /// Set it to `true` when you use the token for a single request. + /// + /// For retries or multi-request, set this to `false` and + /// `completeToken()` has to be called manually. + final bool autoDispose; + + bool _cancellationPending = false; + + final _cancellationStreamController = StreamController.broadcast(); + final _requestSubscriptions = >{}; + + CancellationToken({required this.autoDispose}); + + /// If token is not disposed yet + bool get isDisposed => _cancellationStreamController.isClosed; + + Future _cancel() { + _cancellationPending = true; + + if (_cancellationStreamController.hasListener) { + _cancellationStreamController.add(null); + if (autoDispose) { + return disposeToken(); + } + } + + return Future.value(); + } + + /// Cancels all registered requests and eventually disposes the token + Future cancel() { + if (!isDisposed) { + return _cancel(); + } + + throw CancellationTokenException('Token already disposed', this); + } + + _AbortFunc? _getAbortOfRequest(dynamic request) { + //We have to do this completely dynamically, because no imports for + //typechecking are possible + try { + // ignore: avoid_dynamic_calls + return request.abort as _AbortFunc; + // ignore: avoid_catching_errors + } on Error catch (_) { + return null; + } + } + + /// Check if a request and it's type is supported by the token + bool isRequestSupported(dynamic request) => + _getAbortOfRequest(request) != null; + + /// Registers a request + /// + /// You only have to call this, when you are manually working with `Request`'s + /// yourself. Elsewise it is handled by `Client`. + Future registerRequest(dynamic request) { + if (!isDisposed) { + final abortFunc = _getAbortOfRequest(request); + if (abortFunc != null) { + _requestSubscriptions.putIfAbsent( + request, + () => + _cancellationStreamController.stream.listen((_) => abortFunc)); + + if (_cancellationPending) { + abortFunc.call(); + return completeRequest(request); + } + + return Future.value(); + } + + throw UnsupportedError( + '$CancellationToken does not support ${request.runtimeType}' + ' as request'); + } + + throw CancellationTokenException('Token already disposed', this); + } + + /// Marks a request as completed and eventually disposes the token + /// + /// You only have to call this, when you are manually working with `Request`'s + /// yourself. Elsewise it is handled by `Client`. + Future completeRequest(dynamic request) async { + if (!isDisposed) { + final subscription = _requestSubscriptions[request]; + if (subscription != null) { + await subscription.cancel(); + _requestSubscriptions.remove(request); + + if (autoDispose && _requestSubscriptions.isEmpty) { + return disposeToken(); + } + + return Future.value(); + } + } + } + + /// Disposes token + Future disposeToken() async { + if (!isDisposed) { + _requestSubscriptions.clear(); + return _cancellationStreamController.close(); + } + } +} diff --git a/lib/src/client.dart b/lib/src/client.dart index 12695e7123..c3d7f44034 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -7,6 +7,7 @@ import 'dart:typed_data'; import 'base_client.dart'; import 'base_request.dart'; +import 'cancelation_token.dart'; import 'client_stub.dart' if (dart.library.html) 'browser_client.dart' if (dart.library.io) 'io_client.dart'; @@ -34,12 +35,14 @@ abstract class Client { /// Sends an HTTP HEAD request with the given headers to the given URL. /// /// For more fine-grained control over the request, use [send] instead. - Future head(Uri url, {Map? headers}); + Future head(Uri url, + {Map? headers, CancellationToken? cancellationToken}); /// Sends an HTTP GET request with the given headers to the given URL. /// /// For more fine-grained control over the request, use [send] instead. - Future get(Uri url, {Map? headers}); + Future get(Uri url, + {Map? headers, CancellationToken? cancellationToken}); /// Sends an HTTP POST request with the given headers and body to the given /// URL. @@ -60,7 +63,10 @@ abstract class Client { /// /// For more fine-grained control over the request, use [send] instead. Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}); /// Sends an HTTP PUT request with the given headers and body to the given /// URL. @@ -81,7 +87,10 @@ abstract class Client { /// /// For more fine-grained control over the request, use [send] instead. Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}); /// Sends an HTTP PATCH request with the given headers and body to the given /// URL. @@ -102,13 +111,19 @@ abstract class Client { /// /// For more fine-grained control over the request, use [send] instead. Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}); /// Sends an HTTP DELETE request with the given headers to the given URL. /// /// For more fine-grained control over the request, use [send] instead. Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + CancellationToken? cancellationToken, + Object? body, + Encoding? encoding}); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a String. @@ -118,7 +133,8 @@ abstract class Client { /// /// For more fine-grained control over the request and response, use [send] or /// [get] instead. - Future read(Uri url, {Map? headers}); + Future read(Uri url, + {Map? headers, CancellationToken? cancellationToken}); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a list of @@ -129,7 +145,8 @@ abstract class Client { /// /// For more fine-grained control over the request and response, use [send] or /// [get] instead. - Future readBytes(Uri url, {Map? headers}); + Future readBytes(Uri url, + {Map? headers, CancellationToken? cancellationToken}); /// Sends an HTTP request and asynchronously returns the response. Future send(BaseRequest request); diff --git a/lib/src/exception.dart b/lib/src/exception.dart index 5ac1a6441d..0cd69473f6 100644 --- a/lib/src/exception.dart +++ b/lib/src/exception.dart @@ -2,6 +2,8 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'cancelation_token.dart'; + /// An exception caused by an error in a pkg/http client. class ClientException implements Exception { final String message; @@ -14,3 +16,16 @@ class ClientException implements Exception { @override String toString() => message; } + +/// An exception caused by an CancellationToken. +class CancellationTokenException implements Exception { + final String message; + + /// The related token + final CancellationToken? token; + + CancellationTokenException(this.message, [this.token]); + + @override + String toString() => message; +} diff --git a/lib/src/io_client.dart b/lib/src/io_client.dart index 85d3681861..da0e8061ea 100644 --- a/lib/src/io_client.dart +++ b/lib/src/io_client.dart @@ -30,17 +30,23 @@ class IOClient extends BaseClient { } var stream = request.finalize(); + HttpClientRequest? ioRequest; try { - var ioRequest = (await _inner!.openUrl(request.method, request.url)) + ioRequest = (await _inner!.openUrl(request.method, request.url)) ..followRedirects = request.followRedirects ..maxRedirects = request.maxRedirects ..contentLength = (request.contentLength ?? -1) ..persistentConnection = request.persistentConnection; request.headers.forEach((name, value) { - ioRequest.headers.set(name, value); + ioRequest!.headers.set(name, value); }); + } on HttpException catch (error) { + throw ClientException(error.message, error.uri); + } + try { + await request.cancellationToken?.registerRequest(ioRequest); var response = await stream.pipe(ioRequest) as HttpClientResponse; var headers = {}; @@ -64,6 +70,8 @@ class IOClient extends BaseClient { inner: response); } on HttpException catch (error) { throw ClientException(error.message, error.uri); + } finally { + await request.cancellationToken?.completeRequest(ioRequest); } } diff --git a/lib/src/mock_client.dart b/lib/src/mock_client.dart index abfcde20e3..08ac85c543 100644 --- a/lib/src/mock_client.dart +++ b/lib/src/mock_client.dart @@ -39,15 +39,22 @@ class MockClient extends BaseClient { ..bodyBytes = bodyBytes ..finalize(); - final response = await fn(request); - return StreamedResponse( - ByteStream.fromBytes(response.bodyBytes), response.statusCode, - contentLength: response.contentLength, - request: baseRequest, - headers: response.headers, - isRedirect: response.isRedirect, - persistentConnection: response.persistentConnection, - reasonPhrase: response.reasonPhrase); + await baseRequest.cancellationToken?.registerRequest(request); + + try { + final response = await fn(request); + + return StreamedResponse( + ByteStream.fromBytes(response.bodyBytes), response.statusCode, + contentLength: response.contentLength, + request: baseRequest, + headers: response.headers, + isRedirect: response.isRedirect, + persistentConnection: response.persistentConnection, + reasonPhrase: response.reasonPhrase); + } finally { + await baseRequest.cancellationToken?.completeRequest(request); + } }); /// Creates a [MockClient] with a handler that receives [StreamedRequest]s and diff --git a/lib/src/multipart_request.dart b/lib/src/multipart_request.dart index 9aeffa23eb..d93592a854 100644 --- a/lib/src/multipart_request.dart +++ b/lib/src/multipart_request.dart @@ -8,6 +8,7 @@ import 'dart:math'; import 'base_request.dart'; import 'boundary_characters.dart'; import 'byte_stream.dart'; +import 'cancelation_token.dart'; import 'multipart_file.dart'; import 'utils.dart'; @@ -45,7 +46,9 @@ class MultipartRequest extends BaseRequest { /// The list of files to upload for this request. final files = []; - MultipartRequest(String method, Uri url) : super(method, url); + MultipartRequest(String method, Uri url, + {CancellationToken? cancellationToken}) + : super(method, url, cancellationToken: cancellationToken); /// The total length of the request body, in bytes. /// diff --git a/lib/src/request.dart b/lib/src/request.dart index bbb8448abe..fcd6986006 100644 --- a/lib/src/request.dart +++ b/lib/src/request.dart @@ -9,6 +9,7 @@ import 'package:http_parser/http_parser.dart'; import 'base_request.dart'; import 'byte_stream.dart'; +import 'cancelation_token.dart'; import 'utils.dart'; /// An HTTP request where the entire request body is known in advance. @@ -137,10 +138,10 @@ class Request extends BaseRequest { body = mapToQuery(fields, encoding: encoding); } - Request(String method, Uri url) + Request(String method, Uri url, {CancellationToken? cancellationToken}) : _defaultEncoding = utf8, _bodyBytes = Uint8List(0), - super(method, url); + super(method, url, cancellationToken: cancellationToken); /// Freezes all mutable fields and returns a single-subscription [ByteStream] /// containing the request body. diff --git a/lib/src/streamed_request.dart b/lib/src/streamed_request.dart index 7e94ef1386..be78f6577a 100644 --- a/lib/src/streamed_request.dart +++ b/lib/src/streamed_request.dart @@ -7,6 +7,7 @@ import 'dart:async'; import 'base_client.dart'; import 'base_request.dart'; import 'byte_stream.dart'; +import 'cancelation_token.dart'; /// An HTTP request where the request body is sent asynchronously after the /// connection has been established and the headers have been sent. @@ -29,9 +30,10 @@ class StreamedRequest extends BaseRequest { final StreamController> _controller; /// Creates a new streaming request. - StreamedRequest(String method, Uri url) + StreamedRequest(String method, Uri url, + {CancellationToken? cancellationToken}) : _controller = StreamController>(sync: true), - super(method, url); + super(method, url, cancellationToken: cancellationToken); /// Freezes all mutable fields and returns a single-subscription [ByteStream] /// that emits the data being written to [sink]. diff --git a/test/http_retry_test.dart b/test/http_retry_test.dart index faf35b3f48..251036fbca 100644 --- a/test/http_retry_test.dart +++ b/test/http_retry_test.dart @@ -5,6 +5,7 @@ import 'package:fake_async/fake_async.dart'; import 'package:http/http.dart'; import 'package:http/retry.dart'; +import 'package:http/src/cancelation_token.dart'; import 'package:http/testing.dart'; import 'package:test/test.dart'; @@ -13,7 +14,8 @@ void main() { test('a request has a non-503 error code', () async { final client = RetryClient( MockClient(expectAsync1((_) async => Response('', 502), count: 1))); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(502)); }); @@ -21,7 +23,8 @@ void main() { final client = RetryClient( MockClient(expectAsync1((_) async => Response('', 503), count: 1)), when: (_) => false); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(503)); }); @@ -29,7 +32,8 @@ void main() { final client = RetryClient( MockClient(expectAsync1((_) async => Response('', 503), count: 1)), retries: 0); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(503)); }); }); @@ -43,7 +47,8 @@ void main() { }, count: 2)), delay: (_) => Duration.zero); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(200)); }); @@ -58,7 +63,8 @@ void main() { when: (response) => response.headers['retry'] == 'true', delay: (_) => Duration.zero); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.headers, containsPair('retry', 'false')); expect(response.statusCode, equals(503)); }); @@ -75,7 +81,8 @@ void main() { error is StateError && error.message == 'oh no', delay: (_) => Duration.zero); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(200)); }); @@ -85,7 +92,9 @@ void main() { whenError: (error, _) => error == 'oh yeah', delay: (_) => Duration.zero); - expect(client.get(Uri.http('example.org', '')), + expect( + client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)), throwsA(isStateError.having((e) => e.message, 'message', 'oh no'))); }); @@ -93,7 +102,8 @@ void main() { final client = RetryClient( MockClient(expectAsync1((_) async => Response('', 503), count: 4)), delay: (_) => Duration.zero); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(503)); }); @@ -102,7 +112,8 @@ void main() { MockClient(expectAsync1((_) async => Response('', 503), count: 13)), retries: 12, delay: (_) => Duration.zero); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(503)); }); @@ -124,7 +135,10 @@ void main() { return Response('', 503); }, count: 4))); - expect(client.get(Uri.http('example.org', '')), completes); + expect( + client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)), + completes); fake.elapse(const Duration(minutes: 10)); }); }); @@ -149,7 +163,10 @@ void main() { }, count: 4)), delay: (requestCount) => Duration(seconds: requestCount)); - expect(client.get(Uri.http('example.org', '')), completes); + expect( + client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)), + completes); fake.elapse(const Duration(minutes: 10)); }); }); @@ -178,7 +195,10 @@ void main() { Duration(seconds: 12) ]); - expect(client.get(Uri.http('example.org', '')), completes); + expect( + client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)), + completes); fake.elapse(const Duration(minutes: 10)); }); }); @@ -195,7 +215,8 @@ void main() { expect(retryCount, equals(count)); count++; }, count: 2)); - final response = await client.get(Uri.http('example.org', '')); + final response = await client.get(Uri.http('example.org', ''), + cancellationToken: CancellationToken(autoDispose: false)); expect(response.statusCode, equals(503)); }); @@ -214,7 +235,8 @@ void main() { }, count: 2)), [Duration.zero]); - final request = Request('POST', Uri.parse('http://example.org')) + final request = Request('POST', Uri.parse('http://example.org'), + cancellationToken: CancellationToken(autoDispose: false)) ..body = 'hello' ..followRedirects = false ..headers['foo'] = 'bar' From 1649f2ea930e4bab25a666e60a000ecbe37c8042 Mon Sep 17 00:00:00 2001 From: FaFre Date: Mon, 2 Aug 2021 08:42:04 +0700 Subject: [PATCH 2/3] finished initial implementation of cancellation --- lib/src/browser_client.dart | 35 +++++++++++++------- lib/src/cancelation_token.dart | 59 +++++++++++++++------------------- lib/src/io_client.dart | 7 +++- lib/src/mock_client.dart | 10 ++++-- 4 files changed, 62 insertions(+), 49 deletions(-) diff --git a/lib/src/browser_client.dart b/lib/src/browser_client.dart index 0ac20ef8be..b7f1a73feb 100644 --- a/lib/src/browser_client.dart +++ b/lib/src/browser_client.dart @@ -42,14 +42,16 @@ class BrowserClient extends BaseClient { /// Sends an HTTP request and asynchronously returns the response. @override Future send(BaseRequest request) async { + if (request.cancellationToken?.isCancellationPending == true) { + throw ClientException('Request has been aborted', request.url); + } + var bytes = await request.finalize().toBytes(); var xhr = HttpRequest(); final cancellationToken = request.cancellationToken ?? CancellationToken(autoDispose: true); - await cancellationToken.registerRequest(xhr); - _xhrs.add(cancellationToken); xhr @@ -60,22 +62,31 @@ class BrowserClient extends BaseClient { var completer = Completer(); + unawaited(xhr.onReadyStateChange + .firstWhere((_) => xhr.readyState >= HttpRequest.LOADING) + .then((value) => cancellationToken.registerRequest(xhr.abort, + completer: completer, debugRequest: xhr, baseRequest: request))); + unawaited(xhr.onLoad.first.then((_) { - var body = (xhr.response as ByteBuffer).asUint8List(); - completer.complete(StreamedResponse( - ByteStream.fromBytes(body), xhr.status!, - contentLength: body.length, - request: request, - headers: xhr.responseHeaders, - reasonPhrase: xhr.statusText)); + if (!completer.isCompleted) { + var body = (xhr.response as ByteBuffer).asUint8List(); + completer.complete(StreamedResponse( + ByteStream.fromBytes(body), xhr.status!, + contentLength: body.length, + request: request, + headers: xhr.responseHeaders, + reasonPhrase: xhr.statusText)); + } })); unawaited(xhr.onError.first.then((_) { // Unfortunately, the underlying XMLHttpRequest API doesn't expose any // specific information about the error itself. - completer.completeError( - ClientException('XMLHttpRequest error.', request.url), - StackTrace.current); + if (!completer.isCompleted) { + completer.completeError( + ClientException('XMLHttpRequest error.', request.url), + StackTrace.current); + } })); xhr.send(bytes); diff --git a/lib/src/cancelation_token.dart b/lib/src/cancelation_token.dart index d43c1a28ab..f33be4b264 100644 --- a/lib/src/cancelation_token.dart +++ b/lib/src/cancelation_token.dart @@ -1,8 +1,9 @@ import 'dart:async'; +import 'base_request.dart'; import 'exception.dart'; -typedef _AbortFunc = void Function(); +typedef AbortFunc = void Function(); class CancellationToken { /// Will automatically dispose the token after a request has been completed. @@ -23,6 +24,9 @@ class CancellationToken { /// If token is not disposed yet bool get isDisposed => _cancellationStreamController.isClosed; + /// If cancellation has been requested for token + bool get isCancellationPending => _cancellationPending; + Future _cancel() { _cancellationPending = true; @@ -45,46 +49,35 @@ class CancellationToken { throw CancellationTokenException('Token already disposed', this); } - _AbortFunc? _getAbortOfRequest(dynamic request) { - //We have to do this completely dynamically, because no imports for - //typechecking are possible - try { - // ignore: avoid_dynamic_calls - return request.abort as _AbortFunc; - // ignore: avoid_catching_errors - } on Error catch (_) { - return null; - } - } - - /// Check if a request and it's type is supported by the token - bool isRequestSupported(dynamic request) => - _getAbortOfRequest(request) != null; - /// Registers a request /// /// You only have to call this, when you are manually working with `Request`'s /// yourself. Elsewise it is handled by `Client`. - Future registerRequest(dynamic request) { + Future registerRequest(AbortFunc requestAbort, + {dynamic debugRequest, BaseRequest? baseRequest, Completer? completer}) { + Future cancel() { + //TODO: Debugging web abort() + //print('Abort for ${baseRequest?.url} at ${debugRequest.readyState}'); + requestAbort.call(); + + if (completer?.isCompleted == false) { + completer!.completeError( + ClientException('Request has been aborted', baseRequest?.url), + StackTrace.current); + } + + return completeRequest(requestAbort); + } + if (!isDisposed) { - final abortFunc = _getAbortOfRequest(request); - if (abortFunc != null) { - _requestSubscriptions.putIfAbsent( - request, - () => - _cancellationStreamController.stream.listen((_) => abortFunc)); - - if (_cancellationPending) { - abortFunc.call(); - return completeRequest(request); - } + _requestSubscriptions.putIfAbsent(requestAbort, + () => _cancellationStreamController.stream.listen((_) => cancel())); - return Future.value(); + if (_cancellationPending) { + return cancel(); } - throw UnsupportedError( - '$CancellationToken does not support ${request.runtimeType}' - ' as request'); + return Future.value(); } throw CancellationTokenException('Token already disposed', this); diff --git a/lib/src/io_client.dart b/lib/src/io_client.dart index da0e8061ea..04d11a86c0 100644 --- a/lib/src/io_client.dart +++ b/lib/src/io_client.dart @@ -29,6 +29,10 @@ class IOClient extends BaseClient { 'HTTP request failed. Client is already closed.', request.url); } + if (request.cancellationToken?.isCancellationPending == true) { + throw ClientException('Request has been aborted', request.url); + } + var stream = request.finalize(); HttpClientRequest? ioRequest; @@ -46,7 +50,8 @@ class IOClient extends BaseClient { } try { - await request.cancellationToken?.registerRequest(ioRequest); + await request.cancellationToken + ?.registerRequest(ioRequest.abort, baseRequest: request); var response = await stream.pipe(ioRequest) as HttpClientResponse; var headers = {}; diff --git a/lib/src/mock_client.dart b/lib/src/mock_client.dart index 08ac85c543..850d137bfa 100644 --- a/lib/src/mock_client.dart +++ b/lib/src/mock_client.dart @@ -5,6 +5,7 @@ import 'base_client.dart'; import 'base_request.dart'; import 'byte_stream.dart'; +import 'exception.dart'; import 'request.dart'; import 'response.dart'; import 'streamed_request.dart'; @@ -30,8 +31,13 @@ class MockClient extends BaseClient { /// [Response]s. MockClient(MockClientHandler fn) : this._((baseRequest, bodyStream) async { + if (baseRequest.cancellationToken?.isCancellationPending == true) { + throw ClientException('Request has been aborted', baseRequest.url); + } + final bodyBytes = await bodyStream.toBytes(); - var request = Request(baseRequest.method, baseRequest.url) + var request = Request(baseRequest.method, baseRequest.url, + cancellationToken: baseRequest.cancellationToken) ..persistentConnection = baseRequest.persistentConnection ..followRedirects = baseRequest.followRedirects ..maxRedirects = baseRequest.maxRedirects @@ -39,8 +45,6 @@ class MockClient extends BaseClient { ..bodyBytes = bodyBytes ..finalize(); - await baseRequest.cancellationToken?.registerRequest(request); - try { final response = await fn(request); From 020838f96f51ebcd12e2a300627934fc018c8cd8 Mon Sep 17 00:00:00 2001 From: FaFre Date: Tue, 7 Dec 2021 00:57:36 +0100 Subject: [PATCH 3/3] register request at earliest possible readyState --- lib/src/browser_client.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/browser_client.dart b/lib/src/browser_client.dart index 0d6738dfe7..019fef7826 100644 --- a/lib/src/browser_client.dart +++ b/lib/src/browser_client.dart @@ -62,7 +62,7 @@ class BrowserClient extends BaseClient { var completer = Completer(); unawaited(xhr.onReadyStateChange - .firstWhere((_) => xhr.readyState >= HttpRequest.LOADING) + .firstWhere((_) => xhr.readyState >= HttpRequest.HEADERS_RECEIVED) .then((value) => cancellationToken.registerRequest(xhr.abort, completer: completer, debugRequest: xhr, baseRequest: request)));