Skip to content

Commit 9ffba3f

Browse files
committed
[HUDI-3581] Reorganize some clazz for hudi flink
1 parent fe53bd2 commit 9ffba3f

15 files changed

Lines changed: 28 additions & 14 deletions

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ private void shutdownCallback(Function<Boolean, Boolean> callback) {
170170
if (null != callback) {
171171
callback.apply(null != error);
172172
}
173+
this.started = false;
173174
});
174175
}
175176

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,11 +835,18 @@ public void archive() {
835835
* Provides a new commit time for a write operation (insert/update/delete).
836836
*/
837837
public String startCommit() {
838+
HoodieTableMetaClient metaClient = createMetaClient(true);
839+
return startCommit(metaClient.getCommitActionType(), metaClient);
840+
}
841+
842+
/**
843+
* Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) with specified action.
844+
*/
845+
public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
838846
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
839847
HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
840848
String instantTime = HoodieActiveTimeline.createNewInstantTime();
841-
HoodieTableMetaClient metaClient = createMetaClient(true);
842-
startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
849+
startCommit(instantTime, actionType, metaClient);
843850
return instantTime;
844851
}
845852

hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ private void addEventToBuffer(WriteMetadataEvent event) {
364364
private void startInstant() {
365365
// put the assignment in front of metadata generation,
366366
// because the instant request from write task is asynchronous.
367-
this.instant = this.writeClient.startCommit();
367+
this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
368368
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
369369
this.ckpMetadata.startInstant(this.instant);
370370
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hudi.table.format.cow;
2020

2121
import org.apache.hudi.common.fs.FSUtils;
22+
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
2223

2324
import org.apache.flink.api.common.io.FileInputFormat;
2425
import org.apache.flink.api.common.io.FilePathFilter;

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@
2222
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
2323
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
2424
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
25+
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
2526
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
27+
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
28+
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
2629
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
30+
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
2731
import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
2832

2933
import org.apache.flink.core.fs.Path;

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java renamed to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hudi.table.format.cow;
19+
package org.apache.hudi.table.format.cow.vector;
2020

2121
import org.apache.flink.table.data.DecimalData;
2222
import org.apache.flink.table.data.vector.BytesColumnVector;

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java renamed to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hudi.table.format.cow;
19+
package org.apache.hudi.table.format.cow.vector.reader;
2020

2121
import org.apache.flink.formats.parquet.vector.ParquetDictionary;
2222
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.hudi.table.format.cow.vector.reader;
2020

21-
import org.apache.hudi.table.format.cow.ParquetDecimalVector;
2221
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
22+
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
2323

2424
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
2525
import org.apache.flink.table.data.TimestampData;

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java renamed to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.hudi.table.format.cow;
18+
package org.apache.hudi.table.format.cow.vector.reader;
1919

2020
import org.apache.flink.table.data.vector.writable.WritableBytesVector;
2121
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
2222
import org.apache.flink.table.data.vector.writable.WritableIntVector;
23-
2423
import org.apache.parquet.column.ColumnDescriptor;
2524
import org.apache.parquet.column.page.PageReader;
2625
import org.apache.parquet.io.api.Binary;

hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java renamed to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.hudi.table.format.cow;
18+
package org.apache.hudi.table.format.cow.vector.reader;
1919

2020
import org.apache.flink.table.data.TimestampData;
2121
import org.apache.flink.table.data.vector.writable.WritableIntVector;

0 commit comments

Comments
 (0)