diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 87afd56d83c06..f5a0fadc87fdd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -21,32 +21,25 @@
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
-import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -75,46 +68,29 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
+public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implements SparkProvider {
 
-  private static transient SparkSession spark;
-  private static transient SQLContext sqlContext;
-  private static transient JavaSparkContext jsc;
-  private static transient HoodieSparkEngineContext context;
-  private static int colStatsFileGroupCount;
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator(0L);
+  private static int colStatsFileGroupCount = HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue();
+  private HoodieTableMetaClient metaClient;
 
   @BeforeEach
   public void init() throws IOException {
-    boolean initialized = spark != null;
-    if (!initialized) {
-      SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
-      SparkRDDReadClient.addHoodieSupport(sparkConf);
-      spark = SparkSession.builder().config(sparkConf).getOrCreate();
-      sqlContext = spark.sqlContext();
-      jsc = new JavaSparkContext(spark.sparkContext());
-      context = new HoodieSparkEngineContext(jsc);
-      colStatsFileGroupCount = HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue();
-    }
-    initPath();
-    initMetaClient();
+    this.metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
   }
 
-  protected void initMetaClient() throws IOException {
-    String rootPathStr = "file://" + tempDir.toAbsolutePath().toString();
-    Path rootPath = new Path(rootPathStr);
-    rootPath.getFileSystem(jsc.hadoopConfiguration()).mkdirs(rootPath);
-    metaClient = HoodieTestUtils.init(rootPathStr, getTableType());
-    basePath = metaClient.getBasePath();
+  @AfterAll
+  public static void cleanup() {
+    DATA_GENERATOR.close();
   }
 
   @Test
   public void testGetRequestedPartitionTypes() {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
-    config.basePath = basePath;
+    config.basePath = basePath();
     config.tableName = "indexer_test";
     config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes, Option.empty());
     assertTrue(partitionTypes.contains(FILES));
     assertTrue(partitionTypes.contains(BLOOM_FILTERS));
@@ -124,10 +100,10 @@ public void testGetRequestedPartitionTypes() {
   @Test
   public void testIsIndexBuiltForAllRequestedTypes() {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
-    config.basePath = basePath;
+    config.basePath = basePath();
     config.tableName = "indexer_test";
     config.indexTypes = "BLOOM_FILTERS,COLUMN_STATS";
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder()
         .setIndexPartitionInfos(Arrays.asList(new HoodieIndexPartitionInfo(
             1,
@@ -137,39 +113,37 @@
     assertFalse(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));
 
     config.indexTypes = "COLUMN_STATS";
-    indexer = new HoodieIndexer(jsc, config);
+    indexer = new HoodieIndexer(jsc(), config);
     assertTrue(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));
   }
 
   @Test
   public void testIndexerWithNotAllIndexesEnabled() {
-    initTestDataGenerator();
-    tableName = "indexer_test";
+    String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build());
+    initializeWriteClient(metadataConfigBuilder.build(), tableName);
     // validate table config
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
     // build indexer config which has only column_stats enabled (files and bloom filter is already enabled)
-    indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[] {FILES, BLOOM_FILTERS}), Collections.emptyList());
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[] {FILES, BLOOM_FILTERS}), Collections.emptyList(), tableName);
   }
 
   @Test
   public void testIndexerWithFilesPartition() {
-    initTestDataGenerator();
-    tableName = "indexer_test";
+    String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build());
+    initializeWriteClient(metadataConfigBuilder.build(), tableName);
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
     // build indexer config which has only files enabled
-    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName);
   }
 
   private static Stream<Arguments> colStatsFileGroupCountParams() {
@@ -185,20 +159,19 @@
   @MethodSource("colStatsFileGroupCountParams")
   public void testColStatsFileGroupCount(int colStatsFileGroupCount) {
     TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount;
-    initTestDataGenerator();
-    tableName = "indexer_test";
+    String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build());
+    initializeWriteClient(metadataConfigBuilder.build(), tableName);
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
 
     // build indexer config which has only files enabled
-    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName);
 
     // build indexer config which has only col stats enabled
-    indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}), tableName);
 
     HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build();
     List<FileSlice> partitionFileSlices =
