hudi-client/hudi-flink-client/pom.xml: 561 changes (281 additions, 280 deletions)

Large diffs are not rendered by default.

@@ -61,5 +61,5 @@ public Option<String> getProperty(EngineProperty prop) {
// no operation for now
return Option.empty();
}

}
@@ -81,6 +81,14 @@
import java.util.Map;
import java.util.stream.Collectors;

/**
* Flink hoodie write client.
*
* <p>The client is used both on driver (for starting/committing transactions)
* and executor (for writing dataset).
*
* @param <T> type of the payload
*/
@SuppressWarnings("checkstyle:LineLength")
public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
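The type parameters in this class declaration bind the Flink engine to plain Java lists: records go in as List<HoodieRecord<T>> and write results come back as List<WriteStatus>, where the Spark client uses RDD-backed equivalents. A minimal driver-side sketch of that flow, assuming an already constructed client and batch (the helper name and the HoodieAvroPayload choice are illustrative, not part of this PR):

import java.util.List;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;

// Hypothetical helper, not in the PR: shows the list-based write path end to end.
final class FlinkWriteSketch {
  static List<WriteStatus> writeBatch(HoodieFlinkWriteClient<HoodieAvroPayload> client,
                                      List<HoodieRecord<HoodieAvroPayload>> batch) {
    String instant = client.startCommit();                       // driver: open a new instant
    List<WriteStatus> statuses = client.upsert(batch, instant);  // executor: write the batch
    client.commit(instant, statuses);                            // driver: finalize the commit
    return statuses;
  }
}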
@@ -33,6 +33,9 @@
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.util.FlinkClientUtil;

import org.apache.flink.api.common.functions.RuntimeContext;

@@ -45,10 +48,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.util.FlinkClientUtil;

import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
@@ -36,6 +36,11 @@
import java.util.Iterator;
import java.util.List;

/**
* Flink lazy iterable that supports explicit write handler.
*
* @param <T> type of the payload
*/
public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {

public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@@ -30,7 +30,8 @@ public interface MiniBatchHandle {
* come from one checkpoint interval. The file handle may roll over to new name
* if the name conflicts, give a chance to clean the intermediate file.
*/
default void finalizeWrite() {}
default void finalizeWrite() {
}

/**
* Close the file handle gracefully, if any error happens during the file handle close,
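For reviewers skimming this hunk: MiniBatchHandle is the hook the Flink write path uses to finalize and clean up files written per checkpoint interval. A hypothetical implementor is sketched below; finalizeWrite() comes from the hunk above, while the closeGracefully() name is an assumption read off the truncated javadoc.

// Hypothetical MiniBatchHandle implementor, for illustration only.
public class SketchMiniBatchHandle implements MiniBatchHandle {

  @Override
  public void finalizeWrite() {
    // If the file name rolled over on conflict, remove the intermediate file here.
  }

  @Override
  public void closeGracefully() { // assumed signature, per the javadoc fragment above
    // Log and swallow close errors so one bad handle does not fail the whole task.
  }
}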
@@ -20,10 +20,10 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.io.storage.HoodieParquetConfig;

import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

@@ -31,7 +31,6 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.table.types.logical.TimestampType;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -76,7 +75,7 @@ public static TypeInformation<?> fromParquetType(MessageType type) {
* Converts Flink Internal Type to Parquet schema.
*
* @param typeInformation Flink type information
* @param legacyMode is standard LIST and MAP schema or back-compatible schema
* @param legacyMode is standard LIST and MAP schema or back-compatible schema
* @return Parquet schema
*/
public static MessageType toParquetType(
@@ -569,7 +568,7 @@ private static Type convertToParquetType(
int scale = ((DecimalType) type).getScale();
int numBytes = computeMinBytesForDecimalPrecision(precision);
return Types.primitive(
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
.length(numBytes)
.named(name);
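The numBytes passed to .length(...) above is the smallest FIXED_LEN_BYTE_ARRAY size whose signed two's-complement range covers 10^precision - 1. A sketch of that rule, which the file's computeMinBytesForDecimalPrecision helper implements in some form (the exact code is not shown in this diff):

// Smallest n such that a signed n-byte integer holds 10^precision - 1,
// i.e. 2^(8n - 1) >= 10^precision. For example, precision 10 needs 5 bytes.
static int computeMinBytesForDecimalPrecision(int precision) {
  int numBytes = 1;
  while (Math.pow(2.0, 8.0 * numBytes - 1.0) < Math.pow(10.0, precision)) {
    numBytes += 1;
  }
  return numBytes;
}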
@@ -42,6 +42,9 @@
import java.util.List;
import java.util.Map;

/**
* Flink hoodie backed table metadata writer.
*/
public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
@@ -79,7 +79,7 @@ HoodieWriteMetadata<List<WriteStatus>> insert(
* @param context HoodieEngineContext
* @param writeHandle The write handle
* @param instantTime Instant Time for the action
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> delete(
@@ -96,9 +96,9 @@ HoodieWriteMetadata<List<WriteStatus>> delete(
* <p>Specifies the write handle explicitly in order to have fine grained control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -115,9 +115,9 @@ HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
* <p>Specifies the write handle explicitly in order to have fine grained control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -130,10 +130,10 @@ HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
* Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
* for the partition paths contained in input records.
*
* @param context HoodieEngineContext
* @param context HoodieEngineContext
* @param writeHandle The write handle
* @param instantTime Instant time for the replace action
* @param records input records
* @param records input records
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
@@ -146,10 +146,10 @@ HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
* Deletes all the existing records of the Hoodie table and inserts the specified new records into Hoodie table at the supplied instantTime,
* for the partition paths contained in input records.
*
* @param context HoodieEngineContext
* @param context HoodieEngineContext
* @param writeHandle The write handle
* @param instantTime Instant time for the replace action
* @param records input records
* @param records input records
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
@@ -66,6 +66,7 @@
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -147,7 +148,7 @@ public HoodieWriteMetadata<List<WriteStatus>> insert(
* @param context HoodieEngineContext
* @param writeHandle The write handle
* @param instantTime Instant Time for the action
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @return HoodieWriteMetadata
*/
public HoodieWriteMetadata<List<WriteStatus>> delete(
@@ -166,9 +167,9 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(
* <p>Specifies the write handle explicitly in order to have fine grained control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -187,9 +188,9 @@ public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
* <p>Specifies the write handle explicitly in order to have fine grained control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -287,7 +288,7 @@ public HoodieWriteMetadata<List<WriteStatus>> compact(

@Override
public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
return new ClusteringPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
return new ClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}

@Override
@@ -306,8 +307,8 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
}

/**
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling cleaning
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling cleaning
* @param extraMetadata additional metadata to write into plan
* @return
*/
@@ -371,7 +372,7 @@ public Iterator<List<WriteStatus>> handleUpdate(
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}

protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime,
String fileId) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
@@ -45,6 +45,9 @@
import java.util.List;
import java.util.Map;

/**
* Flink MERGE_ON_READ table.
*/
public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
extends HoodieFlinkCopyOnWriteTable<T> {

@@ -40,6 +40,9 @@

import java.util.List;

/**
* Impl of a flink hoodie table.
*/
public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> {
@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink delete commit action executor.
*/
public class FlinkDeleteCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseFlinkCommitActionExecutor<T> {
private final List<HoodieKey> keys;

@@ -42,6 +42,9 @@
import java.util.List;
import java.util.stream.Collectors;

/**
* Flink delete helper.
*/
@SuppressWarnings("checkstyle:LineLength")
public class FlinkDeleteHelper<R> extends
BaseDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink insert commit action executor.
*/
public class FlinkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseFlinkCommitActionExecutor<T> {

private List<HoodieRecord<T>> inputRecords;
@@ -31,6 +31,9 @@

import java.util.List;

/**
* Flink INSERT OVERWRITE commit action executor.
*/
public class FlinkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkCommitActionExecutor<T> {

@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink INSERT OVERWRITE TABLE commit action executor.
*/
public class FlinkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends FlinkInsertOverwriteCommitActionExecutor<T> {

@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink insert prepped commit action executor.
*/
public class FlinkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseFlinkCommitActionExecutor<T> {

private final List<HoodieRecord<T>> preppedRecords;
@@ -44,6 +44,9 @@
import java.util.Iterator;
import java.util.List;

/**
* Flink merge helper.
*/
public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {

@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink upsert commit action executor.
*/
public class FlinkUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseFlinkCommitActionExecutor<T> {

private List<HoodieRecord<T>> inputRecords;
@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink upsert prepped commit action executor.
*/
public class FlinkUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseFlinkCommitActionExecutor<T> {

private final List<HoodieRecord<T>> preppedRecords;
@@ -34,6 +34,9 @@
import java.util.Iterator;
import java.util.List;

/**
* Base flink delta commit action executor.
*/
public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkCommitActionExecutor<T> {

@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink upsert delta commit action executor.
*/
public class FlinkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkDeltaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> inputRecords;
@@ -30,6 +30,9 @@

import java.util.List;

/**
* Flink upsert prepped delta commit action executor.
*/
public class FlinkUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkDeltaCommitActionExecutor<T> {

@@ -19,8 +19,8 @@
package org.apache.hudi.client.common;

import org.apache.hudi.client.FlinkTaskContextSupplier;

import org.apache.hudi.common.util.collection.ImmutablePair;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;