@@ -2142,6 +2142,28 @@ public void testFetchingCheckpointFromPreviousCommits() throws IOException {
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def");
}

@Test
public void testDropPartitionColumns() throws Exception {
String tableBasePath = dfsBasePath + "/test_drop_partition_columns" + testNum++;
// ingest data with dropping partition columns enabled
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.configs.add(String.format("%s=%s", HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// assert the ingest committed successfully
TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build());
// get schema from data file written in the latest commit
Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
assertNotNull(tableSchema);

List<String> tableFields = tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
// now assert that the partition column is not in the target schema
assertFalse(tableFields.contains("partition_path"));
}
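
// For reference, a minimal standalone sketch of the schema check the test above performs,
// using the same Hudi APIs that appear in this diff (TableSchemaResolver,
// HoodieTableMetaClient, getTableAvroSchemaFromDataFile). The class name, method name,
// and hadoopConf parameter below are illustrative, not part of this change:
//
// import java.util.List;
// import java.util.stream.Collectors;
// import org.apache.avro.Schema;
// import org.apache.hadoop.conf.Configuration;
// import org.apache.hudi.common.table.HoodieTableMetaClient;
// import org.apache.hudi.common.table.TableSchemaResolver;
//
// class PartitionColumnCheck {
//   // Returns true if the given field is still present in the schema read
//   // back from the table's data files (i.e. it was NOT dropped on write).
//   static boolean dataFileSchemaContains(String basePath, Configuration hadoopConf,
//       String field) throws Exception {
//     TableSchemaResolver resolver = new TableSchemaResolver(
//         HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build());
//     Schema schema = resolver.getTableAvroSchemaFromDataFile();
//     List<String> fields = schema.getFields().stream()
//         .map(Schema.Field::name).collect(Collectors.toList());
//     return fields.contains(field);
//   }
// }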

class TestDeltaSync extends DeltaSync {

public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,