Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 54 additions & 20 deletions lib/http.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import 'dart:convert';
import 'dart:typed_data';

import 'src/cancelation_token.dart';
import 'src/client.dart';
import 'src/exception.dart';
import 'src/request.dart';
Expand All @@ -16,6 +17,7 @@ export 'src/base_client.dart';
export 'src/base_request.dart';
export 'src/base_response.dart';
export 'src/byte_stream.dart';
export 'src/cancelation_token.dart';
export 'src/client.dart';
export 'src/exception.dart';
export 'src/multipart_file.dart';
Expand All @@ -32,8 +34,10 @@ export 'src/streamed_response.dart';
/// the same server, you should use a single [Client] for all of those requests.
///
/// For more fine-grained control over the request, use [Request] instead.
Future<Response> head(Uri url, {Map<String, String>? headers}) =>
_withClient((client) => client.head(url, headers: headers));
Future<Response> head(Uri url,
{Map<String, String>? headers, CancellationToken? cancellationToken}) =>
_withClient((client) => client.head(url,
headers: headers, cancellationToken: cancellationToken));

/// Sends an HTTP GET request with the given headers to the given URL.
///
Expand All @@ -42,8 +46,10 @@ Future<Response> head(Uri url, {Map<String, String>? headers}) =>
/// the same server, you should use a single [Client] for all of those requests.
///
/// For more fine-grained control over the request, use [Request] instead.
Future<Response> get(Uri url, {Map<String, String>? headers}) =>
_withClient((client) => client.get(url, headers: headers));
Future<Response> get(Uri url,
{Map<String, String>? headers, CancellationToken? cancellationToken}) =>
_withClient((client) => client.get(url,
headers: headers, cancellationToken: cancellationToken));

/// Sends an HTTP POST request with the given headers and body to the given URL.
///
Expand All @@ -64,9 +70,15 @@ Future<Response> get(Uri url, {Map<String, String>? headers}) =>
/// For more fine-grained control over the request, use [Request] or
/// [StreamedRequest] instead.
Future<Response> post(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_withClient((client) =>
client.post(url, headers: headers, body: body, encoding: encoding));
{Map<String, String>? headers,
CancellationToken? cancellationToken,
Object? body,
Encoding? encoding}) =>
_withClient((client) => client.post(url,
headers: headers,
cancellationToken: cancellationToken,
body: body,
encoding: encoding));

