Skip to content

Commit 6cfae1a

Browse files
committed
Temp commit
1 parent 3adb571 commit 6cfae1a

6 files changed

Lines changed: 34 additions & 8 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,14 +53,15 @@ private Metrics(HoodieWriteConfig metricConfig) {
5353
}
5454
reporter.start();
5555

56-
Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
56+
Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
5757
}
5858

5959
private void reportAndCloseReporter() {
6060
try {
6161
registerHoodieCommonMetrics();
6262
reporter.report();
6363
if (getReporter() != null) {
64+
LOG.info("Closing metrics reporter...");
6465
getReporter().close();
6566
}
6667
} catch (Exception e) {
@@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
139140
public Closeable getReporter() {
140141
return reporter.getReporter();
141142
}
143+
144+
public static boolean isInitialized() {
145+
return initialized;
146+
}
142147
}

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -181,6 +181,7 @@ class DefaultSource extends RelationProvider
181181
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
182182
} else {
183183
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
184+
HoodieSparkSqlWriter.cleanup()
184185
}
185186
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
186187
}

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
4444
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
4545
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
4646
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
47+
import org.apache.hudi.metrics.Metrics
4748
import org.apache.hudi.sync.common.HoodieSyncConfig
4849
import org.apache.hudi.sync.common.util.SyncUtilHelpers
4950
import org.apache.hudi.table.BulkInsertPartitioner
@@ -593,6 +594,10 @@ object HoodieSparkSqlWriter {
593594
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
594595
}
595596

597+
def cleanup() : Unit = {
598+
Metrics.shutdown()
599+
}
600+
596601
private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
597602
operation: WriteOperationType, fs: FileSystem): Unit = {
598603
if (mode == SaveMode.Append && tableExists) {

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig
3131
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
3232
import org.apache.hudi.keygen._
3333
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
34+
import org.apache.hudi.metrics.Metrics
3435
import org.apache.hudi.testutils.HoodieClientTestBase
3536
import org.apache.hudi.util.JFunction
3637
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
@@ -738,6 +739,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
738739
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
739740
.load(basePath)
740741
assertEquals(N + 1, hoodieIncViewDF1.count())
742+
assertEquals(false, Metrics.isInitialized)
741743
}
742744

743745
@Test def testSchemaEvolution(): Unit = {

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@
5353
import org.apache.hudi.exception.HoodieIOException;
5454
import org.apache.hudi.exception.HoodieUpsertException;
5555
import org.apache.hudi.hive.HiveSyncTool;
56+
import org.apache.hudi.metrics.Metrics;
5657
import org.apache.hudi.utilities.HiveIncrementalPuller;
5758
import org.apache.hudi.utilities.IdentitySplitter;
5859
import org.apache.hudi.utilities.UtilHelpers;
@@ -208,6 +209,7 @@ public void sync() throws Exception {
208209
throw ex;
209210
} finally {
210211
deltaSyncService.ifPresent(DeltaSyncService::close);
212+
Metrics.shutdown();
211213
LOG.info("Shut down delta streamer");
212214
}
213215
}

hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java

Lines changed: 18 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@
5656
import org.apache.hudi.hive.HiveSyncConfig;
5757
import org.apache.hudi.hive.HoodieHiveSyncClient;
5858
import org.apache.hudi.keygen.SimpleKeyGenerator;
59+
import org.apache.hudi.metrics.Metrics;
5960
import org.apache.hudi.utilities.DummySchemaProvider;
6061
import org.apache.hudi.utilities.HoodieClusteringJob;
6162
import org.apache.hudi.utilities.HoodieIndexer;
@@ -739,30 +740,36 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
739740

740741
@Test
741742
public void testUpsertsCOWContinuousMode() throws Exception {
742-
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
743+
testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
744+
}
745+
746+
@Test
747+
public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
748+
testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
743749
}
744750

745751
@Test
746752
public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
747-
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
753+
testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
748754
}
749755

750756
@Test
751757
public void testUpsertsMORContinuousMode() throws Exception {
752-
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
758+
testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
753759
}
754760

755-
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
756-
testUpsertsContinuousMode(tableType, tempDir, false);
761+
@Test
762+
public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
763+
testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
757764
}
758765

759-
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
766+
private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
760767
String tableBasePath = dfsBasePath + "/" + tempDir;
761768
// Keep it higher than batch-size to test continuous mode
762769
int totalRecords = 3000;
763770
// Initial bulk insert
764771
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
765-
cfg.continuousMode = true;
772+
cfg.continuousMode = continuousMode;
766773
if (testShutdownGracefully) {
767774
cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
768775
}
@@ -782,6 +789,10 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
782789
if (testShutdownGracefully) {
783790
TestDataSource.returnEmptyBatch = true;
784791
}
792+
793+
if (!cfg.continuousMode) {
794+
assertFalse(Metrics.isInitialized());
795+
}
785796
return true;
786797
});
787798
}

0 commit comments

Comments (0)