diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 4b747d3a77c00..2f425acbc7f2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -334,7 +334,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
    * @param metadata instance of {@link HoodieCommitMetadata}.
    */
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, table.isTableServiceAction(actionType)));
   }
@@ -1038,7 +1038,7 @@ public void dropIndex(List partitionTypes) {
     HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
     this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
     try {
-      context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
+      context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table: " + config.getTableName());
       table.getMetadataWriter(dropInstant).ifPresent(w -> {
         try {
           ((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 40e8f85a3ac70..d006b52b3306a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -85,7 +85,7 @@ public List validateCompactionPlan(HoodieTableMetaClient met
     if (plan.getOperations() != null) {
       List ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
-      context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations");
+      context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations: " + config.getTableName());
       return context.map(ops, op -> {
         try {
           return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -351,7 +351,7 @@ private List runRenamingOps(HoodieTableMetaClient metaClient,
     } else {
       LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
       if (!dryRun) {
-        context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations");
+        context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations: " + config.getTableName());
         return context.map(renameActions, lfPair -> {
           try {
             LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -394,7 +394,7 @@ public List> getRenamingActionsForUnschedulin
         "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
     List ops = plan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
-    context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
+    context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations: " + config.getTableName());
     return context.flatMap(ops, op -> {
       try {
         return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 190a5fe1c6064..41bcf001a0b6d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -519,7 +519,7 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo
         new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
     ).map(Path::toString).collect(Collectors.toList());
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
     Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
     for (Map.Entry result : resultDeleteInstantFiles.entrySet()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index b714c50334b4f..9b3dc8df0098a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -83,7 +83,7 @@ public static List getLatestBaseFilesForPartition(
   public static List> getLatestBaseFilesForAllPartitions(final List partitions,
                                                          final HoodieEngineContext context,
                                                          final HoodieTable hoodieTable) {
-    context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
+    context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName());
     return context.flatMap(partitions, partitionPath -> {
       List> filteredFiles = getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index aeaf78672680d..6545c642c4ccb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -167,7 +167,7 @@ List> loadColumnRangesFromFiles(
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
-    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName());
     return context.map(partitionPathFileIDList, pf -> {
       try {
         HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
@@ -209,7 +209,7 @@ private List> getFileInfoForLatestBaseFiles(
   protected List> loadColumnRangesFromMetaIndex(
       List partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
     // also obtain file ranges, if range pruning is enabled
-    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
+    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());
     final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
     return context.flatMap(partitions, partitionName -> {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index d080d14a69fad..f5a96fb676131 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1047,7 +1047,7 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
   private void initialCommit(String createInstantTime, List partitionTypes) {
     // List all partitions in the basePath of the containing dataset
     LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName());
     Map> partitionToRecordsMap = new HashMap<>();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f6f73f633ef5d..807865dae2416 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -566,7 +566,7 @@ public void finalizeWrite(HoodieEngineContext context, String instantTs, List
   private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map>> invalidFilesByPartition) {
     // Now delete partially written files
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
     context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
       final FileSystem fileSystem = metaClient.getFs();
       LOG.info("Deleting invalid data files=" + partitionWithFileList);
@@ -642,7 +642,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
     }
     // Now delete partially written files
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files");
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
     deleteInvalidFilesByPartitions(context, invalidPathsByPartition);
     // Now ensure the deleted files disappear
@@ -665,7 +665,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
    */
   private void waitForAllFiles(HoodieEngineContext context, Map>> groupByPartition, FileVisibility visibility) {
     // This will either ensure all files to be deleted are present.
-    context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
+    context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear: " + config.getTableName());
     boolean checkPassed = context.map(new ArrayList<>(groupByPartition.entrySet()), partitionWithFileList ->
             waitForCondition(partitionWithFileList.getKey(), partitionWithFileList.getValue().stream(), visibility),
         config.getFinalizeWriteParallelism())
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 2bb277b05b4f8..30ed27b39b77a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -132,7 +132,7 @@ List clean(HoodieEngineContext context, HoodieCleanerPlan clean
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());
     Stream> filesToBeDeletedPerPartition =
         cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index fb2df582bfe15..d8e51bcd1643e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -96,7 +96,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
     try {
       CleanPlanner planner = new CleanPlanner<>(context, table, config);
       Option earliestInstant = planner.getEarliestCommitToRetain();
-      context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned");
+      context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName());
       List partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
       if (partitionsToClean.isEmpty()) {
@@ -107,7 +107,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-      context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned");
+      context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
       Map>> cleanOpsWithPartitionMeta = context
           .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 6d5372b47297d..846afec7c1db3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -49,7 +49,7 @@ public HoodieWriteMetadata write(String instantTime,
     I taggedRecords = dedupedRecords;
     if (table.getIndex().requiresTagging(operationType)) {
       // perform index loop up to get existing location of records
-      context.setJobStatus(this.getClass().getSimpleName(), "Tagging");
+      context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
       taggedRecords = tag(dedupedRecords, context, table);
     }
     Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index d548e07eac8a5..75954872aedd5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -133,7 +133,7 @@ public HoodieData compact(
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
-    context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
+    context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
     TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
     return context.parallelize(operations).map(operation -> compact(
         compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier))
@@ -288,7 +288,7 @@ HoodieCompactionPlan generateCompactionPlan(
     SliceView fileSystemView = hoodieTable.getSliceView();
     LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
-    context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
+    context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + config.getTableName());
     List operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
         .getLatestFileSlices(partitionPath)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 24c0dbc80ed80..5c184e77dfaa2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -88,7 +88,7 @@ public HoodieWriteMetadata> execute() {
         context, compactionPlan, table, configCopy, instantTime, compactionHandler);
     compactor.maybePersist(statuses, config);
-    context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
+    context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
     List updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
     for (HoodieWriteStat stat : updateStatusMap) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index d3cc5660bc70a..05fb7c0c92d1d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -119,7 +119,7 @@ private HoodieCompactionPlan scheduleCompaction() {
           .collect(Collectors.toSet());
       // exclude files in pending clustering from compaction.
       fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
+      context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan: " + config.getTableName());
       return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
     } catch (IOException e) {
       throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 8475afe16eea0..8d5e767307d78 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -72,7 +72,7 @@ public BaseRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig co
   public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
                               List rollbackRequests) {
     int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions: " + config.getTableName());
     // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
     // is failing with com.esotericsoftware.kryo.KryoException
     // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
@@ -88,7 +88,7 @@ public List performRollback(HoodieEngineContext context, Hoo
   public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
                                    List rollbackRequests) {
     int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade: " + config.getTableName());
     // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
     // is failing with com.esotericsoftware.kryo.KryoException
     // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index e3159abad8de7..aa9e0b6583a24 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -88,7 +88,7 @@ public List getRollbackRequests(HoodieInstant instantToRo
           FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
       int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1);
-      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan");
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan: " + config.getTableName());
       HoodieTableType tableType = table.getMetaClient().getTableType();
       String baseFileExtension = getBaseFileExtension(metaClient);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 134b238852cd3..7f408c1b8d24a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -84,7 +84,7 @@ public HoodieSavepointMetadata execute() {
       ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
-      context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
+      context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
       List partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
       Map> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
         // Scan all partitions files with this commit time
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
index 3dacf1e1302c5..07428dd936469 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
@@ -84,7 +84,7 @@ public Option createIfNotExists(String partitionPath, String dataFileName,
    */
   public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) {
     try {
-      context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory");
+      context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory: " + basePath);
       deleteMarkerDir(context, parallelism);
     } catch (Exception e) {
       LOG.warn("Error deleting marker directory for instant " + instantTime, e);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 271ba95d941e8..524758a675cfc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -356,7 +356,7 @@ public void completeCompaction(
       HoodieCommitMetadata metadata,
       HoodieTable table,
       String compactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
     try {
@@ -508,7 +508,7 @@ public Map> getPartitionToReplacedFileIds(
     List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
     if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + config.getTableName());
       partitionToExistingFileIds = partitionPaths.stream().parallel()
           .collect(
               Collectors.toMap(
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
index deaf934cf5d03..fb19259b55591 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
@@ -230,7 +230,7 @@ private Map> getSmallFilesForPartitions(List par
     }
     if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName());
       partitionSmallFilesMap = context.mapToPair(partitionPaths,
           partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7b0c8bbc8d25c..3b512f0bdc871 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -117,7 +117,7 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
   @Override
   public boolean commit(String instantTime, JavaRDD writeStatuses, Option> extraMetadata,
                         String commitActionType, Map> partitionToReplacedFileIds) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats");
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
     List writeStats = writeStatuses.map(WriteStatus::getStat).collect();
     return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
   }
@@ -303,7 +303,7 @@ public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata
   protected void completeCompaction(HoodieCommitMetadata metadata,
                                     HoodieTable table,
                                     String compactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
     try {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 9c2f37d56a509..c9fb895adc401 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -126,7 +126,7 @@ private Map computeComparisonsPerFileGroup(
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
      // FIX(vc): Only do sampling here and extrapolate?
-      context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files");
+      context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName());
       fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
     } else {
       fileToComparisons = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 504da8a722810..4e488047d845e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -334,7 +334,7 @@ private HoodieData runMetadataBootstrap(List
         getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(), keyGenerator));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 205da82ac145d..f8e4b31ff687e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -113,7 +113,7 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
   }
   private HoodieData> clusteringHandleUpdate(HoodieData> inputRecords, Set fileGroupsInPendingClustering) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
+    context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
     UpdateStrategy>> updateStrategy = (UpdateStrategy>>) ReflectionUtils
         .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
     Pair>, Set> recordsAndPendingClusteringFileGroups =
@@ -152,7 +152,7 @@ public HoodieWriteMetadata> execute(HoodieData
     HoodieData> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
-    context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
+    context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
     HoodieData writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
     HoodieWriteMetadata> result = new HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(writeStatuses, result);
@@ -280,7 +280,7 @@ protected void setCommitMetadata(HoodieWriteMetadata> re
   @Override
   protected void commit(Option> extraMetadata, HoodieWriteMetadata> result) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
+    context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect: " + config.getTableName());
     commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index c54c526253f0b..c2f5a43066d36 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -266,7 +266,7 @@ private Map> getSmallFilesForPartitions(List par
     }
     if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName());
       JavaRDD partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
       partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction>)
           partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index b5d7dc4b107dd..a2e81a3371d10 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -174,7 +174,7 @@ protected JavaRDD> buildHoodieRecordsForImport
     ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import");
+    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + cfg.tableName);
     return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
         job.getConfiguration())
         // To reduce large number of tasks.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index a2717a35617f3..402b380a00e08 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -107,7 +107,7 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi
       fs.delete(new Path(outputDir), true);
     }
-    context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot");
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot: " + baseDir);
     List> filesToCopy = context.flatMap(partitions, partition -> {
       // Only take latest version files <= latestCommit.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 255393b232eb1..753765fb6a504 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -177,7 +177,7 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List part
         : ReflectionUtils.loadClass(cfg.outputPartitioner);
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset");
+    context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
     Iterator exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
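The sketch below is not part of the patch; it is a minimal, self-contained illustration of the one pattern applied at every changed call site above: the second argument of HoodieEngineContext#setJobStatus(String, String) is extended with an identifier of the table being operated on (the table name from the write config, or a base/output path in the utilities that have no write config), so that job descriptions from concurrent writers sharing an engine UI can be told apart. The EngineContext and WriteConfig interfaces and the describe helper are illustrative stand-ins, not Hudi classes.

// Illustrative sketch only; the nested interfaces are stand-ins for the Hudi types used in the patch.
final class JobStatusDescriptionExample {

  // Stand-in for the two-argument setJobStatus call used throughout the diff.
  interface EngineContext {
    void setJobStatus(String activeModule, String activityDescription);
  }

  // Stand-in for the table-name accessor the patch relies on (config.getTableName()).
  interface WriteConfig {
    String getTableName();
  }

  // Builds the enriched description, e.g. "Perform cleaning of partitions: my_table".
  static String describe(String activity, WriteConfig config) {
    return activity + ": " + config.getTableName();
  }

  // Usage mirroring the changed call sites: module name from the class, activity plus table identifier.
  static void reportCleaning(EngineContext context, WriteConfig config) {
    context.setJobStatus(JobStatusDescriptionExample.class.getSimpleName(),
        describe("Perform cleaning of partitions", config));
  }
}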