Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,17 @@ public void tearDown() throws Exception {
serviceHelper.stop();
}

private StreamWriter.Builder getTestStreamWriterBuilder() {
return StreamWriter.newBuilder(TEST_STREAM)
private StreamWriter.Builder getTestStreamWriterBuilder(String testStream) {
return StreamWriter.newBuilder(testStream)
.setChannelProvider(channelProvider)
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
.setCredentialsProvider(NoCredentialsProvider.create());
}

private StreamWriter.Builder getTestStreamWriterBuilder() {
return getTestStreamWriterBuilder(TEST_STREAM);
}

private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
Expand Down Expand Up @@ -821,9 +825,10 @@ public void testBuilderInvalidArguments() {

@Test
public void testAwaitTermination() throws Exception {
StreamWriter writer = getTestStreamWriterBuilder().build();
StreamWriter writer =
getTestStreamWriterBuilder("projects/p/datasets/d/tables/t/streams/await").build();
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"AWAIT"});
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
}
Expand Down