12 changes: 6 additions & 6 deletions hudi-client/hudi-flink-client/pom.xml
@@ -44,15 +44,15 @@
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>${flink.clients.artifactId}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+ <artifactId>${flink.hadoop.compatibility.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -70,7 +70,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-parquet_${scala.binary.version}</artifactId>
+ <artifactId>${flink.parquet.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
@@ -164,7 +164,7 @@
<!-- Flink - Tests -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -177,7 +177,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
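The same renaming pattern repeats in every Flink-facing pom below: hard-coded `flink-xxx_${scala.binary.version}` artifact IDs become Maven properties, because Flink 1.15 dropped the Scala suffix from its Java-only artifacts. The property definitions themselves are not part of this diff; what follows is a minimal sketch of how they could be wired up in a parent pom via per-version profiles (profile ids and version numbers here are illustrative assumptions, not the actual Hudi build):

<profiles>
  <profile>
    <id>flink1.14</id>
    <properties>
      <flink.version>1.14.4</flink.version>
      <!-- Flink 1.14 still cross-builds these modules against Scala -->
      <flink.streaming.java.artifactId>flink-streaming-java_${scala.binary.version}</flink.streaming.java.artifactId>
      <flink.clients.artifactId>flink-clients_${scala.binary.version}</flink.clients.artifactId>
    </properties>
  </profile>
  <profile>
    <id>flink1.15</id>
    <properties>
      <flink.version>1.15.0</flink.version>
      <!-- Flink 1.15 dropped the Scala suffix from Java-only artifacts -->
      <flink.streaming.java.artifactId>flink-streaming-java</flink.streaming.java.artifactId>
      <flink.clients.artifactId>flink-clients</flink.clients.artifactId>
    </properties>
  </profile>
</profiles>

With this indirection, switching Flink versions is a profile flag rather than an edit to every dependent pom.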
@@ -42,8 +42,7 @@

import java.io.IOException;
import java.util.Iterator;

- import scala.collection.immutable.List;
+ import java.util.List;

public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
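The one-line import swap in FlinkMergeHelper fixes a subtle bug: with `scala.collection.immutable.List` in scope, the generic supertype arguments bound to Scala's List type rather than the `java.util.List` that the Flink write path actually passes around. A stripped-down illustration of the failure mode (class names here are hypothetical, not the Hudi originals):

// With the wrong import, the type arguments below bind to Scala's List.
// The file itself still compiles (scala-library is on the classpath), but
// no java.util.List value can ever satisfy the resulting signatures.
import scala.collection.immutable.List;   // wrong: Scala's immutable List
// import java.util.List;                 // right: what callers actually pass

abstract class BaseMergeHelperSketch<T, I> {
  abstract void runMerge(I input);
}

class FlinkMergeHelperSketch<T> extends BaseMergeHelperSketch<T, List<T>> {
  @Override
  void runMerge(List<T> input) { /* merge logic elided */ }
}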
22 changes: 8 additions & 14 deletions hudi-examples/hudi-examples-flink/pom.xml
@@ -118,12 +118,12 @@
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>${flink.clients.artifactId}</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -138,7 +138,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <artifactId>${flink.connector.kafka.artifactId}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
@@ -148,12 +148,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+ <artifactId>${flink.hadoop.compatibility.artifactId}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-parquet_${scala.binary.version}</artifactId>
+ <artifactId>${flink.parquet.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -183,7 +183,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <artifactId>${flink.statebackend.rocksdb.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -304,17 +304,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <!-- <dependency>-->
- <!--   <groupId>org.apache.hudi</groupId>-->
- <!--   <artifactId>hudi-flink_${scala.binary.version}</artifactId>-->
- <!--   <version>${project.version}</version>-->
- <!--   <scope>test</scope>-->
- <!-- </dependency>-->

<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -327,7 +321,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
@@ -18,6 +18,8 @@

package org.apache.hudi.examples.quickstart.source;

+ import org.apache.hudi.adapter.DataStreamScanProviderAdapter;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -28,7 +30,6 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
- import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
@@ -74,7 +75,7 @@ public ContinuousFileSource(

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- return new DataStreamScanProvider() {
+ return new DataStreamScanProviderAdapter() {

@Override
public boolean isBounded() {
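`DataStreamScanProviderAdapter` (used here and in HoodieTableSource below) is the compatibility seam for this change: Flink 1.15 added a `produceDataStream(ProviderContext, StreamExecutionEnvironment)` overload to `DataStreamScanProvider`, so implementing the Flink interface directly would pin the code to one Flink version. The adapter ships per supported Flink version from Hudi's version-specific support modules, which are not shown in this diff; the following is a sketch, assuming the 1.15 variant simply bridges the new overload back to the old single-argument shape (the 1.14 variant can be an empty sub-interface):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

// Bridges the Flink 1.15 two-argument overload back to the single-argument
// shape that version-agnostic code (ContinuousFileSource, HoodieTableSource)
// keeps implementing.
public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {

  @Override
  default DataStream<RowData> produceDataStream(
      ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
    return produceDataStream(execEnv);
  }

  DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
}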
22 changes: 11 additions & 11 deletions hudi-flink-datasource/hudi-flink/pom.xml
@@ -124,12 +124,13 @@
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>${flink.clients.artifactId}</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -144,7 +145,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <artifactId>${flink.connector.kafka.artifactId}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
@@ -154,12 +155,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+ <artifactId>${flink.hadoop.compatibility.artifactId}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-parquet_${scala.binary.version}</artifactId>
+ <artifactId>${flink.parquet.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -189,7 +190,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <artifactId>${flink.statebackend.rocksdb.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -348,7 +349,7 @@
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -361,7 +362,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
@@ -375,14 +376,13 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
+ <artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-csv</artifactId>
+ <artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -148,6 +148,7 @@ public void close() {
* End input action for batch source.
*/
public void endInput() {
+ super.endInput();
flushRemaining(true);
this.writeClient.cleanHandles();
this.writeStatuses.clear();
@@ -90,6 +90,7 @@ public void processElement(I value, Context ctx, Collector<Object> out) throws Exception {
* End input action for batch source.
*/
public void endInput() {
+ super.endInput();
flushData(true);
this.writeStatuses.clear();
}
@@ -120,6 +120,12 @@ public abstract class AbstractStreamWriteFunction<I>
*/
private transient CkpMetadata ckpMetadata;

+ /**
+  * Since Flink 1.15, a streaming job with a bounded source triggers one
+  * checkpoint after #endInput is called; use this flag to avoid an
+  * unnecessary data flush.
+  */
+ private transient boolean inputEnded;

/**
* Constructs a StreamWriteFunctionBase.
*
@@ -154,13 +160,21 @@ public void initializeState(FunctionInitializationContext context) throws Exception

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ if (inputEnded) {
+   return;
+ }
snapshotState();
// Reload the snapshot state as the current state.
reloadWriteMetaState();
}

public abstract void snapshotState();

+ @Override
+ public void endInput() {
+   this.inputEnded = true;
+ }

// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
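The three `endInput` changes above work together: since Flink 1.15 a job with a bounded source fires one final checkpoint after `endInput`, and by then the write function has already flushed and committed its buffers, so `snapshotState` must become a no-op. A condensed sketch of the resulting shutdown sequence (class names abbreviated and hypothetical; the real logic is spread across AbstractStreamWriteFunction and its subclasses above):

import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.BoundedOneInput;

// Base class: the inputEnded flag turns the post-endInput checkpoint into a no-op.
abstract class BaseWriteFunctionSketch<I> implements BoundedOneInput {
  private transient boolean inputEnded;

  @Override
  public void endInput() {
    this.inputEnded = true;
  }

  public void snapshotState(FunctionSnapshotContext context) {
    if (inputEnded) {
      return; // final checkpoint after endInput: buffers are already flushed
    }
    flushRemaining(false);
  }

  protected abstract void flushRemaining(boolean endInput);
}

// Subclasses call super.endInput() first, so the flag is set before the final
// flush -- mirroring the additions to the two write functions above.
class WriteFunctionSketch<I> extends BaseWriteFunctionSketch<I> {
  @Override
  public void endInput() {
    super.endInput();
    flushRemaining(true); // last flush happens here, outside checkpointing
  }

  @Override
  protected void flushRemaining(boolean endInput) { /* buffer flush elided */ }
}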
@@ -104,7 +104,7 @@ public void startInstant(String instant) {
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant);
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
}
// cleaning
clean(instant);
@@ -142,7 +142,7 @@ public void commitInstant(String instant) {
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant);
throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant, e);
}
}

@@ -166,7 +166,7 @@ private void load() {
try {
this.messages = scanCkpMetadata(this.path);
} catch (IOException e) {
throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path);
throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path, e);
}
}

@@ -18,6 +18,7 @@

package org.apache.hudi.table;

+ import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
@@ -30,7 +31,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
- import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
@@ -60,7 +60,7 @@ public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite)

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
- return (DataStreamSinkProvider) dataStream -> {
+ return (DataStreamSinkProviderAdapter) dataStream -> {

// setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment()
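The sink side gets the same treatment as the scan side: Flink 1.15 added `consumeDataStream(ProviderContext, DataStream)` to `DataStreamSinkProvider`, so the lambda above targets a Hudi-owned adapter that preserves the old single-argument shape. A sketch of what the 1.15 variant could look like, under the same assumptions as the scan adapter sketch earlier:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

// Default-bridges the new two-argument overload so HoodieTableSink's lambda
// keeps compiling as a single-argument function on every supported Flink version.
public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {

  @Override
  default DataStreamSink<?> consumeDataStream(
      ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }

  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}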
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

+ import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -64,7 +65,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
- import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -167,7 +167,7 @@ public HoodieTableSource(

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- return new DataStreamScanProvider() {
+ return new DataStreamScanProviderAdapter() {

@Override
public boolean isBounded() {