
Commit a9ab150

Merge branch 'master' into kumudt/aws-glue-sync-fixes
2 parents f06a546 + b18c323 commit a9ab150

200 files changed: 9006 additions & 3268 deletions


docker/demo/sparksql-incremental.commands

Lines changed: 9 additions & 9 deletions
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.spark.sql.SaveMode;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.HoodieDataSourceHelpers;
-import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncConfigHolder;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,10 +47,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
   option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
   option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
   option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
-  option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
-  option(HiveSyncConfig.HIVE_USER.key(), "hive").
-  option(HiveSyncConfig.HIVE_PASS.key(), "hive").
-  option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+  option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+  option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
+  option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
+  option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
   option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
   option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
   option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
@@ -79,10 +79,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
   option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
   option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
   option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
-  option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
-  option(HiveSyncConfig.HIVE_USER.key(), "hive").
-  option(HiveSyncConfig.HIVE_PASS.key(), "hive").
-  option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+  option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+  option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
+  option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
+  option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
   option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
   option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
   option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
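For readers following along in Java rather than the Spark-shell Scala above, a minimal sketch of the same sync-related options with the relocated HiveSyncConfigHolder keys might look like the following; the save mode, target path, and the omitted record-key/precombine options are illustrative, not part of this diff.

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class DerivedTableWriteSketch {
  // Writes a derived MOR table and asks Hudi to sync it to the metastore,
  // using the HiveSyncConfigHolder keys that replace the old HiveSyncConfig ones.
  public static void write(Dataset<Row> df) {
    df.write().format("hudi")
        .option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor")
        .option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor")
        .option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default")
        .option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000")
        .option(HiveSyncConfigHolder.HIVE_USER.key(), "hive")
        .option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive")
        .option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true")
        .option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr")
        .option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
            MultiPartKeysValueExtractor.class.getCanonicalName())
        .mode(SaveMode.Append)                                  // illustrative
        .save("/user/hive/warehouse/stock_ticks_derived_mor");  // illustrative path
  }
}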

hudi-aws/pom.xml

Lines changed: 10 additions & 2 deletions
@@ -48,8 +48,16 @@

     <!-- Logging -->
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
     </dependency>

     <!-- Hadoop -->
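The third artifact, log4j-1.2-api, is the bridge that keeps existing org.apache.log4j imports (still present in AWSGlueCatalogSyncClient below) working on top of Log4j 2. A minimal sketch, assuming log4j-api, log4j-core, and log4j-1.2-api are all on the classpath:

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class Log4jBridgeSketch {
  // Legacy Log4j 1.x API call; the log4j-1.2-api bridge routes it to the
  // Log4j 2 implementation provided by log4j-core.
  private static final Logger LOG = LogManager.getLogger(Log4jBridgeSketch.class);

  public static void main(String[] args) {
    LOG.info("logged through the 1.x-to-2.x bridge");
  }
}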

hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java

Lines changed: 22 additions & 42 deletions
@@ -21,8 +21,8 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
 import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.model.Partition;

 import com.amazonaws.services.glue.AWSGlue;
@@ -50,10 +50,6 @@
 import com.amazonaws.services.glue.model.Table;
 import com.amazonaws.services.glue.model.TableInput;
 import com.amazonaws.services.glue.model.UpdateTableRequest;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -69,8 +65,12 @@

 import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
 import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.util.TableUtils.tableId;

 /**
@@ -79,18 +79,18 @@
  *
  * @Experimental
  */
-public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
+public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

   private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
   private static final int MAX_PARTITIONS_PER_REQUEST = 100;
   private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
   private final AWSGlue awsGlue;
   private final String databaseName;

-  public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
-    super(syncConfig, hadoopConf, fs);
+  public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
+    super(config);
     this.awsGlue = AWSGlueClientBuilder.standard().build();
-    this.databaseName = syncConfig.databaseName;
+    this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
   }

   @Override
@@ -126,7 +126,7 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
       StorageDescriptor sd = table.getStorageDescriptor();
       List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         partitionSd.setLocation(fullPartitionPath);
         return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
       StorageDescriptor sd = table.getStorageDescriptor();
       List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         sd.setLocation(fullPartitionPath);
         PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -204,12 +204,12 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
   }

   @Override
-  public void updateTableDefinition(String tableName, MessageType newSchema) {
+  public void updateTableSchema(String tableName, MessageType newSchema) {
     // ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
-    boolean cascade = syncConfig.partitionFields.size() > 0;
+    boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
-      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
       List<Column> newColumns = getColumnsFromSchema(newSchemaMap);
       StorageDescriptor sd = table.getStorageDescriptor();
       sd.setColumns(newColumns);
@@ -234,21 +234,6 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
     }
   }

-  @Override
-  public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
-  }
-
-  @Override
-  public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
-    throw new UnsupportedOperationException("Not supported: `updateTableComments`");
-  }
-
-  @Override
-  public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
-    throw new UnsupportedOperationException("Not supported: `updateTableComments`");
-  }
-
   @Override
   public void createTable(String tableName,
                           MessageType storageSchema,
@@ -262,18 +247,18 @@ public void createTable(String tableName,
     }
     CreateTableRequest request = new CreateTableRequest();
     Map<String, String> params = new HashMap<>();
-    if (!syncConfig.createManagedTable) {
+    if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
       params.put("EXTERNAL", "TRUE");
     }
     params.putAll(tableProperties);

     try {
-      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

       List<Column> schemaWithoutPartitionKeys = getColumnsFromSchema(mapSchema);

       // now create the schema partition
-      List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
+      List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
         String keyType = getPartitionKeyType(mapSchema, partitionKey);
         return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
       }).collect(Collectors.toList());
@@ -282,7 +267,7 @@ public void createTable(String tableName,
       serdeProperties.put("serialization.format", "1");
       storageDescriptor
           .withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
-          .withLocation(s3aToS3(syncConfig.basePath))
+          .withLocation(s3aToS3(getBasePath()))
          .withInputFormat(inputFormatClass)
          .withOutputFormat(outputFormatClass)
          .withColumns(schemaWithoutPartitionKeys);
@@ -309,7 +294,7 @@ public void createTable(String tableName,
  }

  @Override
-  public Map<String, String> getTableSchema(String tableName) {
+  public Map<String, String> getMetastoreSchema(String tableName) {
    try {
      // GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to
      // get the Schema of the table.
@@ -329,11 +314,6 @@ public Map<String, String> getTableSchema(String tableName) {
    }
  }

-  @Override
-  public boolean doesTableExist(String tableName) {
-    return tableExists(tableName);
-  }
-
  @Override
  public boolean tableExists(String tableName) {
    GetTableRequest request = new GetTableRequest()
@@ -401,11 +381,11 @@ public void close() {

  @Override
  public void updateLastCommitTimeSynced(String tableName) {
-    if (!activeTimeline.lastInstant().isPresent()) {
+    if (!getActiveTimeline().lastInstant().isPresent()) {
      LOG.warn("No commit in active timeline.");
      return;
    }
-    final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp();
+    final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp();
    try {
      updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false);
    } catch (Exception e) {
@@ -432,7 +412,7 @@ private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
    List<Column> cols = new ArrayList<>();
    for (String key : mapSchema.keySet()) {
      // In Glue, the full schema should exclude the partition keys
-      if (!syncConfig.partitionFields.contains(key)) {
+      if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
        String keyType = getPartitionKeyType(mapSchema, key);
        Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
        cols.add(column);
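After this refactor, everything the client needs comes from a single HiveSyncConfig backed by typed properties. A minimal construction sketch, assuming HiveSyncConfig can be built from a Properties object plus a Hadoop Configuration (that constructor is not shown in this diff) and using only the key constants and client methods that do appear above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.aws.sync.AWSGlueCatalogSyncClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import java.util.Map;
import java.util.Properties;

public class GlueSyncClientSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Values are illustrative; keys come from constants referenced in the diff.
    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");
    props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_mor");
    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");

    // Assumed constructor shape; the diff only shows the client taking a HiveSyncConfig.
    HiveSyncConfig config = new HiveSyncConfig(props, new Configuration());
    AWSGlueCatalogSyncClient client = new AWSGlueCatalogSyncClient(config);

    // Surface shown in the diff: tableExists(...) and the renamed getMetastoreSchema(...).
    if (client.tableExists("stock_ticks_mor")) {
      Map<String, String> schema = client.getMetastoreSchema("stock_ticks_mor");
      schema.forEach((name, type) -> System.out.println(name + " : " + type));
    }
    client.close(); // close() is retained by the client, per the hunk around old line 401
  }
}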

hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java renamed to hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncTool.java

Lines changed: 14 additions & 23 deletions
@@ -18,53 +18,44 @@

 package org.apache.hudi.aws.sync;

-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;

 import com.beust.jcommander.JCommander;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Properties;

 /**
  * Currently Experimental. Utility class that implements syncing a Hudi Table with the
  * AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
  * to enable querying via Glue ETLs, Athena etc.
- *
+ * <p>
  * Extends HiveSyncTool since most logic is similar to Hive syncing,
  * expect using a different client {@link AWSGlueCatalogSyncClient} that implements
  * the necessary functionality using Glue APIs.
  *
  * @Experimental
  */
-public class AwsGlueCatalogSyncTool extends HiveSyncTool {
-
-  public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
-    super(props, new HiveConf(conf, HiveConf.class), fs);
-  }
+public class AWSGlueCatalogSyncTool extends HiveSyncTool {

-  public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
-    super(hiveSyncConfig, hiveConf, fs);
+  public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
   }

   @Override
-  protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
-    hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
+  protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
+    syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
   }

   public static void main(String[] args) {
-    // parse the params
-    final HiveSyncConfig cfg = new HiveSyncConfig();
-    JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
+    JCommander cmd = JCommander.newBuilder().addObject(params).build();
+    cmd.parse(args);
+    if (params.isHelp()) {
       cmd.usage();
-      System.exit(1);
+      System.exit(0);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.addResource(fs.getConf());
-    new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
+    new AWSGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
   }
 }
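The renamed tool can also be invoked programmatically the same way the new main() does, by passing a Properties bag and a Hadoop Configuration. A minimal sketch: the base-path key constant is assumed to exist on HoodieSyncConfig (it is not part of this diff), and a real run additionally needs AWS credentials/region and the remaining sync options.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.aws.sync.AWSGlueCatalogSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import java.util.Properties;

public class GlueSyncToolSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "s3://my-bucket/stock_ticks_mor"); // assumed key constant
    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");
    props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_mor");
    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");

    // Mirrors the new main(): Properties + Configuration, then a single syncHoodieTable() call.
    new AWSGlueCatalogSyncTool(props, new Configuration()).syncHoodieTable();
  }
}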

hudi-cli/pom.xml

Lines changed: 10 additions & 2 deletions
@@ -201,8 +201,16 @@

     <!-- Logging -->
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
     </dependency>

     <dependency>

hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java

Lines changed: 1 addition & 1 deletion
@@ -233,7 +233,7 @@ public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to roll
       @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
           help = "Spark executor memory") final String sparkMemory,
-      @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "true",
+      @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "false",
           help = "Enabling marker based rollback") final String rollbackUsingMarkers)
       throws Exception {
     HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();

hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java

Lines changed: 1 addition & 1 deletion
@@ -558,7 +558,7 @@ public String unscheduleCompaction(
   @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
   public String unscheduleCompactFile(
       @CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
-      @CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath,
+      @CliOption(key = "partitionPath", unspecifiedDefaultValue = "", help = "partition path") final String partitionPath,
       @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
       @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,

hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ public String showAllFileSlices(

   @CliCommand(value = "show fsview latest", help = "Show latest file-system view")
   public String showLatestFileSlices(
-      @CliOption(key = {"partitionPath"}, help = "A valid partition path", mandatory = true) String partition,
+      @CliOption(key = {"partitionPath"}, help = "A valid partition path", unspecifiedDefaultValue = "") String partition,
       @CliOption(key = {"baseFileOnly"}, help = "Only display base file view",
           unspecifiedDefaultValue = "false") boolean baseFileOnly,
       @CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",

hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ public String convert(
       @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
       @CliOption(key = "tableType", mandatory = true, help = "Table type") final String tableType,
       @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
-      @CliOption(key = "partitionPathField", mandatory = true,
+      @CliOption(key = "partitionPathField", unspecifiedDefaultValue = "",
           help = "Partition path field name") final String partitionPathField,
       @CliOption(key = {"parallelism"}, mandatory = true,
           help = "Parallelism for hoodie insert") final String parallelism,