@@ -212,24 +185,23 @@
    */
   @Test
   public void testIndexerForExceptionWithNonFilesPartition() {
-    initTestDataGenerator();
-    tableName = "indexer_test";
+    String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
-    initializeWriteClient(metadataConfigBuilder.build());
+    initializeWriteClient(metadataConfigBuilder.build(), tableName);
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
     // build indexer config which has only column stats enabled. expected to throw exception.
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
-    config.basePath = basePath;
+    config.basePath = basePath();
     config.tableName = tableName;
     config.indexTypes = COLUMN_STATS.name();
     config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
     // start the indexer and validate index building fails
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     assertEquals(-1, indexer.start(0));
 
     // validate table config
@@ -238,13 +210,13 @@
     assertFalse(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
     assertFalse(metaClient.getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
     // validate metadata partitions actually exist
-    assertFalse(metadataPartitionExists(basePath, context, FILES));
+    assertFalse(metadataPartitionExists(basePath(), context(), FILES));
 
     // trigger FILES partition and indexing should succeed.
-    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName);
 
     // build indexer config which has only col stats enabled
-    indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}), tableName);
 
     HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build();
     List<FileSlice> partitionFileSlices =
@@ -252,23 +224,24 @@
     assertEquals(partitionFileSlices.size(), HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue());
   }
 
-  private void initializeWriteClient(HoodieMetadataConfig metadataConfig) {
-    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+  private void initializeWriteClient(HoodieMetadataConfig metadataConfig, String tableName) {
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build();
     // do one upsert with synchronous metadata update
-    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
     String instant = "0001";
     writeClient.startCommitWithTime(instant);
-    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
-    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
   }
 
-  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions) {
+  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions,
+                                                String tableName) {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
-    config.basePath = basePath;
+    config.basePath = basePath();
     config.tableName = tableName;
     config.indexTypes = partitionTypeToIndex.name();
     config.runningMode = SCHEDULE_AND_EXECUTE;
@@ -277,7 +250,7 @@ private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTyp
       config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
     }
     // start the indexer and validate files index is completely built out
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     assertEquals(0, indexer.start(0));
 
     // validate table config
@@ -288,140 +261,138 @@ private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTyp
     nonExistantPartitions.forEach(entry -> assertFalse(completedPartitions.contains(entry.getPartitionPath())));
 
     // validate metadata partitions actually exist
-    assertTrue(metadataPartitionExists(basePath, context, partitionTypeToIndex));
-    alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath, context, entry)));
+    assertTrue(metadataPartitionExists(basePath(), context(), partitionTypeToIndex));
+    alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath(), context(), entry)));
   }
 
   @Test
   public void testIndexerDropPartitionDeletesInstantFromTimeline() {
-    initTestDataGenerator();
     String tableName = "indexer_test";
-    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
     // enable files on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
     // do one upsert with synchronous metadata update
-    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
     String instant = "0001";
     writeClient.startCommitWithTime(instant);
-    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
-    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
     // validate partitions built successfully
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath(), context(), FILES));
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS));
 
     // build indexer config which has only column_stats enabled (files is enabled by default)
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
-    config.basePath = basePath;
+    config.basePath = basePath();
     config.tableName = tableName;
     config.indexTypes = COLUMN_STATS.name();
     config.runningMode = SCHEDULE;
     config.propsFilePath = propsPath;
     // schedule indexing and validate column_stats index is also initialized
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     assertEquals(0, indexer.start(0));
     Option<HoodieInstant> indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
     assertTrue(indexInstantInTimeline.isPresent());
     assertEquals(REQUESTED, indexInstantInTimeline.get().getState());
-    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(basePath(), context(), COLUMN_STATS));
 
     // drop column_stats and validate indexing.requested is also removed from the timeline
     config.runningMode = DROP_INDEX;
-    indexer = new HoodieIndexer(jsc, config);
+    indexer = new HoodieIndexer(jsc(), config);
     assertEquals(0, indexer.start(0));
     indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
     assertFalse(indexInstantInTimeline.isPresent());
-    assertFalse(metadataPartitionExists(basePath, context, COLUMN_STATS));
+    assertFalse(metadataPartitionExists(basePath(), context(), COLUMN_STATS));
     // check other partitions are intact
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath(), context(), FILES));
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS));
   }
 
   @Test
   public void testTwoIndexersOneCreateOneDropPartition() {
-    initTestDataGenerator();
     String tableName = "indexer_test";
-    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
     // enable files on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false);
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
     // do one upsert with synchronous metadata update
-    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
     String instant = "0001";
     writeClient.startCommitWithTime(instant);
-    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
-    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
     // validate files partition built successfully
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath(), context(), FILES));
 
     // build indexer config which has only bloom_filters enabled
