Closed

Changes from all commits (137 commits)
d73d7f5
[HUDI-2815] add partial overwrite payload to support partial overwrit…
stayrascal Jan 30, 2022
6b6a60f
[HUDI-2815] add compareTo test case
stayrascal Feb 6, 2022
2fa2d57
Merge branch 'master' into HUDI-2815
stayrascal Feb 6, 2022
21df6fe
[HUDI-2815] fix conflict by changing HoodieRecord to HoodieAvroRecord
stayrascal Feb 7, 2022
940f6de
Merge remote-tracking branch 'origin/master' into HUDI-2815
stayrascal Feb 21, 2022
14edef0
[HUDI-2815] 1. passing the payload schema instead of embedding it in …
stayrascal Feb 21, 2022
ce561bc
[HUDI-2815] add test case for nest type for testing partial update
stayrascal Feb 21, 2022
c6f524e
[HUDI-2815] remove unused configuration and refactor partial update l…
stayrascal Feb 25, 2022
d3b3e05
[HUDI-2815] pass schema during precombine two record in compaction pr…
stayrascal Feb 26, 2022
10e080b
[MINOR] fix get builtin function issue from Hudi catalog
stayrascal Feb 27, 2022
a86b7ff
Merge branch 'master' into HUDI-2815
stayrascal Mar 8, 2022
d11c670
Merge branch 'master' into HUDI-2815
stayrascal Mar 16, 2022
715d4b0
Merge branch 'master' into HUDI-2815
stayrascal Mar 22, 2022
e89fd60
[HUDI-2815] fix the conflict and small refactor
stayrascal Mar 22, 2022
3f771d3
Merge branch 'master' into HUDI-2815
stayrascal Mar 25, 2022
b823e94
[HUDI-3521] Fixing kakfa key and value serializer value type from cla…
nsivabalan Feb 27, 2022
9c15335
[HUDI-3018] Adding validation to dataframe scheme to ensure reserved …
nsivabalan Feb 27, 2022
eef40bc
[MINOR] Change MINI_BATCH_SIZE to 2048 (#4862)
cuibo01 Feb 28, 2022
3a373c2
[HUDI-2917] rollback insert data appended to log file when using Hbas…
nsivabalan Feb 28, 2022
6fbf453
[HUDI-3528] Fix String convert issue and overwrite putAll method in T…
stayrascal Feb 28, 2022
1e236ba
[HUDI-3341] Fix log file reader for S3 with hadoop-aws 2.7.x (#4897)
yihua Feb 28, 2022
ac3e72a
[HUDI-3450] Avoid passing empty string spark master to hudi cli (#4844)
zhedoubushishi Feb 28, 2022
f1a8d0c
[HUDI-3418] Save timeout option for remote RemoteFileSystemView (#4809)
yuzhaojing Feb 28, 2022
3697d8c
[HUDI-3465] Add validation of column stats and bloom filters in Hoodi…
yihua Mar 1, 2022
975c463
[HUDI-3497] Adding Datatable validator tool (#4902)
nsivabalan Mar 1, 2022
46ea95d
[HUDI-3441] Add support for "marker delete" in hudi-cli (#4922)
XuQianJin-Stars Mar 1, 2022
4aaee39
[HUDI-3516] Implement record iterator for HoodieDataBlock (#4909)
cuibo01 Mar 2, 2022
6a13069
[HUDI-2631] In CompactFunction, set up the write schema each time wit…
yuzhaojing Mar 2, 2022
466a633
[HUDI-3469] Refactor `HoodieTestDataGenerator` to provide for reprodu…
Mar 2, 2022
fe4aefd
[HUDI-3315] RFC-35 Part-1 Support bucket index in Flink writer (#4679)
garyli1019 Mar 2, 2022
0b9f295
[minor] Cosmetic changes following HUDI-3315 (#4934)
danny0405 Mar 2, 2022
6731992
[MINOR] Adding more test props to integ tests (#4935)
nsivabalan Mar 2, 2022
4b975fd
[MINOR] RFC-38 markdown content error (#4933)
liujinhui1994 Mar 2, 2022
7a30b08
[HUDI-3264]: made schema registry urls configurable with MTDS (#4779)
pratyakshsharma Mar 2, 2022
d6e38af
[HUDI-2973] RFC-27: Data skipping index to improve query performance …
manojpec Mar 3, 2022
ef9ff1a
[HUDI-3544] Fixing "populate meta fields" update to metadata table (#…
nsivabalan Mar 3, 2022
dd7e772
[HUDI-3552] Strength the NetworkUtils#getHostname by checking network…
danny0405 Mar 3, 2022
8a4cfb7
[HUDI-3548] Fix if user specify key "hoodie.datasource.clustering.asy…
Mar 4, 2022
6af6076
[HUDI-3445] Support Clustering Command Based on Call Procedure Comman…
huberylee Mar 4, 2022
77b0f3f
[HUDI-3161][RFC-47] Add Call Produce Command for Spark SQL (#4607)
XuQianJin-Stars Mar 4, 2022
6bb4181
[MINOR] fix UTC timezone config (#4950)
YuweiXiao Mar 4, 2022
6c4b714
[HUDI-3348] Add UT to verify HoodieRealtimeFileSplit serde (#4951)
xushiyan Mar 4, 2022
b851feb
[HUDI-3460] Add reader merge memory option for flink (#4911)
cuibo01 Mar 4, 2022
4d86424
[HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
nsivabalan Mar 5, 2022
0b21be2
[HUDI-3130] Fixing Hive getSchema for RT tables addressing different …
aditiwari01 Mar 6, 2022
55f5626
[HUDI-3520] Introduce DeleteSupportSchemaPostProcessor to support add…
wangxianghu Mar 6, 2022
d2aed60
[HUDI-3525] Introduce JsonkafkaSourceProcessor to support data prepro…
wangxianghu Mar 6, 2022
4c15551
[HUDI-3069] Improve HoodieMergedLogRecordScanner avoid putting unnece…
scxwhite Mar 7, 2022
b9230e0
[HUDI-3213] Making commit preserve metadata to true for compaction (#…
nsivabalan Mar 7, 2022
1e68d6f
[HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy`…
Mar 7, 2022
f28bad6
[HUDI-3365] Make sure Metadata Table records are updated appropriatel…
Mar 7, 2022
6fa32a0
[HUDI-2747] support set --sparkMaster for MDT cli (#4964)
zhangyue19921010 Mar 7, 2022
da9962b
[HUDI-3576] Configuring timeline refreshes based on latest commit (#4…
nsivabalan Mar 7, 2022
f52553b
[HUDI-3573] flink cleanFuntion execute clean on initialization (#4936)
todd5167 Mar 8, 2022
a5b9f66
[MINOR][HUDI-3460]Fix HoodieDataSourceITCase
cuibo01 Mar 6, 2022
69f058c
[HUDI-2677] Add DFS based message queue for flink writer[part3] (#4961)
danny0405 Mar 8, 2022
2a18375
[HUDI-3574] Improve maven module configs for different spark profiles…
XuQianJin-Stars Mar 8, 2022
8cba0a9
[HUDI-3584] Skip integ test modules by default (#4986)
xushiyan Mar 8, 2022
ced2def
[HUDI-3356][HUDI-3203] HoodieData for metadata index records; BloomFi…
codope Mar 8, 2022
1409c0b
[HUDI-3221] Support querying a table as of a savepoint (#4720)
XuQianJin-Stars Mar 8, 2022
cd47bc9
[HUDI-3587] Making SupportsUpgradeDowngrade serializable (#4991)
nsivabalan Mar 9, 2022
d22d93f
[HUDI-3568] Introduce ChainedSchemaPostProcessor to support setting m…
wangxianghu Mar 9, 2022
d0d6981
[HUDI-3383] Sync column comments while syncing a hive table (#4960)
MrSleeping123 Mar 10, 2022
180b690
[MINOR] Add IT CI Test timeout option (#5003)
XuQianJin-Stars Mar 10, 2022
b4770df
[HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads projected…
Mar 10, 2022
f76144b
[HUDI-3581] Reorganize some clazz for hudi flink (#4983)
danny0405 Mar 10, 2022
f7886f8
[HUDI-3602][DOCS] Update docker README to build multi-arch images usi…
codope Mar 10, 2022
fc6c7a7
[HUDI-3586] Add Trino Queries in integration tests (#4988)
yihua Mar 11, 2022
7d89404
[HUDI-3595] Fixing NULL schema provider for empty batch (#5002)
nsivabalan Mar 11, 2022
801c69d
[HUDI-3522] Introduce DropColumnSchemaPostProcessor to support drop c…
wangxianghu Mar 11, 2022
cf03735
[HUDI-2999] [RFC-42] RFC for consistent hashing index (#4326)
YuweiXiao Mar 11, 2022
d963079
[HUDI-3566] Add thread factory in BoundedInMemoryExecutor (#4926)
scxwhite Mar 11, 2022
5f59bcb
[HUDI-3575] Use HoodieTestDataGenerator#TRIP_SCHEMA as example schema…
wangxianghu Mar 11, 2022
04baf70
[HUDI-3567] Refactor HoodieCommonUtils to make code more reasonable (…
huberylee Mar 11, 2022
5da95d5
[HUDI-3513] Make sure Column Stats does not fail in case it fails to …
Mar 11, 2022
a51bdb5
[HUDI-3592] Fix NPE of DefaultHoodieRecordPayload if Property is empt…
Mar 11, 2022
9e1cad8
[HUDI-3569] Introduce ChainedJsonKafkaSourePostProcessor to support s…
wangxianghu Mar 11, 2022
5403db3
[HUDI-3556] Re-use rollback instant for rolling back of clustering an…
nsivabalan Mar 11, 2022
151ce1e
[HUDI-3593] Restore TypedProperties and flush checksum in table confi…
codope Mar 13, 2022
ff16cdc
[HUDI-3583] Fix MarkerBasedRollbackStrategy NoSuchElementException (#…
liujinhui1994 Mar 13, 2022
54808ec
[HUDI-3501] Support savepoints command based on Call Produce Command …
XuQianJin-Stars Mar 13, 2022
6530d83
[HUDI-3613] Adding/fixing yamls for metadata (#5029)
nsivabalan Mar 14, 2022
6570198
[HUDI-3600] Tweak the default cleaning strategy to be more streaming …
danny0405 Mar 14, 2022
967b336
fix NPE when run schdule using spark-sql if the commits time < hoodie…
peanut-chenzhong Mar 14, 2022
399eb8d
[MINODR] Remove repeated kafka-clients dependencies (#5034)
wangxianghu Mar 14, 2022
07d6929
[HUDI-3621] Fixing NullPointerException in DeltaStreamer (#5039)
nsivabalan Mar 14, 2022
f9ae271
[HUDI-3623] Removing hive sync node from non hive yamls (#5040)
nsivabalan Mar 14, 2022
31b54c7
[HUDI-3620] Adding spark3.2.0 profile (#5038)
nsivabalan Mar 14, 2022
95ef13c
[HUDI-3547] Introduce MaxwellSourcePostProcessor to extract data from…
wangxianghu Mar 15, 2022
1a7157a
[HUDI-3606] Add `org.objenesis:objenesis` to hudi-timeline-server-bun…
cdmikechen Mar 15, 2022
145440b
[HUDI-3619] Fix HoodieOperation fromValue using wrong constant value …
Mar 15, 2022
c0eecb5
[HUDI-3514] Rebase Data Skipping flow to rely on MT Column Stats inde…
Mar 15, 2022
035c3ca
[HUDI-3633] Allow non-string values to be set in TypedProperties (#5045)
codope Mar 15, 2022
ece2ae6
[HUDI-3589] flink sync hive metadata supports table properties and se…
todd5167 Mar 15, 2022
a55ce33
[HUDI-3588] Remove hudi-common and hudi-hadoop-mr jars in Presto Dock…
yihua Mar 16, 2022
895becc
[HUDI-3607] Support backend switch in HoodieFlinkStreamer (#5032)
liufangqi Mar 16, 2022
00b2e45
[Hudi-3376] Add an option to skip under deletion files for HoodieMeta…
zhangyue19921010 Mar 17, 2022
4512e96
[HUDI-3404] Automatically adjust write configs based on metadata tabl…
yihua Mar 17, 2022
c163ac2
[HUDI-3494] Consider triggering condition of MOR compaction during ar…
yihua Mar 17, 2022
402f60e
[HUDI-3645] Fix NPE caused by multiple threads accessing non-thread-s…
fengjian428 Mar 17, 2022
b825b8a
[HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commi…
xushiyan Mar 17, 2022
029622b
[MINOR] HoodieFileScanRDD could print null path (#5056)
Mar 17, 2022
931747d
[HUDI-3598] Row Data to Hoodie Record Operator parallelism needs to a…
JerryYue-M Mar 18, 2022
75abad6
[HUDI-3656] Adding medium sized dataset for clustering and minor fixe…
nsivabalan Mar 18, 2022
9037045
[HUDI-3659] Reducing the validation frequency with integ tests (#5067)
nsivabalan Mar 18, 2022
9c40d0c
[HUDI-3457] Refactored Spark DataSource Relations to avoid code dupli…
Mar 19, 2022
d9ca8e1
[HUDI-3663] Fixing Column Stats index to properly handle first Data T…
Mar 20, 2022
dfc05b7
[MINOR] Remove flaky assert in TestInLineFileSystem (#5069)
yihua Mar 20, 2022
618fe26
[HUDI-3665] Support flink multiple versions (#5072)
danny0405 Mar 21, 2022
b28f5d2
[MINOR] Fixing sparkUpdateNode for record generation (#5079)
nsivabalan Mar 21, 2022
542cec6
[HUDI-3559] Flink bucket index with COW table throws NoSuchElementExc…
wxplovecc Mar 11, 2022
75056ea
[HUDI-1436]: Provide an option to trigger clean every nth commit (#4385)
pratyakshsharma Mar 22, 2022
d1e31f8
[HUDI-3640] Set SimpleKeyGenerator as default in 2to3 table upgrade f…
yihua Mar 22, 2022
e19b5d1
[HUDI-2883] Refactor hive sync tool / config to use reflection and st…
rmahindra123 Mar 22, 2022
b709f75
[HUDI-3642] Handle NPE due to empty requested replacecommit metadata …
codope Mar 23, 2022
1ce9a5e
Fixing non partitioned all files record in MDT (#5108)
nsivabalan Mar 24, 2022
dcbb074
[minor] Checks the data block type for archived timeline (#5106)
danny0405 Mar 24, 2022
0640f20
[HUDI-3689] Fix glob path and hive sync in deltastreamer tests (#5117)
codope Mar 24, 2022
d482527
[HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
Mar 24, 2022
7f5ee51
[HUDI-3689] Remove Azure CI cache (#5121)
xushiyan Mar 24, 2022
5558b79
[HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)
xushiyan Mar 24, 2022
a9b4110
[HUDI-3706] Downgrade maven surefire and failsafe version (#5123)
yihua Mar 24, 2022
ffac31e
[HUDI-3689] Fix delta streamer tests (#5124)
xushiyan Mar 24, 2022
5854243
[HUDI-3689] Disable flaky tests in TestHoodieDeltaStreamer (#5127)
yihua Mar 24, 2022
f8092a3
[HUDI-3624] Check all instants before starting a commit in metadata t…
yihua Mar 25, 2022
32b9700
[HUDI-3638] Make ZookeeperBasedLockProvider serializable (#5112)
yihua Mar 25, 2022
27adaa2
[HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
danny0405 Mar 25, 2022
9c49e43
[HUDI-1180] Upgrade HBase to 2.4.9 (#5004)
yihua Mar 25, 2022
1959d8b
[HUDI-3703] Reset taskID in restoreWriteMetadata (#5122)
yuzhaojing Mar 25, 2022
4568fae
[HUDI-3580] Claim RFC number 48 for LogCompaction action RFC (#5128)
suryaprasanna Mar 25, 2022
c43747e
[HUDI-3678] Fix record rewrite of create handle when 'preserveMetadat…
danny0405 Mar 25, 2022
5b66abf
[HUDI-3594] Supporting Composite Expressions over Data Table Columns …
Mar 25, 2022
06ac8cb
[HUDI-3711] Fix typo in MaxwellJsonKafkaSourcePostProcessor.Config#PR…
wangxianghu Mar 25, 2022
24cc379
[HUDI-3563] Make quickstart examples covered by CI tests (#5082)
XuQianJin-Stars Mar 25, 2022
5a0a1e9
Merge branch 'master' into HUDI-2815
stayrascal Apr 12, 2022
d9da263
Merge branch 'master' into HUDI-2815
stayrascal Apr 12, 2022
20b1ee4
fix conflict
stayrascal Apr 12, 2022
@@ -22,7 +22,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -80,11 +79,5 @@ public I combineOnCondition(
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
}

public abstract I deduplicateRecords(
I records, HoodieIndex<?, ?> index, int parallelism);
public abstract I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism);
}
@@ -26,9 +26,10 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;

public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

@@ -51,17 +52,19 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec

@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
HoodieData<HoodieRecord<T>> records,
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
int parallelism) {
boolean isIndexingGlobal = table.getIndex().isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec2.getData().preCombine(rec1.getData());
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), null, new Schema.Parser().parse(table.getConfig().getWriteSchema()));
HoodieKey reducedKey = rec2.getData().compareTo(rec1.getData()) < 0 ? rec1.getKey() : rec2.getKey();

return new HoodieAvroRecord<>(reducedKey, reducedData);
}, parallelism).map(Pair::getRight);
@@ -28,10 +28,11 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -89,7 +90,9 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
List<HoodieRecord<T>> records,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
int parallelism) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
@@ -98,11 +101,12 @@ public List<HoodieRecord<T>> deduplicateRecords(
final T data1 = rec1.getData();
final T data2 = rec2.getData();

@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1);
Schema writeSchema = new Schema.Parser().parse(table.getConfig().getWriteSchema());
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, null, writeSchema);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
boolean choosePrev = data1 == reducedData;
boolean choosePrev = data2.compareTo(data1) < 0;
HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
Contributor

Why do we need a compareTo here?

Contributor Author

@stayrascal Feb 8, 2022

The previous logic of data2.preCombine(data1) returns one of data1 or data2, ordered by their orderingVal. But if we merge/combine data1 and data2 into a new payload (reducedData), data1.equals(reducedData) is always false. In order to get the HoodieKey and HoodieOperation for the new HoodieRecord holding reducedData, we need to pick the latest HoodieKey and HoodieOperation from data1 and data2. compareTo is used in place of #preCombine to compare their orderingVal.

 @Override
  public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
    return orderingVal.compareTo(oldValue.orderingVal);
  }
@Test
  public void testCompareFunction() {
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "1");
    record.put("partition", "partition1");
    record.put("ts", 0L);
    record.put("_hoodie_is_deleted", false);
    record.put("city", "NY0");
    record.put("child", Arrays.asList("A"));

    PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
    PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);

    assertEquals(payload1.compareTo(payload2), -1);
    assertEquals(payload2.compareTo(payload1), 1);
    assertEquals(payload1.compareTo(payload1), 0);
  }

Contributor Author

Actually, rec1 and rec2 should have the same HoodieKey here, right? But the HoodieOperation might be different.

HoodieRecord<T> hoodieRecord = new HoodieAvroRecord<>(reducedKey, reducedData, operation);
@@ -26,15 +26,14 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
public class JavaWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>, R> {

private JavaWriteHelper() {
@@ -55,9 +54,8 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
}

@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, int parallelism) {
boolean isIndexingGlobal = table.getIndex().isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
@@ -460,16 +460,17 @@ private void testDeduplication(

// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
HoodieTable table = mock(HoodieTable.class);
when(table.getIndex()).thenReturn(index);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, table, 1).collectAsList();
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);

// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
when(table.getIndex().isGlobal()).thenReturn(false);
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, table, 1).collectAsList();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

@@ -78,4 +78,8 @@ public String getFileId() {
public void setFileId(String fileId) {
this.fileId = fileId;
}

public HoodieRecordLocation toLocal(String instantTime) {
return new HoodieRecordLocation(instantTime, fileId);
}
}
@@ -58,6 +58,31 @@ default T preCombine(T oldValue, Properties properties) {
return preCombine(oldValue);
}

/**
* When more than one HoodieRecord has the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert, taking in a property map and the record schema.
*
* @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
* @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.
* @param schema Schema used for record
* @return the combined value
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
default T preCombine(T oldValue, Properties properties, Schema schema) {
Contributor

I think the currently established semantic for preCombine is: you select either A or B, but you don't produce a new record based on those two, since it's mostly used to de-dupe records in the incoming batch. I can hardly imagine a case that fuses 2 incoming records into something third. Can you help me understand what use case you have in mind here?

Contributor Author

Thanks @alexeykudinkin for reviewing this.

What we are trying to do is implement partial updates. For example, let's assume the record schema is (f0 int, f1 int, f2 int). The first record value is (1, 2, 3), and the second record value is (4, 5, null) with the field f2 value as null. We hope that the result after running preCombine is (4, 5, 3), which means we need to combine/merge the two records into a third one, not only choose one of them.

Actually, what we want to implement is similar to #combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema), which is used to combine the incoming record with the existing record from the base/log file.
But #preCombine will be used for combining/merging two incoming records in a batch.
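
In other words, the field-level rule being described is roughly the following (an editorial sketch of the idea, not the PR code itself; the PartialMergeSketch class and mergeNonNull name are made up for illustration):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.util.Objects;

class PartialMergeSketch {
  // Merge two records that share a key: take each field from the newer record
  // unless it is null, in which case keep the older record's value.
  // For example, (1, 2, 3) merged with (4, 5, null) yields (4, 5, 3).
  static GenericRecord mergeNonNull(Schema schema, GenericRecord older, GenericRecord newer) {
    schema.getFields().forEach(field -> {
      Object newValue = newer.get(field.name());
      older.put(field.name(), Objects.isNull(newValue) ? older.get(field.name()) : newValue);
    });
    return older;
  }
}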

Contributor

Right, that's exactly my question: why do you want to implement such a semantic within preCombine? What use case are you trying to accommodate here?

Essentially with this change you will introduce a way for 2 records within the batch to be combined into 1. But why do you need this?

After all, you can achieve the same goal if you just stop de-duping your records and then subsequently merge them against what is on disk.

Contributor Author

@stayrascal Feb 25, 2022

Hi @alexeykudinkin, I got your point: if we have to combine two records into one, we'd better implement the combine logic somewhere else, maybe in some util or helper classes, or skip the de-duping logic, right?

Here are some thoughts from my side on why #preCombine might be a better place to implement this logic, or alternatively we could create a new merge method in the HoodieRecordPayload interface.

  • First, from the description of the preCombine method, it is used for combining multiple records with the same HoodieKey before attempting to insert/upsert to disk. "Combine multiple records" might not mean only choosing one of them; we can also combine & merge them into a new one, depending on how the subclass implements the preCombine logic (please correct me if my understanding is wrong :) ). Yeah, it might be a little bit confusing that we need the Schema if we are trying to merge them.
  • Second, I checked where we call the preCombine method: it is used to de-duplicate records with the same HoodieKey before inserting/updating to disk, especially in the Flink write case. Even though the de-dup logic is to choose the latest record, we need to ensure that one HoodieKey contains only one record before comparing against the existing record and writing to disk; otherwise, some records will be missed. For example, HoodieMergeHandle.init(fileId, newRecordsIter) converts the record iterator to a map keyed by the recordKey (see the sketch after this comment). So we cannot stop the de-duping logic and merge against what is on disk unless we change the logic there. And if we implement another class/method to handle the merge logic and switch the existing de-duping logic from calling preCombine to the new class/method, we have to add a condition to control whether to call preCombine or not, which I think is not a good way. Instead, we should handle it in the preCombine method via a different payload implementation.

That's my thought here, and I'm glad to hear your suggestions. :)
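
To illustrate the point about HoodieMergeHandle, here is a simplified editorial sketch (not the actual Hudi code; the KeyedRecordsSketch class and keyByRecordKey name are illustrative): incoming records are keyed by record key, so any duplicates left in the batch would silently collapse to a single entry before the merge against storage.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;

class KeyedRecordsSketch {
  // Roughly what happens when the incoming iterator is turned into a map keyed
  // by record key: a later record with the same key overwrites the earlier one,
  // so un-deduped duplicates would be lost.
  static <R> Map<String, R> keyByRecordKey(Iterator<R> newRecordsIter, Function<R, String> recordKeyFn) {
    Map<String, R> keyToNewRecords = new HashMap<>();
    while (newRecordsIter.hasNext()) {
      R record = newRecordsIter.next();
      keyToNewRecords.put(recordKeyFn.apply(record), record);
    }
    return keyToNewRecords;
  }
}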

Contributor

Let me try to clarify a few things:

preCombine has a very specific semantic: it de-duplicates by picking the "most recent" among records in the batch. The expectation has always been that, handed 2 records, it will return either of them. It cannot produce a new record. If we want to revisit this semantic, this is a far larger change that will surely require writing an RFC and a broader discussion regarding the merits of such a migration. Please also keep in mind that as of RFC-46 there's an effort underway to abstract the whole "record combination/merging" semantic out of the RecordPayload hierarchy into a standalone Combination/Merge Engine API.

First, from the description of preCombine method, it used for combining multiple records with same HoodieKey before attempting to insert/upsert to disk. The "combine multiple records" might not mean only choosing one of them, we also can combine & merged them to a new one, just depends on how the sub-class implement the preCombine logic(Please correct me if my understanding is wrong :) ). Yeah, it might be a little bit confused that we need Schema if we are trying to merged them.

Please see my comment regarding preCombine semantic above. I certainly agree with you that the name is confusing, but i've tried to clear that confusion. Let me know if you have more questions about it.

Second, I checked when will we call preCombine method is trying to duplicate records with same HoodieKey before insert/update to disk, especially in Flink write case, even through the duplicated logic is choose the latest record, but we need to ensure that one HoodieKey should only contains one record before comparing to existing record and write to disk, otherwise, some records will missed. For example, in HoodieMergeHandle.init(fieId, newRecordsIter), it will convert the record iterator to a map and treat the recordKey as key. So we might not stop de-duping logics and merge them against what is on disk unless we change the logic here. And also we implement another class/method to handle the merge logic, and switch the existing de-duping logic from calling preCombine to new class/method, we have to add an condition to control whether should we call preCombine or not, I think it might not a good way. Instead, we should handle it in preCombine method by different implemented payload.

You're bringing up good points, let's dive into them one by one: currently we have 2 mechanisms

  1. preCombine, which allows selecting the "most recent" record among those having the same key within the batch
  2. combineAndGetUpdateValue, which allows combining the previous or "historical" record (on disk) with the new incoming one (all partial-merging semantics are currently implemented in this method)

You rightfully mention that one of the current invariants is that the batch is de-duped at a certain level (because we have to maintain PK uniqueness on disk), and so we might need to shift that to accommodate your case. And that's exactly what my question was: if you can elaborate on the use case you have at hand that you're trying to solve with this PR, I would be able to better understand where you're coming from and what's the best path forward for us here.

The questions I'm looking for answers to are basically the following:

  1. What's the nature of your use case? (domain, record types, frequency, size, etc.)
  2. Where are the requirements for partial updates coming from?

etc. I'm happy to set aside some 30 minutes to talk in person regarding this, or connect on Slack and discuss it there.
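
For reference, the two hooks contrasted above look roughly like this on the HoodieRecordPayload interface (a simplified excerpt; the exact generics and javadoc are in hudi-common):

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.io.Serializable;

public interface HoodieRecordPayload<T> extends Serializable {

  // (1) Pick the "most recent" of two incoming records sharing the same key within a batch.
  T preCombine(T oldValue);

  // (2) Combine the incoming record with the current value already on storage.
  Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;
}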

Contributor Author

Hi @alexeykudinkin, thanks a lot for your detailed clarification.

  1. Regarding the design of preCombine, I'm clear now. I'm sorry, I don't know the details of RFC-46, and I also didn't find the RFC-46 link from here; could you please share the link?
  2. Regarding the requirements for partial updates/overwrites, I have seen the same requirements from the community. In my case, generally, we want to build a customer profile with multiple attributes. These attributes might come from different systems: one system might only provide some attributes in an event/record, and two systems might send events/records with different attributes, so we should not only choose the most recent one, we need to merge them before writing to disk. Otherwise, we have to keep all change logs and then start a new job to dedup & merge these attributes across the change logs. For example, we have 10 attributes a1-a10 (all of them optional); source system A only has a1-a5, source system B only has a6-a10, and the result we expect is that the final record contains a1-a10, not only a1-a5 or a6-a10. And because we might receive two events/records at the same time, they might be in the same batch; that's why we want to merge them before combineAndGetUpdateValue (see the sketch after this list).
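
To make that scenario concrete, a minimal editorial sketch using the payload added in this PR; the Profile schema, the field names (id, a1, a2, ts), the ordering values, and the ProfileMergeExample class are purely illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialOverwriteWithLatestAvroPayload;

import java.io.IOException;

public class ProfileMergeExample {
  public static void main(String[] args) throws IOException {
    // Illustrative schema: two optional profile attributes plus an ordering field "ts".
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Profile\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"a1\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"a2\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"ts\",\"type\":\"long\"}]}");

    // Event from system A carries only a1; event from system B carries only a2.
    GenericRecord fromA = new GenericData.Record(schema);
    fromA.put("id", "user-1");
    fromA.put("a1", "phone");
    fromA.put("ts", 1L);

    GenericRecord fromB = new GenericData.Record(schema);
    fromB.put("id", "user-1");
    fromB.put("a2", "email");
    fromB.put("ts", 2L);

    PartialOverwriteWithLatestAvroPayload older = new PartialOverwriteWithLatestAvroPayload(fromA, 1L);
    PartialOverwriteWithLatestAvroPayload newer = new PartialOverwriteWithLatestAvroPayload(fromB, 2L);

    // preCombine merges the non-null fields instead of keeping only the latest record,
    // so the result carries both a1 = "phone" and a2 = "email".
    OverwriteWithLatestAvroPayload merged = newer.preCombine(older, null, schema);
    System.out.println(merged.getInsertValue(schema));
  }
}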

Contributor Author

BTW, thanks a lot for your time; I will ping you on Slack.

Contributor

CC @rmahindra123, who encountered the need, in preCombine, to combine bits and pieces from both records to return a new one. Rajesh: do you want to go over your use case, maybe?

Contributor Author

@alexeykudinkin, I'm sorry that I still haven't found a suitable time to align online; may I check whether you have any thoughts or suggestions on this PR?

return preCombine(oldValue, properties);
}

/**
* When more than one HoodieRecord has the same HoodieKey in the incoming batch and preCombine returns a merged result instead of choosing one of the two records,
* this method can be called to determine the ordering of the combined record relative to previous records.
* @param oldValue instance of the old {@link HoodieRecordPayload} to be compared with.
* @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object.
*
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
default int compareTo(T oldValue) {
return 0;
}

/**
* This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
*/
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.function.BiFunction;

import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;

/**
* The only difference with {@link OverwriteNonDefaultsWithLatestAvroPayload} is that it supports
* merging the latest non-null partial fields with the old record instead of replacing the whole record,
* and merging the non-null fields when preCombine is called on multiple records with the same record key, instead of choosing the latest record based on the ordering field.
*
* <p> Regarding #combineAndGetUpdateValue, assume a {@link GenericRecord} has row schema: (f0 int, f1 int, f2 int).
* The first record value is (1, 2, 3), the second record value is (4, 5, null) with the field f2 value as null.
* Calling the #combineAndGetUpdateValue method of the two records returns record: (4, 5, 3).
* Note that field f2 value is ignored because it is null. </p>
*
* <p> Regarding #preCombine, assume a {@link GenericRecord} has row schema: (f0 int, f1 int, f2 int, o1 int),
* and two initial {@link PartialOverwriteWithLatestAvroPayload} with different ordering values.
* The first record value is (1, null, 1, 1) with the field f1 value as null, the second value is (2, 2, null, 2) with the f2 value as null.
* Calling the #preCombine method of the two records returns record: (2, 2, 1, 2).
* Note:
* <ol>
* <li>the field f0 value is 2 because the ordering value of the second record is bigger.</li>
* <li>the field f1 value is 2 because the f1 value of the first record is null.</li>
* <li>the field f2 value is 1 because the f2 value of the second record is null.</li>
* <li>the field o1 value is 2 because the ordering value of the second record is bigger.</li>
* </ol>
*
* </p>
*/
public class PartialOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {

public PartialOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
}

public PartialOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
super(record); // natural order
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}

GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
GenericRecord currentRecord = (GenericRecord) currentValue;
if (isDeleteRecord(incomingRecord)) {
return Option.empty();
}
return Option.of(overwriteWithNonNullValue(schema, currentRecord, incomingRecord));
}

@Override
public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
return this.orderingVal.compareTo(oldValue.orderingVal);
}

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties, Schema schema) {
if (null == schema) {
// using default preCombine logic
return super.preCombine(oldValue);
}

try {
Option<IndexedRecord> incomingOption = this.getInsertValue(schema);
Option<IndexedRecord> oldRecordOption = oldValue.getInsertValue(schema);

if (incomingOption.isPresent() && oldRecordOption.isPresent()) {
Contributor

In general it's better to express common functionality in a way that allows it to be re-used and adopted in other places: here, for example, we can reuse the same routine of combining 2 records into one across the 2 methods if we properly abstract it.

Contributor Author

Abstracted the merge method, but it is still in the current class.

boolean inComingRecordIsLatest = this.compareTo(oldValue) >= 0;
// ordering two records by ordering value
GenericRecord firstRecord = (GenericRecord) (inComingRecordIsLatest ? oldRecordOption.get() : incomingOption.get());
GenericRecord secondRecord = (GenericRecord) (inComingRecordIsLatest ? incomingOption.get() : oldRecordOption.get());
GenericRecord mergedRecord = overwriteWithNonNullValue(schema, firstRecord, secondRecord);
return new PartialOverwriteWithLatestAvroPayload(mergedRecord, inComingRecordIsLatest ? this.orderingVal : oldValue.orderingVal);
} else {
return super.preCombine(oldValue);
}
} catch (IOException e) {
return super.preCombine(oldValue);
}
}

private GenericRecord mergeRecord(Schema schema, GenericRecord first, GenericRecord second, BiFunction<Object, Object, Object> mergeFunc) {
schema.getFields().forEach(field -> {
Object firstValue = first.get(field.name());
Object secondValue = second.get(field.name());
first.put(field.name(), mergeFunc.apply(firstValue, secondValue));
});
return first;
}

/**
* Merge two records: the merged value of each field adopts the field value from secondRecord if that value is not null; otherwise, it adopts the field value from firstRecord.
*
* @param schema record schema to loop fields
* @param firstRecord the base record need to be updated
* @param secondRecord the new record provide new field value
* @return merged records
*/
private GenericRecord overwriteWithNonNullValue(Schema schema, GenericRecord firstRecord, GenericRecord secondRecord) {


@stayrascal,

I really liked the idea of having a record payload that does partial merging. However, if I understood it correctly, what's proposed here does so in a very specific way: you're favoring the incoming record's field values, unless they are null (in which case you would keep the existing ones). I'm not saying this is not valuable, but the idea of doing partial merging is so good that maybe we could have something more generic. I'm going to suggest a few changes in order to accomplish that:

  • Make PartialOverwriteWithLatestAvroPayload an abstract class
  • Instead of having mergeFunc as a parameter of the mergeRecord method, it could become an abstract method. This would lead to the removal of the overwriteWithNonNullValue method, which makes this implementation specific to your merging logic
  • For the original use case (partial merge favoring non-null values), implement the proposed abstract class and implement the mergeFunc method with what you have in overwriteWithNonNullValue: (first, second) -> Objects.isNull(second) ? first : second

It's just an idea that could make what you proposed useful for many more use cases. Hope this made sense, and thanks for bringing this idea!
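
A rough sketch of the suggested shape (an editorial illustration only, not part of the PR; AbstractPartialAvroPayload, mergeField, and PartialOverwriteWithLatestAvroPayloadSketch are made-up names):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

import java.util.Objects;

// Hypothetical abstract base: the per-field merge rule is left to subclasses.
abstract class AbstractPartialAvroPayload extends OverwriteWithLatestAvroPayload {

  AbstractPartialAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  // How to pick the merged value for a single field.
  protected abstract Object mergeField(Object first, Object second);

  // Generic record-level loop; the field-level rule is supplied by the subclass.
  protected GenericRecord mergeRecord(Schema schema, GenericRecord first, GenericRecord second) {
    schema.getFields().forEach(field ->
        first.put(field.name(), mergeField(first.get(field.name()), second.get(field.name()))));
    return first;
  }
}

// The behaviour proposed in this PR then becomes one concrete strategy.
class PartialOverwriteWithLatestAvroPayloadSketch extends AbstractPartialAvroPayload {

  PartialOverwriteWithLatestAvroPayloadSketch(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  protected Object mergeField(Object first, Object second) {
    return Objects.isNull(second) ? first : second;
  }
}

In this shape, both preCombine and combineAndGetUpdateValue on the base class could route through mergeRecord and differ only in the field-level rule.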

Contributor Author

Hi @alvarolemos, thanks a lot for your useful suggestion. Yeah, I also considered abstracting the merge logic by using an abstract merge method or passing a merge function into a generic function, and I chose the latter. The reasons are as follows:

  • preCombine and combineAndGetUpdateValue might have different merge/combine logic; implementing only one abstract merge function might not be enough for both cases. For example, these two methods in OverwriteWithLatestAvroPayload have different merge/combine logic.
  • In the current implementation, mergeRecord is actually a generic method even though it's private for now; it doesn't care about the detailed merge logic and can be changed to protected/public scope if needed. Instead, overwriteWithNonNullValue is the merge implementation of the current payload, which is a wrapper of mergeFunc, and we can create two wrappers for the preCombine and combineAndGetUpdateValue scenarios if needed, which is similar to what you mentioned about implementing the detailed mergeFunc logic in a subclass. We can still inherit this class, implement the detailed mergeFunc logic, and pass it to the mergeRecord method.
  • Another reason why I didn't choose to create an abstract class currently is that there would be only one subclass; we can refactor it if many cases need to inherit this class. Right now, I just want to keep it as simple as possible.

return mergeRecord(schema, firstRecord, secondRecord, (first, second) -> Objects.isNull(second) ? first : second);
}
}
@@ -458,7 +458,7 @@ public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf,
}


// Create bootstrap index by partition folder if it does not exist
// Create bootstrap index by fields folder if it does not exist
final Path bootstrap_index_folder_by_fileids =
new Path(basePath, HoodieTableMetaClient.BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH);
if (!fs.exists(bootstrap_index_folder_by_fileids)) {
@@ -146,7 +146,7 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo

HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
HoodieRecordPayload oldValue = oldRecord.getData();
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue, null, this.readerSchema);
// If combinedValue is oldValue, no need rePut oldRecord
if (combinedValue != oldValue) {
HoodieOperation operation = hoodieRecord.getOperation();