Skip to content

Commit 52e63b3

Browse files
wqwl611wqwl611
andauthored
[HUDI-4097] add table info to jobStatus (#5529)
Co-authored-by: wqwl611 <[email protected]>
1 parent 5c4813f commit 52e63b3

27 files changed

Lines changed: 41 additions & 41 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
334334
* @param metadata instance of {@link HoodieCommitMetadata}.
335335
*/
336336
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
337-
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
337+
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
338338
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
339339
table.isTableServiceAction(actionType)));
340340
}
@@ -1038,7 +1038,7 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
10381038
HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
10391039
this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
10401040
try {
1041-
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
1041+
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table: " + config.getTableName());
10421042
table.getMetadataWriter(dropInstant).ifPresent(w -> {
10431043
try {
10441044
((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes);

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public List<ValidationOpResult> validateCompactionPlan(HoodieTableMetaClient met
8585
if (plan.getOperations() != null) {
8686
List<CompactionOperation> ops = plan.getOperations().stream()
8787
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
88-
context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations");
88+
context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations: " + config.getTableName());
8989
return context.map(ops, op -> {
9090
try {
9191
return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -351,7 +351,7 @@ private List<RenameOpResult> runRenamingOps(HoodieTableMetaClient metaClient,
351351
} else {
352352
LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
353353
if (!dryRun) {
354-
context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations");
354+
context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations: " + config.getTableName());
355355
return context.map(renameActions, lfPair -> {
356356
try {
357357
LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -394,7 +394,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
394394
"Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
395395
List<CompactionOperation> ops = plan.getOperations().stream()
396396
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
397-
context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
397+
context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations: " + config.getTableName());
398398
return context.flatMap(ops, op -> {
399399
try {
400400
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
519519
new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
520520
).map(Path::toString).collect(Collectors.toList());
521521

522-
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
522+
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
523523
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
524524

525525
for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
8383
public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
8484
final HoodieEngineContext context,
8585
final HoodieTable hoodieTable) {
86-
context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
86+
context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName());
8787
return context.flatMap(partitions, partitionPath -> {
8888
List<Pair<String, HoodieBaseFile>> filteredFiles =
8989
getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream()

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
167167
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
168168
.collect(toList());
169169

170-
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
170+
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName());
171171
return context.map(partitionPathFileIDList, pf -> {
172172
try {
173173
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
@@ -209,7 +209,7 @@ private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
209209
protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
210210
List<String> partitions, final HoodieEngineContext context, final HoodieTable<?, ?, ?, ?> hoodieTable) {
211211
// also obtain file ranges, if range pruning is enabled
212-
context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
212+
context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());
213213

214214
final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
215215
return context.flatMap(partitions, partitionName -> {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,7 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
10471047
private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
10481048
// List all partitions in the basePath of the containing dataset
10491049
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
1050-
engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
1050+
engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName());
10511051

10521052
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
10531053

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ public void finalizeWrite(HoodieEngineContext context, String instantTs, List<Ho
566566

567567
private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
568568
// Now delete partially written files
569-
context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
569+
context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
570570
context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
571571
final FileSystem fileSystem = metaClient.getFs();
572572
LOG.info("Deleting invalid data files=" + partitionWithFileList);
@@ -642,7 +642,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
642642
}
643643

644644
// Now delete partially written files
645-
context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files");
645+
context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
646646
deleteInvalidFilesByPartitions(context, invalidPathsByPartition);
647647

648648
// Now ensure the deleted files disappear
@@ -665,7 +665,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
665665
*/
666666
private void waitForAllFiles(HoodieEngineContext context, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
667667
// This will either ensure all files to be deleted are present.
668-
context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
668+
context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear: " + config.getTableName());
669669
boolean checkPassed =
670670
context.map(new ArrayList<>(groupByPartition.entrySet()), partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
671671
partitionWithFileList.getValue().stream(), visibility), config.getFinalizeWriteParallelism())

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
132132
config.getCleanerParallelism());
133133
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
134134

135-
context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
135+
context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());
136136

137137
Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
138138
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
9696
try {
9797
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
9898
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
99-
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned");
99+
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName());
100100
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
101101

102102
if (partitionsToClean.isEmpty()) {
@@ -107,7 +107,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
107107
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
108108
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
109109

110-
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned");
110+
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
111111

112112
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
113113
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
4949
I taggedRecords = dedupedRecords;
5050
if (table.getIndex().requiresTagging(operationType)) {
5151
// perform index loop up to get existing location of records
52-
context.setJobStatus(this.getClass().getSimpleName(), "Tagging");
52+
context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
5353
taggedRecords = tag(dedupedRecords, context, table);
5454
}
5555
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

0 commit comments

Comments
 (0)