
Commit dafd90e

[HUDI-4898] Presto/Hive respect the payload class when merging the parquet file and log file while reading a MOR table
1 parent cbf9b83 commit dafd90e

8 files changed: 635 additions & 9 deletions
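The new merge behavior is gated by a JobConf flag, hoodie.support.payload (default true), read by AbstractRealtimeRecordReader below. A minimal sketch of how a client job could opt out of payload-aware merging; the class name here is only for illustration:

import org.apache.hadoop.mapred.JobConf;

public class PayloadMergeToggleSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Default is "true"; "false" keeps the pre-existing merge path that does not
    // consult the table's payload class when combining parquet and log records.
    jobConf.set("hoodie.support.payload", "false");
    System.out.println(jobConf.get("hoodie.support.payload", "true"));
  }
}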


hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java

Lines changed: 33 additions & 0 deletions
@@ -24,11 +24,18 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.HiveAvroSerializer;

 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -55,13 +62,19 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema writerSchema;
   private Schema hiveSchema;
   private HoodieTableMetaClient metaClient;
+  // support merge operation
+  protected boolean supportPayload = true;
+  // handle hive type to avro record
+  protected HiveAvroSerializer serializer;

   public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
     this.split = split;
     this.jobConf = job;
     LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
     LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
     LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
+    this.supportPayload = Boolean.parseBoolean(job.get("hoodie.support.payload", "true"));
+    prepareHiveAvroSerializer();
     try {
       metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
       if (metaClient.getTableConfig().getPreCombineField() != null) {
@@ -80,6 +93,26 @@ private boolean usesCustomPayload(HoodieTableMetaClient metaClient) {
         || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
   }

+  private void prepareHiveAvroSerializer() {
+    try {
+      List<String> hiveInternalColumns = Arrays.asList(new String[] {"BLOCK__OFFSET__INSIDE__FILE", "INPUT__FILE__NAME", "ROW__ID"});
+      List<String> columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+      int dropNum = columnNameList.stream().filter(f -> hiveInternalColumns.contains(f)).map(f -> columnNameList.indexOf(f)).collect(Collectors.toList()).size();
+      List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+      for (int i = 0; i < dropNum; i++) {
+        columnNameList.remove(columnNameList.size() - 1);
+        columnTypeList.remove(columnTypeList.size() - 1);
+      }
+      StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+      this.serializer = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList);
+    } catch (Exception e) {
+      // fall back to the original logic
+      LOG.warn("Failed to init HiveAvroSerializer to support payload merge", e);
+      this.supportPayload = false;
+    }
+
+  }
+
   /**
    * Gets schema from HoodieTableMetaClient. If not, falls
    * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
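For context, prepareHiveAvroSerializer rebuilds the Hive row type from the serde column lists after trimming Hive's internal columns, then hands it to HiveAvroSerializer. A self-contained sketch of that type reconstruction, using hard-coded column strings in place of the serde properties read from the JobConf:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class RowTypeInfoSketch {
  public static void main(String[] args) {
    // Stand-ins for jobConf.get(serdeConstants.LIST_COLUMNS) / LIST_COLUMN_TYPES,
    // with one Hive virtual column appended at the tail.
    String columns = "col1,col2,par,BLOCK__OFFSET__INSIDE__FILE";
    String columnTypes = "int,int,string,bigint";

    List<String> columnNameList = Arrays.stream(columns.split(",")).collect(Collectors.toList());
    List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);

    // Drop the virtual column from the tail, mirroring the dropNum loop above.
    columnNameList.remove(columnNameList.size() - 1);
    columnTypeList.remove(columnTypeList.size() - 1);

    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
    System.out.println(rowTypeInfo.getTypeName()); // struct<col1:int,col2:int,par:string>
  }
}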

hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split,
     // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
     // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
     // time.
-    HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty());
+    HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty(), Option.empty());

     this.conf = jobConf;
     this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");

hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java

Lines changed: 11 additions & 3 deletions
@@ -27,6 +27,10 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
 import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
@@ -61,7 +65,10 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split,
     ValidationUtils.checkArgument(split instanceof RealtimeSplit,
         "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
     RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-    addProjectionToJobConf(realtimeSplit, jobConf);
+    // add preCombineKey
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    addProjectionToJobConf(realtimeSplit, jobConf, metaClient.getTableConfig().getPreCombineField());
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));

@@ -74,7 +81,7 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split,
             super.getRecordReader(split, jobConf, reporter));
   }

-  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
+  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf, String preCombineKey) {
     // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
@@ -94,7 +101,8 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf
     // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
     // time.
     if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
-      HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo());
+      HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo(),
+          StringUtils.isNullOrEmpty(preCombineKey) ? Option.empty() : Option.of(preCombineKey));
     }
     jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
     setConf(jobConf);
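The record reader now needs the table's preCombine field so that it can be projected alongside the Hudi meta columns. A rough sketch of how that field is resolved from the table config and guarded before being handed to addRequiredProjectionFields; the base path is a made-up placeholder (the input format takes it from realtimeSplit.getBasePath()):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

public class PreCombineProjectionSketch {
  public static void main(String[] args) {
    String basePath = "/tmp/hudi/example_mor_table"; // hypothetical table location
    JobConf jobConf = new JobConf();

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(jobConf).setBasePath(basePath).build();
    HoodieTableConfig tableConfig = metaClient.getTableConfig();
    String preCombineKey = tableConfig.getPreCombineField();

    // Same null/empty guard the diff applies before the projection is extended.
    Option<String> preCombineField =
        StringUtils.isNullOrEmpty(preCombineKey) ? Option.empty() : Option.of(preCombineKey);
    System.out.println("preCombine field: " + preCombineField);
  }
}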

hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java

Lines changed: 21 additions & 4 deletions
@@ -33,6 +33,7 @@
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.log4j.LogManager;
@@ -81,7 +82,7 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException {
         .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
         .withBasePath(split.getBasePath())
         .withLogFilePaths(split.getDeltaLogPaths())
-        .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
+        .withReaderSchema(getWriterSchema())
         .withLatestInstantTime(split.getMaxCommitTime())
         .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
         .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
@@ -112,9 +113,7 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
       if (deltaRecordMap.containsKey(key)) {
         // mark the key as handled
         this.deltaRecordKeys.remove(key);
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-        // deltaRecord may not be a full record and needs values of columns from the parquet
-        Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+        Option<GenericRecord> rec = supportPayload ? mergeRecord(deltaRecordMap.get(key), arrayWritable) : buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
         // If the record is not present, this is a delete record using an empty payload so skip this base record
         // and move to the next record
         if (!rec.isPresent()) {
@@ -173,6 +172,24 @@ private void setUpWritable(Option<GenericRecord> rec, ArrayWritable arrayWritable
     }
   }

+  private Option<GenericRecord> mergeRecord(HoodieRecord<? extends HoodieRecordPayload> newRecord, ArrayWritable writableFromParquet) throws IOException {
+    GenericRecord oldRecord = convertArrayWritableToHoodieRecord(writableFromParquet);
+    // Presto does not append partition columns to jobConf.get(serdeConstants.LIST_COLUMNS), but Hive does. This leads to the following:
+    // e.g. for a table col1: int, col2: int, par: string, where par is the partition column,
+    // on the Hive engine the hiveSchema is col1,col2,par and the writerSchema is col1,col2,par;
+    // on the Presto engine the hiveSchema is col1,col2, but the writerSchema is col1,col2,par.
+    // So, to be compatible with both Hive and Presto, we rewrite oldRecord before calling combineAndGetUpdateValue;
+    // once Presto on Hudi has its own MOR reader, we can remove this rewrite logic.
+    Option<GenericRecord> combinedValue = newRecord.getData().combineAndGetUpdateValue(HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord,
+        getWriterSchema()), getWriterSchema(), payloadProps);
+    return combinedValue;
+  }
+
+  private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) {
+    GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema());
+    return record;
+  }
+
   @Override
   public NullWritable createKey() {
     return parquetReader.createKey();
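The comment block in mergeRecord explains why the parquet-side record has to be rewritten to the writer schema before combineAndGetUpdateValue is called: Presto's projection may be missing the partition columns that the writer schema contains. A schematic illustration of that kind of rewrite in plain Avro, not the actual HiveAvroSerializer.rewriteRecordIgnoreResultCheck implementation (which is not shown in this excerpt):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RewriteToWriterSchemaSketch {
  public static void main(String[] args) {
    // Schema as Presto projects it (partition column missing)...
    Schema hiveSchema = SchemaBuilder.record("row").fields()
        .optionalInt("col1").optionalInt("col2").endRecord();
    // ...and the writer schema, which also carries the partition column.
    Schema writerSchema = SchemaBuilder.record("row").fields()
        .optionalInt("col1").optionalInt("col2").optionalString("par").endRecord();

    GenericRecord oldRecord = new GenericData.Record(hiveSchema);
    oldRecord.put("col1", 1);
    oldRecord.put("col2", 2);

    // Copy every writer-schema field that also exists in the narrow schema;
    // fields present only in the writer schema stay null.
    GenericRecord rewritten = new GenericData.Record(writerSchema);
    for (Schema.Field field : writerSchema.getFields()) {
      if (hiveSchema.getField(field.name()) != null) {
        rewritten.put(field.name(), oldRecord.get(field.name()));
      }
    }
    System.out.println(rewritten); // {"col1": 1, "col2": 2, "par": null}
  }
}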
