Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit a7e516d

Browse files
authored
Merge pull request #427 from peihe/disable-unbounded-read
Add flag to control whether to enable streaming bounded read
2 parents cd74b6e + 261fdf9 commit a7e516d

File tree

2 files changed: 38 additions and 1 deletion.

2 files changed: 38 additions and 1 deletion.

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -361,7 +361,15 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
361361
builder.put(View.AsList.class, StreamingViewAsList.class);
362362
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
363363
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
364-
builder.put(Read.Bounded.class, StreamingBoundedRead.class);
364+
if (options.getExperiments() == null
365+
|| !options.getExperiments().contains("enable_streaming_bounded_read")) {
366+
builder.put(Read.Bounded.class, UnsupportedIO.class);
367+
builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
368+
builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
369+
builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
370+
} else {
371+
builder.put(Read.Bounded.class, StreamingBoundedRead.class);
372+
}
365373
builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
366374
builder.put(Window.Bound.class, AssignWindows.class);
367375
// In streaming mode must use either the custom Pubsub unbounded source/sink or

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,8 @@
4646
import com.google.cloud.dataflow.sdk.coders.Coder;
4747
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
4848
import com.google.cloud.dataflow.sdk.io.AvroIO;
49+
import com.google.cloud.dataflow.sdk.io.AvroSource;
50+
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
4951
import com.google.cloud.dataflow.sdk.io.Read;
5052
import com.google.cloud.dataflow.sdk.io.TextIO;
5153
import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
@@ -928,6 +930,33 @@ private void testUnsupportedSource(PTransform<PInput, ?> source, String name, bo
928930
p.run();
929931
}
930932

933+
@Test
934+
public void testBoundedSourceUnsupportedInStreaming() throws Exception {
935+
testUnsupportedSource(
936+
AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
937+
}
938+
939+
@Test
940+
public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
941+
testUnsupportedSource(
942+
BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
943+
}
944+
945+
@Test
946+
public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
947+
testUnsupportedSource(AvroIO.Read.from("foo"), "AvroIO.Read", true);
948+
}
949+
950+
@Test
951+
public void testTextIOSourceUnsupportedInStreaming() throws Exception {
952+
testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
953+
}
954+
955+
@Test
956+
public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
957+
testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
958+
}
959+
931960
@Test
932961
public void testReadUnboundedUnsupportedInBatch() throws Exception {
933962
testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);

0 commit comments

Comments
 (0)