/// Sends an HTTP PUT request with the given headers and body to the given URL.
///
Expand All @@ -87,9 +99,15 @@ Future<Response> post(Uri url,
/// For more fine-grained control over the request, use [Request] or
/// [StreamedRequest] instead.
Future<Response> put(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_withClient((client) =>
client.put(url, headers: headers, body: body, encoding: encoding));
{Map<String, String>? headers,
CancellationToken? cancellationToken,
Object? body,
Encoding? encoding}) =>
_withClient((client) => client.put(url,
headers: headers,
cancellationToken: cancellationToken,
body: body,
encoding: encoding));

/// Sends an HTTP PATCH request with the given headers and body to the given
/// URL.
Expand All @@ -111,9 +129,15 @@ Future<Response> put(Uri url,
/// For more fine-grained control over the request, use [Request] or
/// [StreamedRequest] instead.
Future<Response> patch(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_withClient((client) =>
client.patch(url, headers: headers, body: body, encoding: encoding));
{Map<String, String>? headers,
CancellationToken? cancellationToken,
Object? body,
Encoding? encoding}) =>
_withClient((client) => client.patch(url,
headers: headers,
cancellationToken: cancellationToken,
body: body,
encoding: encoding));

/// Sends an HTTP DELETE request with the given headers to the given URL.
///
Expand All @@ -123,9 +147,15 @@ Future<Response> patch(Uri url,
///
/// For more fine-grained control over the request, use [Request] instead.
Future<Response> delete(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_withClient((client) =>
client.delete(url, headers: headers, body: body, encoding: encoding));
{Map<String, String>? headers,
CancellationToken? cancellationToken,
Object? body,
Encoding? encoding}) =>
_withClient((client) => client.delete(url,
headers: headers,
cancellationToken: cancellationToken,
body: body,
encoding: encoding));

/// Sends an HTTP GET request with the given headers to the given URL and
/// returns a Future that completes to the body of the response as a [String].
Expand All @@ -139,8 +169,10 @@ Future<Response> delete(Uri url,
///
/// For more fine-grained control over the request and response, use [Request]
/// instead.
Future<String> read(Uri url, {Map<String, String>? headers}) =>
_withClient((client) => client.read(url, headers: headers));
Future<String> read(Uri url,
{Map<String, String>? headers, CancellationToken? cancellationToken}) =>
_withClient((client) => client.read(url,
headers: headers, cancellationToken: cancellationToken));

/// Sends an HTTP GET request with the given headers to the given URL and
/// returns a Future that completes to the body of the response as a list of
Expand All @@ -155,8 +187,10 @@ Future<String> read(Uri url, {Map<String, String>? headers}) =>
///
/// For more fine-grained control over the request and response, use [Request]
/// instead.
Future<Uint8List> readBytes(Uri url, {Map<String, String>? headers}) =>
_withClient((client) => client.readBytes(url, headers: headers));
Future<Uint8List> readBytes(Uri url,
{Map<String, String>? headers, CancellationToken? cancellationToken}) =>
_withClient((client) => client.readBytes(url,
headers: headers, cancellationToken: cancellationToken));

Future<T> _withClient<T>(Future<T> Function(Client) fn) async {
var client = Client();
Expand Down
3 changes: 3 additions & 0 deletions lib/retry.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class RetryClient extends BaseClient {

@override
Future<StreamedResponse> send(BaseRequest request) async {
assert(request.cancellationToken?.autoDispose == false,
'Auto-dispose Tokens are not allowed for retrys');

final splitter = StreamSplitter(request.finalize());

var i = 0;
Expand Down
62 changes: 43 additions & 19 deletions lib/src/base_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'dart:typed_data';

import 'base_request.dart';
import 'byte_stream.dart';
import 'cancelation_token.dart';
import 'client.dart';
import 'exception.dart';
import 'request.dart';
Expand All @@ -19,43 +20,66 @@ import 'streamed_response.dart';
/// maybe [close], and then they get various convenience methods for free.
abstract class BaseClient implements Client {
@override
Future<Response> head(Uri url, {Map<String, String>? headers}) =>
_sendUnstreamed('HEAD', url, headers);
Future<Response> head(Uri url,
{Map<String, String>? headers,
CancellationToken? cancellationToken}) =>
_sendUnstreamed('HEAD', url, headers, cancellationToken);

@override
Future<Response> get(Uri url, {Map<String, String>? headers}) =>
_sendUnstreamed('GET', url, headers);
Future<Response> get(Uri url,
{Map<String, String>? headers,
CancellationToken? cancellationToken}) =>
_sendUnstreamed('GET', url, headers, cancellationToken);

@override
Future<Response> post(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('POST', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
CancellationToken? cancellationToken}) =>
_sendUnstreamed('POST', url, headers, cancellationToken, body, encoding);

@override
Future<Response> put(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('PUT', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
CancellationToken? cancellationToken}) =>
_sendUnstreamed('PUT', url, headers, cancellationToken, body, encoding);

@override
Future<Response> patch(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('PATCH', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
CancellationToken? cancellationToken}) =>
_sendUnstreamed('PATCH', url, headers, cancellationToken, body, encoding);

@override
Future<Response> delete(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('DELETE', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
CancellationToken? cancellationToken}) =>
_sendUnstreamed(
'DELETE', url, headers, cancellationToken, body, encoding);

@override
Future<String> read(Uri url, {Map<String, String>? headers}) async {
final response = await get(url, headers: headers);
Future<String> read(Uri url,
{Map<String, String>? headers,
CancellationToken? cancellationToken}) async {
final response =
await get(url, headers: headers, cancellationToken: cancellationToken);
_checkResponseSuccess(url, response);
return response.body;
}

@override
Future<Uint8List> readBytes(Uri url, {Map<String, String>? headers}) async {
final response = await get(url, headers: headers);
Future<Uint8List> readBytes(Uri url,
{Map<String, String>? headers,
CancellationToken? cancellationToken}) async {
final response =
await get(url, headers: headers, cancellationToken: cancellationToken);
_checkResponseSuccess(url, response);
return response.bodyBytes;
}
Expand All @@ -71,10 +95,10 @@ abstract class BaseClient implements Client {
Future<StreamedResponse> send(BaseRequest request);

/// Sends a non-streaming [Request] and returns a non-streaming [Response].
Future<Response> _sendUnstreamed(
String method, Uri url, Map<String, String>? headers,
Future<Response> _sendUnstreamed(String method, Uri url,
Map<String, String>? headers, CancellationToken? cancellationToken,
[body, Encoding? encoding]) async {
var request = Request(method, url);
var request = Request(method, url, cancellationToken: cancellationToken);

if (headers != null) request.headers.addAll(headers);
if (encoding != null) request.encoding = encoding;
Expand Down
5 changes: 4 additions & 1 deletion lib/src/base_request.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import '../http.dart' show get;
import 'base_client.dart';
import 'base_response.dart';
import 'byte_stream.dart';
import 'cancelation_token.dart';
import 'client.dart';
import 'streamed_response.dart';
import 'utils.dart';
Expand All @@ -30,6 +31,8 @@ abstract class BaseRequest {
/// The URL to which the request will be sent.
final Uri url;

final CancellationToken? cancellationToken;

/// The size of the request body, in bytes.
///
/// This defaults to `null`, which indicates that the size of the request is
Expand Down Expand Up @@ -96,7 +99,7 @@ abstract class BaseRequest {
return method;
}

BaseRequest(String method, this.url)
BaseRequest(String method, this.url, {this.cancellationToken})
: method = _validateMethod(method),
headers = LinkedHashMap(
equals: (key1, key2) => key1.toLowerCase() == key2.toLowerCase(),
Expand Down
48 changes: 34 additions & 14 deletions lib/src/browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'dart:typed_data';
import 'base_client.dart';
import 'base_request.dart';
import 'byte_stream.dart';
import 'cancelation_token.dart';
import 'exception.dart';
import 'streamed_response.dart';

Expand All @@ -29,7 +30,7 @@ class BrowserClient extends BaseClient {
/// The currently active XHRs.
///
/// These are aborted if the client is closed.
final _xhrs = <HttpRequest>{};
final _xhrs = <CancellationToken>{};

/// Whether to send credentials such as cookies or authorization headers for
/// cross-site requests.
Expand All @@ -40,9 +41,18 @@ class BrowserClient extends BaseClient {
/// Sends an HTTP request and asynchronously returns the response.
@override
Future<StreamedResponse> send(BaseRequest request) async {
if (request.cancellationToken?.isCancellationPending == true) {
throw ClientException('Request has been aborted', request.url);
}

var bytes = await request.finalize().toBytes();
var xhr = HttpRequest();
_xhrs.add(xhr);

final cancellationToken =
request.cancellationToken ?? CancellationToken(autoDispose: true);

_xhrs.add(cancellationToken);

xhr
..open(request.method, '${request.url}', async: true)
..responseType = 'arraybuffer'
Expand All @@ -51,30 +61,40 @@ class BrowserClient extends BaseClient {

var completer = Completer<StreamedResponse>();

unawaited(xhr.onReadyStateChange
.firstWhere((_) => xhr.readyState >= HttpRequest.HEADERS_RECEIVED)
.then((value) => cancellationToken.registerRequest(xhr.abort,
completer: completer, debugRequest: xhr, baseRequest: request)));

unawaited(xhr.onLoad.first.then((_) {
var body = (xhr.response as ByteBuffer).asUint8List();
completer.complete(StreamedResponse(
ByteStream.fromBytes(body), xhr.status!,
contentLength: body.length,
request: request,
headers: xhr.responseHeaders,
reasonPhrase: xhr.statusText));
if (!completer.isCompleted) {
var body = (xhr.response as ByteBuffer).asUint8List();
completer.complete(StreamedResponse(
ByteStream.fromBytes(body), xhr.status!,
contentLength: body.length,
request: request,
headers: xhr.responseHeaders,
reasonPhrase: xhr.statusText));
}
}));

unawaited(xhr.onError.first.then((_) {
// Unfortunately, the underlying XMLHttpRequest API doesn't expose any
// specific information about the error itself.
completer.completeError(
ClientException('XMLHttpRequest error.', request.url),
StackTrace.current);
if (!completer.isCompleted) {
completer.completeError(
ClientException('XMLHttpRequest error.', request.url),
StackTrace.current);
}
}));

xhr.send(bytes);

try {
return await completer.future;
} finally {
_xhrs.remove(xhr);
await cancellationToken.completeRequest(request);
_xhrs.remove(cancellationToken);
}
}

Expand All @@ -84,7 +104,7 @@ class BrowserClient extends BaseClient {
@override
void close() {
for (var xhr in _xhrs) {
xhr.abort();
xhr.disposeToken();
}
}
}
Loading