-    HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties");
+    HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties", tableName);
     // start the indexer and validate bloom_filters index is also complete
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     assertEquals(0, indexer.start(0));
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS));
     // completed index timeline for later validation
     Option<HoodieInstant> bloomIndexInstant = metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant();
     assertTrue(bloomIndexInstant.isPresent());
 
     // build indexer config which has only column_stats enabled
-    config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties");
+    config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties", tableName);
 
     // schedule indexing and validate column_stats index is also initialized
     // and indexing.requested instant is present
-    indexer = new HoodieIndexer(jsc, config);
+    indexer = new HoodieIndexer(jsc(), config);
     assertEquals(0, indexer.start(0));
     Option<HoodieInstant> columnStatsIndexInstant = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
     assertTrue(columnStatsIndexInstant.isPresent());
     assertEquals(REQUESTED, columnStatsIndexInstant.get().getState());
-    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(basePath(), context(), COLUMN_STATS));
 
     // drop column_stats and validate indexing.requested is also removed from the timeline
     // and completed indexing instant corresponding to bloom_filters index is still present
-    dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty());
+    dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty(), tableName);
 
     // check other partitions are intact
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath(), context(), FILES));
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS));
 
     // drop bloom filter partition. timeline files should not be deleted since the index building is complete.
-    dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant);
+    dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant, tableName);
   }
 
-  private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option<HoodieInstant> completedIndexInstant) {
-    HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath);
-    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+  private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option<HoodieInstant> completedIndexInstant, String tableName) {
+    HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath, tableName);
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
     assertEquals(0, indexer.start(0));
     Option<HoodieInstant> pendingFlights = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
     assertFalse(pendingFlights.isPresent());
-    assertFalse(metadataPartitionExists(basePath, context, indexType));
+    assertFalse(metadataPartitionExists(basePath(), context(), indexType));
     if (completedIndexInstant.isPresent()) {
       assertEquals(completedIndexInstant, metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant());
     }
   }
 
-  private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath) {
+  private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath, String tableName) {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource(resourceFilePath)).getPath();
-    config.basePath = basePath;
+    config.basePath = basePath();
     config.tableName = tableName;
     config.indexTypes = indexType;
     config.runningMode = runMode;
@@ -446,24 +417,4 @@ private static HoodieMetadataConfig.Builder getMetadataConfigBuilder(boolean ena
         .enable(enable)
         .withAsyncIndex(asyncIndex);
   }
-
-  @Override
-  public HoodieEngineContext context() {
-    return context;
-  }
-
-  @Override
-  public SparkSession spark() {
-    return spark;
-  }
-
-  @Override
-  public SQLContext sqlContext() {
-    return sqlContext;
-  }
-
-  @Override
-  public JavaSparkContext jsc() {
-    return jsc;
-  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
index 018dd0f67d777..16d190ac45d15 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
@@ -21,7 +21,6 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
-import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -45,7 +44,7 @@
 /**
  * Tests {@link KafkaAvroSchemaDeserializer}.
  */
-public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
+public class TestKafkaAvroSchemaDeserializer {
 
   private final SchemaRegistryClient schemaRegistry;
   private final KafkaAvroSerializer avroSerializer;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index 2106afe11f755..1ac3f91fa8c7e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -29,8 +29,6 @@
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -63,14 +61,8 @@ public static void beforeAll() throws Exception {
     UtilitiesTestBase.initTestServices(false, false);
   }
 
-  @AfterAll
-  public static void afterAll() {
-    UtilitiesTestBase.cleanupClass();
-  }
-
   @BeforeEach
   public void beforeEach() throws Exception {
-    super.setup();
     schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
     MockitoAnnotations.initMocks(this);
 
@@ -79,11 +71,6 @@ public void beforeEach() throws Exception {
     props.put(PUBSUB_SUBSCRIPTION_ID, "dummy-subscription");
   }
 
-  @AfterEach
-  public void afterEach() throws Exception {
-    super.teardown();
-  }
-
   @Test
   public void shouldReturnEmptyOnNoMessages() {
     when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList());