@@ -103,10 +103,19 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
}

public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
HoodieFileFormat baseFileFormat)
HoodieFileFormat baseFileFormat) throws IOException {
return init(hadoopConf, basePath, tableType, baseFileFormat, false, null, true);
}

public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
HoodieFileFormat baseFileFormat, boolean setKeyGen, String keyGenerator, boolean populateMetaFields)
throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
if (setKeyGen) {
properties.setProperty("hoodie.datasource.write.keygenerator.class", keyGenerator);
}
properties.setProperty("hoodie.populate.meta.fields", Boolean.toString(populateMetaFields));
return init(hadoopConf, basePath, tableType, properties);
}

@@ -43,6 +43,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
@@ -275,16 +276,16 @@ protected static Option&lt;HoodieVirtualKeyInfo&gt; getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
if (tableConfig.populateMetaFields()) {
return Option.empty();
}

TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
Schema schema = tableSchemaResolver.getTableAvroSchema();
boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
return Option.of(
new HoodieVirtualKeyInfo(
tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp(),
isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
schema.getField(tableConfig.getPartitionFieldProp()).pos()));
isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
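
A small standalone sketch of the null-or-empty check above that decides whether the table is treated as non-partitioned; the property values are illustrative, not read from a real table config:

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

public class PartitionFieldOptionExample {
  public static void main(String[] args) {
    // Illustrative stand-ins for what HoodieTableConfig#getPartitionFieldProp() might return.
    for (String partitionFieldProp : new String[] {null, "", "datestr"}) {
      // An unset or empty partition field marks the table as non-partitioned, so the Option stays empty.
      Option&lt;String&gt; partitionPathField = StringUtils.isNullOrEmpty(partitionFieldProp)
          ? Option.empty()
          : Option.of(partitionFieldProp);
      System.out.println(partitionFieldProp + " -> present=" + partitionPathField.isPresent());
    }
  }
}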
@@ -18,6 +18,8 @@

package org.apache.hudi.hadoop.realtime;

import org.apache.hudi.common.util.Option;

import java.io.Serializable;

/**
@@ -26,11 +28,11 @@
public class HoodieVirtualKeyInfo implements Serializable {

private final String recordKeyField;
private final String partitionPathField;
private final Option<String> partitionPathField;
private final int recordKeyFieldIndex;
private final int partitionPathFieldIndex;
private final Option<Integer> partitionPathFieldIndex;

public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) {
public HoodieVirtualKeyInfo(String recordKeyField, Option<String> partitionPathField, int recordKeyFieldIndex, Option<Integer> partitionPathFieldIndex) {
this.recordKeyField = recordKeyField;
this.partitionPathField = partitionPathField;
this.recordKeyFieldIndex = recordKeyFieldIndex;
@@ -41,25 +43,25 @@ public String getRecordKeyField() {
return recordKeyField;
}

public String getPartitionPathField() {
public Option<String> getPartitionPathField() {
return partitionPathField;
}

public int getRecordKeyFieldIndex() {
return recordKeyFieldIndex;
}

public int getPartitionPathFieldIndex() {
public Option<Integer> getPartitionPathFieldIndex() {
return partitionPathFieldIndex;
}

@Override
public String toString() {
return "HoodieVirtualKeyInfo{"
+ "recordKeyField='" + recordKeyField + '\''
+ ", partitionPathField='" + partitionPathField + '\''
+ ", partitionPathField='" + (partitionPathField.isPresent() ? partitionPathField.get() : "null") + '\''
+ ", recordKeyFieldIndex=" + recordKeyFieldIndex
+ ", partitionPathFieldIndex=" + partitionPathFieldIndex
+ ", partitionPathFieldIndex=" + (partitionPathFieldIndex.isPresent() ? partitionPathFieldIndex.get() : "-1")
+ '}';
}
}
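
For reference, a minimal usage sketch (not part of this patch) of the reworked constructor once the partition-path field and index become optional; the field names and index positions below are made up for illustration:

import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;

public class VirtualKeyInfoExample {
  public static void main(String[] args) {
    // Partitioned virtual-key table: the partition path field and its schema position are known.
    HoodieVirtualKeyInfo partitioned =
        new HoodieVirtualKeyInfo("uuid", Option.of("partition_path"), 0, Option.of(3));

    // Non-partitioned table: both partition-path values are absent.
    HoodieVirtualKeyInfo nonPartitioned =
        new HoodieVirtualKeyInfo("uuid", Option.empty(), 0, Option.empty());

    System.out.println(partitioned);
    System.out.println(nonPartitioned);
  }
}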
@@ -107,9 +107,12 @@ default void writeToOutput(DataOutput out) throws IOException {
} else {
InputSplitUtils.writeBoolean(true, out);
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out);
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
InputSplitUtils.writeBoolean(virtualKeyInfoOpt.get().getPartitionPathField().isPresent(), out);
if (virtualKeyInfoOpt.get().getPartitionPathField().isPresent()) {
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField().get(), out);
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex().get()), out);
}
}
}

@@ -130,9 +133,10 @@ default void readFromInput(DataInput in) throws IOException {
boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
if (hoodieVirtualKeyPresent) {
String recordKeyField = InputSplitUtils.readString(in);
String partitionPathField = InputSplitUtils.readString(in);
int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
boolean isPartitionPathFieldPresent = InputSplitUtils.readBoolean(in);
Option<String> partitionPathField = isPartitionPathFieldPresent ? Option.of(InputSplitUtils.readString(in)) : Option.empty();
Option<Integer> partitionPathIndex = isPartitionPathFieldPresent ? Option.of(Integer.parseInt(InputSplitUtils.readString(in))) : Option.empty();
setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
}
}
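
The presence flag written ahead of the optional partition-path values is what keeps writeToOutput and readFromInput symmetric. Below is a standalone sketch of that pattern using plain DataOutputStream/DataInputStream instead of the InputSplitUtils helpers; it does not exercise the split classes themselves:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hudi.common.util.Option;

public class OptionalFieldRoundTrip {
  public static void main(String[] args) throws IOException {
    Option&lt;String&gt; partitionPathField = Option.empty(); // e.g. a non-partitioned table

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    // Write a presence flag first, then the value only if it exists.
    out.writeBoolean(partitionPathField.isPresent());
    if (partitionPathField.isPresent()) {
      out.writeUTF(partitionPathField.get());
    }

    // The reader consumes the flag before deciding whether more bytes follow.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    Option&lt;String&gt; readBack = in.readBoolean() ? Option.of(in.readUTF()) : Option.empty();
    System.out.println("partition path present after round trip: " + readBack.isPresent());
  }
}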
@@ -87,7 +87,9 @@ public static void addRequiredProjectionFields(Configuration configuration, Option&lt;HoodieVirtualKeyInfo&gt; hoodieVirtualKeyInfo) {
} else {
HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
if (hoodieVirtualKey.getPartitionPathField().isPresent()) {
addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get());
}
}
}

@@ -99,7 +101,8 @@ public static boolean requiredProjectionFieldsExistInConf(Configuration configuration,
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
} else {
return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
&& readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
&& (hoodieVirtualKeyInfo.get().getPartitionPathField().isPresent() ? readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField().get())
: true);
}
}

@@ -27,19 +27,25 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -55,6 +61,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -167,6 +174,26 @@ public void testInputFormatLoad() throws IOException {
assertEquals(10, files.length);
}

@Test
public void testInputFormatLoadForNonPartitionedAndVirtualKeyedTable() throws IOException {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
File partitionDir = InputFormatTestUtil.prepareCustomizedTable(basePath, baseFileFormat, 10, "100", true, false,
true, schema);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(basePath.toString(), "100", Option.of(commitMetadata));

// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
assertEquals(10, inputSplits.length);

FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
}

@Test
public void testInputFormatLoadWithEmptyTable() throws IOException {
// initial hoodie table
@@ -71,16 +71,35 @@ public class InputFormatTestUtil {
private static String TEST_WRITE_TOKEN = "1-0-1";

public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
String commitNumber)
String commitNumber) throws IOException {
return prepareCustomizedTable(basePath, baseFileFormat, numberOfFiles, commitNumber, false, true, false, null);
}

public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema)
throws IOException {
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat);
if (useNonPartitionedKeyGen) {
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", populateMetaFields);
} else {
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat);
}

java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
setupPartition(basePath, partitionPath);

return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
commitNumber);
if (injectData) {
try {
createSimpleData(schema, partitionPath, numberOfFiles, 100, commitNumber);
return partitionPath.toFile();
} catch (Exception e) {
throw new IOException("Excpetion thrown while writing data ", e);
}
} else {
return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
commitNumber);
}
}

public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,