Skip to content

Commit 88197b1

Browse files
committed
Tmp commit
1 parent 08a0b84 commit 88197b1

4 files changed

Lines changed: 33 additions & 15 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/build/BuildTaskExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.apache.hudi.common.util.HoodieTimer;
3333
import org.apache.hudi.common.util.ValidationUtils;
3434
import org.apache.hudi.exception.HoodieBuildException;
35-
import org.apache.hudi.exception.HoodieIOException;
35+
import org.apache.hudi.exception.HoodieException;
3636
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
3737
import org.apache.hudi.secondary.index.HoodieSecondaryIndex;
3838
import org.apache.hudi.secondary.index.SecondaryIndexBuilder;
@@ -233,8 +233,8 @@ private void addBatch(GenericRecord[] records, int size) {
233233
indexBuilders.forEach(indexBuilder -> {
234234
try {
235235
indexBuilder.addBatch(records, size);
236-
} catch (IOException e) {
237-
throw new HoodieIOException("Add records to index builder failed, baseFile: "
236+
} catch (Exception e) {
237+
throw new HoodieException("Add records to index builder failed, baseFile: "
238238
+ baseFilePath + ", builderName: " + indexBuilder.getName(), e);
239239
}
240240
});

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/build/SparkExecuteBuildCommitActionExecutor.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@
4646
import org.apache.spark.sql.SparkSession;
4747
import org.apache.spark.sql.catalyst.QualifiedTableName;
4848
import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
4951

5052
import java.util.ArrayList;
5153
import java.util.List;
5254

5355
public class SparkExecuteBuildCommitActionExecutor<T extends HoodieRecordPayload, I, K, O>
5456
extends BaseActionExecutor<T, I, K, O, HoodieBuildCommitMetadata> {
57+
private static final Logger LOG = LoggerFactory.getLogger(SparkExecuteBuildCommitActionExecutor.class);
5558

5659
private HoodieBuildPlan buildPlan;
5760

@@ -85,17 +88,25 @@ public HoodieBuildCommitMetadata execute() {
8588
SerializableConfiguration conf = new SerializableConfiguration(context.getHadoopConf().get());
8689
JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(context);
8790
HoodieJavaRDD<BuildStatus> buildStatusRDD =
88-
HoodieJavaRDD.of(sparkContext.parallelize(buildTasks, buildTasks.size())
89-
.mapPartitions(buildTasksPartition -> {
90-
List<BuildStatus> buildStatuses = new ArrayList<>();
91-
buildTasksPartition.forEachRemaining(buildTask -> {
92-
BuildStatus buildStatus = new BuildTaskExecutor(buildTask, table.getConfig().getBasePath(),
93-
indexFolderPath, serializableSchema, conf).execute();
94-
buildStatuses.add(buildStatus);
95-
});
91+
null;
92+
try {
93+
buildStatusRDD = HoodieJavaRDD.of(sparkContext.parallelize(buildTasks, buildTasks.size())
94+
.mapPartitions(buildTasksPartition -> {
95+
List<BuildStatus> buildStatuses = new ArrayList<>();
96+
buildTasksPartition.forEachRemaining(buildTask -> {
97+
BuildStatus buildStatus = new BuildTaskExecutor(buildTask, table.getConfig().getBasePath(),
98+
indexFolderPath, serializableSchema, conf).execute();
99+
buildStatuses.add(buildStatus);
100+
});
96101

97-
return buildStatuses.iterator();
98-
}));
102+
return buildStatuses.iterator();
103+
}));
104+
} catch (Exception e) {
105+
String tableName = table.getMetaClient().getTableConfig().getTableName();
106+
String basePath = table.getConfig().getBasePath();
107+
LOG.error("Build execute failed, table: {}, basePath: {}, schema: {}", tableName, basePath, schema);
108+
throw new RuntimeException(e);
109+
}
99110

100111
// Invalidate cached table for queries do not access related table through {@code DefaultSource}
101112
SessionCatalog sessionCatalog = SparkSession.active().sessionState().catalog();

hudi-common/src/main/java/org/apache/hudi/secondary/index/lucene/LuceneIndexBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public LuceneIndexBuilder(HoodieBuildTaskConfig indexConfig) {
9696
fieldTypes[i] = AvroInternalSchemaConverter.buildTypeFromAvroSchema(field.schema()).typeId();
9797
fieldNames.add(field.name());
9898
});
99-
LOG.info("Init lucene index builder ok, name: {}, indexFields: {}", name, fieldNames);
99+
LOG.warn("Init lucene index builder ok, name: {}, indexFields: {}", name, fieldNames);
100100
}
101101

102102
@Override

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBuildProcedure.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ import org.apache.hadoop.fs.Path
2323
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
2424
import org.apache.hudi.common.util.{Option => HOption}
2525
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
26+
import org.apache.spark.internal.Logging
2627

2728
import scala.collection.JavaConverters.asScalaIteratorConverter
2829

29-
class TestBuildProcedure extends HoodieSparkProcedureTestBase {
30+
class TestBuildProcedure extends HoodieSparkProcedureTestBase with Logging{
3031
test("Test Call run_build Procedure by Table for None-Partitioned Table") {
3132
withTempDir { tmp =>
3233
Seq("cow", "mor").foreach { tableType =>
@@ -51,6 +52,8 @@ class TestBuildProcedure extends HoodieSparkProcedureTestBase {
5152
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
5253
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
5354
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
55+
val schema = client.getConfig.getSchema
56+
logWarning(s"Exec build, table: ${tableName}, basePath: ${basePath}, schema: ${schema}")
5457
// Generate the first build plan
5558
val firstScheduledBuild = HoodieActiveTimeline.createNewInstantTime
5659
client.scheduleBuildAtInstant(firstScheduledBuild, HOption.empty())
@@ -164,6 +167,8 @@ class TestBuildProcedure extends HoodieSparkProcedureTestBase {
164167
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
165168
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
166169
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
170+
val schema = client.getConfig.getSchema
171+
logWarning(s"Exec build, table: ${tableName}, basePath: ${basePath}, schema: ${schema}")
167172
// Generate the first build plan
168173
val firstScheduledBuild = HoodieActiveTimeline.createNewInstantTime
169174
client.scheduleBuildAtInstant(firstScheduledBuild, HOption.empty())
@@ -280,6 +285,8 @@ class TestBuildProcedure extends HoodieSparkProcedureTestBase {
280285
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
281286
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
282287
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
288+
val schema = client.getConfig.getSchema
289+
logWarning(s"Exec build, table: ${tableName}, basePath: ${basePath}, schema: ${schema}")
283290
// Generate the first build plan
284291
val firstScheduledBuild = HoodieActiveTimeline.createNewInstantTime
285292
client.scheduleBuildAtInstant(firstScheduledBuild, HOption.empty())

0 commit comments

Comments
 (0)