diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 175ba3d66fd1f..186450c3157a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -45,7 +45,7 @@
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
-import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ChainedSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java
similarity index 93%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ChainedSchemaPostProcessor.java
rename to hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java
index 5a88d9fd92f3d..0295e80bed8be 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ChainedSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.schema;
+package org.apache.hudi.utilities.schema.postprocessor;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
+import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DeleteSupportSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
similarity index 95%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DeleteSupportSchemaPostProcessor.java
rename to hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
index e58dc4e20611b..dd1084d2e1eab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DeleteSupportSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
@@ -16,12 +16,13 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.schema;
+package org.apache.hudi.utilities.schema.postprocessor;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.avro.Schema;
+import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DropColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
similarity index 96%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DropColumnSchemaPostProcessor.java
rename to hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
index 4a41b75589a3c..05b446f845176 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DropColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
@@ -16,13 +16,14 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.schema;
+package org.apache.hudi.utilities.schema.postprocessor;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.avro.Schema;
+import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
new file mode 100644
index 0000000000000..f7d31b5cf47ad
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema.postprocessor.add;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.schema.SchemaPostProcessor;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * A {@link SchemaPostProcessor} used to add a new column of primitive type to the end of the given record schema.
+ * Only supports adding one column at a time.
+ *
+ * <p>The column's name, type, doc, default value and nullability are read from the properties declared in
+ * {@link BaseSchemaPostProcessorConfig}.
+ *
+ * <p>TODO support complex types.
+ */
+public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
+
+  public AddPrimitiveColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  /**
+   * Builds a new record schema holding a copy of every field of {@code schema} followed by the configured column.
+   *
+   * @param schema record schema to extend
+   * @return a new record schema with the same name, doc and namespace as {@code schema}
+   * @throws HoodieSchemaPostProcessException if {@code schema} already has a field with the configured name
+   */
+  @Override
+  public Schema processSchema(Schema schema) {
+    String newColumnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+    if (schema.getField(newColumnName) != null) {
+      throw new HoodieSchemaPostProcessException(String.format("Column %s already exist!", newColumnName));
+    }
+
+    List<Schema.Field> sourceFields = schema.getFields();
+    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
+    // every existing field is re-created rather than reused in the new record
+    for (Schema.Field sourceField : sourceFields) {
+      targetFields.add(new Schema.Field(sourceField.name(), sourceField.schema(), sourceField.doc(), sourceField.defaultVal()));
+    }
+    // add new column to the end
+    targetFields.add(buildNewColumn());
+    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
+  }
+
+  /** Reads the new column's settings from the config, validates them and builds the field to append. */
+  private Schema.Field buildNewColumn() {
+    String columnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+    String type = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
+    String doc = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
+    Object defaultValue = this.config.getOrDefault(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), null);
+    boolean nullable = this.config.getBoolean(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(),
+        BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.defaultValue());
+
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(columnName));
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(type));
+    // type was upper-cased above, so compare it with the enum constant name ("NULL"); Schema.Type#getName()
+    // returns the lower-case Avro name ("null"), which would never match the upper-cased value.
+    ValidationUtils.checkArgument(!Schema.Type.NULL.name().equals(type));
+    // NOTE(review): when nullable, null is the first union branch, so Avro may reject a non-null default - confirm.
+    return new Schema.Field(columnName, createSchema(type, nullable), doc, defaultValue);
+  }
+
+  /** Creates the schema for {@code type}; when {@code nullable}, wraps it in a union whose first branch is null. */
+  private Schema createSchema(String type, boolean nullable) {
+    Schema schema = Schema.create(Schema.Type.valueOf(type));
+    return nullable ? Schema.createUnion(Schema.create(Schema.Type.NULL), schema) : schema;
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java
new file mode 100644
index 0000000000000..d1528c362c84d
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema.postprocessor.add;
+
+import org.apache.hudi.common.config.ConfigProperty;
+
+/**
+ * Base configs to describe a primitive type column.
+ */
+public class BaseSchemaPostProcessorConfig {
+  /** Name of the column to add; declared without a default value. */
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
+      .noDefaultValue()
+      .withDocumentation("New column's name");
+  /** Type of the column to add; declared without a default value. */
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
+      .noDefaultValue()
+      .withDocumentation("New column's type");
+  /** Whether the column to add is nullable; defaults to {@code true}. */
+  public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.nullable")
+      .defaultValue(true)
+      .withDocumentation("New column's nullable");
+  /** Default value of the column to add; the property itself is declared without a default value. */
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
+      .noDefaultValue()
+      .withDocumentation("New column's default value");
+  /** Doc text of the column to add; declared without a default value. */
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
+      .noDefaultValue()
+      .withDocumentation("Docs about new column");
+
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index d228d87446df7..52413ce938456 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -20,12 +20,14 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
-import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
-import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
+import org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.transform.FlatteningTransformer;
@@ -33,12 +35,17 @@
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -55,13 +62,18 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
+ "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\","
+ "\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}";
+  /** Primitive Avro type names, one per invocation, for the {@code @MethodSource("configParams")} test. */
+  private static Stream<Arguments> configParams() {
+    return Stream.of("bytes", "string", "int", "long", "float", "double", "boolean").map(Arguments::of);
+  }
+
@Test
public void testPostProcessor() throws IOException {
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
- UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
- properties, jsc,null);
+ UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
+ properties, jsc, null);
Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
@@ -76,9 +88,9 @@ public void testSparkAvro() throws IOException {
transformerClassNames.add(FlatteningTransformer.class.getName());
SchemaProvider provider =
- UtilHelpers.wrapSchemaProviderWithPostProcessor(
- UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
- properties, jsc, transformerClassNames);
+ UtilHelpers.wrapSchemaProviderWithPostProcessor(
+ UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
+ properties, jsc, transformerClassNames);
Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
@@ -99,7 +111,7 @@ public void testDeleteSupport() {
public void testChainedSchemaPostProcessor() {
// DeleteSupportSchemaPostProcessor first, DummySchemaPostProcessor second
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
- "org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor");
+ "org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor");
SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
@@ -111,7 +123,7 @@ public void testChainedSchemaPostProcessor() {
// DummySchemaPostProcessor first, DeleteSupportSchemaPostProcessor second
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
- "org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor");
+ "org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor");
processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
@@ -144,6 +156,32 @@ public void testDeleteColumnThrows() {
Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
}
+  /** Adds a column of the given primitive type and checks its doc, position and nullable / non-nullable schema. */
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testAddPrimitiveTypeColumn(String type) {
+    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
+    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
+    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
+
+    AddPrimitiveColumnSchemaPostProcessor processor = new AddPrimitiveColumnSchemaPostProcessor(properties, null);
+    Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
+    Schema.Field newColumn = processor.processSchema(schema).getField("primitive_column");
+    assertNotNull(newColumn);
+    assertEquals("primitive column test", newColumn.doc());
+    // the new column must be appended after all original fields
+    assertEquals(schema.getFields().size(), newColumn.pos());
+    // nullable by default, so new column is union type
+    assertEquals(Type.UNION, newColumn.schema().getType());
+    assertNotEquals(type, newColumn.schema().getType().getName());
+
+    // test not nullable; the flag is stored as a String because java.util.Properties#getProperty returns null
+    // for non-String values (assumes TypedProperties reads the flag through getProperty - TODO confirm)
+    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(), "false");
+    newColumn = processor.processSchema(schema).getField("primitive_column");
+    assertEquals(type, newColumn.schema().getType().getName());
+  }
+
@Test
public void testSparkAvroSchema() throws IOException {
SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);