diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index f4ee3fc9f2424..c7cc50967a485 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -294,7 +294,12 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi builder.withProperties(properties); if (writeConfig.isMetricsOn()) { + // Table Name is needed for metric reporters prefix + Properties commonProperties = new Properties(); + commonProperties.put(HoodieWriteConfig.TBL_NAME.key(), tableName); + builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() + .fromProperties(commonProperties) .withReporterType(writeConfig.getMetricsReporterType().toString()) .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled()) .on(true).build()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 6d410ded1ccca..e10c372be64c7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,14 +18,6 @@ package org.apache.hudi.client.functional; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.util.Time; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; @@ -81,14 +73,14 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.hash.ColumnIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieIndexConfig; -import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.storage.HoodieHFileReader; @@ -107,6 +99,15 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.MetadataMergeWriteStatus; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -178,6 +179,19 @@ public static List tableTypeAndEnableOperationArgs() { ); } + public static List tableOperationsTestArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true, true), + Arguments.of(COPY_ON_WRITE, true, false), + Arguments.of(COPY_ON_WRITE, false, true), + Arguments.of(COPY_ON_WRITE, false, false), + Arguments.of(MERGE_ON_READ, true, true), + Arguments.of(MERGE_ON_READ, true, false), + Arguments.of(MERGE_ON_READ, false, true), + Arguments.of(MERGE_ON_READ, false, false) + ); + } + /** * Metadata Table bootstrap scenarios. */ @@ -441,28 +455,34 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep * Test various table operations sync to Metadata Table correctly. */ @ParameterizedTest - @MethodSource("tableTypeAndEnableOperationArgs") - public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception { - init(tableType, true, enableFullScan, false, false); - doWriteInsertAndUpsert(testTable); + @MethodSource("tableOperationsTestArgs") + public void testTableOperations(HoodieTableType tableType, boolean enableFullScan, boolean enableMetrics) throws Exception { + List commitTimeList = new ArrayList<>(); + commitTimeList.add(Long.parseLong(HoodieActiveTimeline.createNewInstantTime())); + for (int i = 0; i < 8; i++) { + long nextCommitTime = getNextCommitTime(commitTimeList.get(commitTimeList.size() - 1)); + commitTimeList.add(nextCommitTime); + } + init(tableType, true, enableFullScan, enableMetrics, false); + doWriteInsertAndUpsert(testTable, commitTimeList.get(0).toString(), commitTimeList.get(1).toString(), false); // trigger an upsert - doWriteOperationAndValidate(testTable, "0000003"); + doWriteOperationAndValidate(testTable, commitTimeList.get(2).toString()); // trigger compaction if (MERGE_ON_READ.equals(tableType)) { - doCompactionAndValidate(testTable, "0000004"); + doCompactionAndValidate(testTable, commitTimeList.get(3).toString()); } // trigger an upsert - doWriteOperation(testTable, "0000005"); + doWriteOperation(testTable, commitTimeList.get(4).toString()); // trigger clean - doCleanAndValidate(testTable, "0000006", singletonList("0000001")); + doCleanAndValidate(testTable, commitTimeList.get(5).toString(), singletonList(commitTimeList.get(0).toString())); // trigger few upserts and validate - doWriteOperation(testTable, "0000007"); - doWriteOperation(testTable, "0000008"); + doWriteOperation(testTable, commitTimeList.get(6).toString()); + doWriteOperation(testTable, commitTimeList.get(7).toString()); validateMetadata(testTable, emptyList(), true); }