@@ -28,7 +28,7 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ParquetUtils
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
-import org.apache.hudi.io.storage.{HoodieParquetConfig, HoodieParquetWriter}
+import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -45,7 +45,7 @@ object SparkHelpers {
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble,
       HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
-    val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
+    val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
 
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
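For context on the classloader comment above, here is a minimal Java sketch of the same workaround; it is not part of this PR, and the standalone `Configuration` is a stand-in for the one carried by the parquet config. Hadoop's `Configuration` resolves class names such as `HoodieWrapperFileSystem` with its own classloader, which may not see jars added at runtime, so it is pointed at the current thread's context classloader.

```java
import org.apache.hadoop.conf.Configuration;

public class ClassLoaderFix {
  public static void main(String[] args) {
    // Stand-in Configuration; in SparkHelpers it is the one held by the parquet config.
    Configuration conf = new Configuration();
    // Make class-name lookups use the classloader that actually loaded the Hudi jars.
    conf.setClassLoader(Thread.currentThread().getContextClassLoader());
  }
}
```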
@@ -95,20 +95,19 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = createMetaClient(false);
-    return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, stats, extraMetadata);
   }
 
-  private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata, String actionType) {
-
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
     LOG.info("Committing " + instantTime);
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
     stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
 
     // Finalize write
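This hunk splits the old private `commit(..., actionType)` into a public `commitStats` that accepts pre-collected `HoodieWriteStat`s, so callers that never build a `JavaRDD<WriteStatus>` (for example a row-based bulk insert writer reporting `HoodieInternalWriteStatus`, introduced below) can still commit an instant. A minimal sketch of such a caller follows; it is not part of this PR, the helper class and variable names are illustrative, and it assumes the client being modified here is `org.apache.hudi.client.HoodieWriteClient` and is already initialized for the target table.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;

// Illustrative helper: commit an instant from statuses collected on the driver.
class CommitFromInternalStatuses {
  static boolean commit(HoodieWriteClient<?> client, String instantTime,
      List<HoodieInternalWriteStatus> statuses) {
    // Extract the per-file write stats and hand them to the new commitStats API.
    List<HoodieWriteStat> stats = statuses.stream()
        .map(HoodieInternalWriteStatus::getStat)
        .collect(Collectors.toList());
    return client.commitStats(instantTime, stats, Option.empty());
  }
}
```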
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.collection.Pair;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Hoodie's internal write status, used in the datasource implementation of bulk insert.
 */
public class HoodieInternalWriteStatus implements Serializable {

  private static final long serialVersionUID = 1L;
  private static final long RANDOM_SEED = 9038412832L;

  private String fileId;
  private String partitionPath;
  private List<String> successRecordKeys = new ArrayList<>();
  private List<Pair<String, Throwable>> failedRecordKeys = new ArrayList<>();

  private HoodieWriteStat stat;

  private long totalRecords = 0;
  private long totalErrorRecords = 0;
  private Throwable globalError = null;

  private final double failureFraction;
  private final boolean trackSuccessRecords;
  private final transient Random random;

  public HoodieInternalWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
    this.trackSuccessRecords = trackSuccessRecords;
    this.failureFraction = failureFraction;
    this.random = new Random(RANDOM_SEED);
  }

  public void markSuccess(String recordKey) {
    if (trackSuccessRecords) {
      this.successRecordKeys.add(recordKey);
    }
    totalRecords++;
  }

  public void markFailure(String recordKey, Throwable t) {
    // Always keep the first failure; sample later ones at failureFraction to bound memory usage.
    if (failedRecordKeys.isEmpty() || (random.nextDouble() <= failureFraction)) {
      failedRecordKeys.add(Pair.of(recordKey, t));
    }
    totalRecords++;
    totalErrorRecords++;
  }

  public boolean hasErrors() {
    return failedRecordKeys.size() != 0;
  }

  public HoodieWriteStat getStat() {
    return stat;
  }

  public void setStat(HoodieWriteStat stat) {
    this.stat = stat;
  }

  public String getFileId() {
    return fileId;
  }

  public void setFileId(String fileId) {
    this.fileId = fileId;
  }

  public String getPartitionPath() {
    return partitionPath;
  }

  public void setPartitionPath(String partitionPath) {
    this.partitionPath = partitionPath;
  }

  public List<String> getSuccessRecordKeys() {
    return successRecordKeys;
  }

  public long getFailedRowsSize() {
    return failedRecordKeys.size();
  }

  public List<Pair<String, Throwable>> getFailedRecordKeys() {
    return failedRecordKeys;
  }

  public void setFailedRecordKeys(List<Pair<String, Throwable>> failedRecordKeys) {
    this.failedRecordKeys = failedRecordKeys;
  }

  public long getTotalRecords() {
    return totalRecords;
  }

  public void setTotalRecords(long totalRecords) {
    this.totalRecords = totalRecords;
  }

  public long getTotalErrorRecords() {
    return totalErrorRecords;
  }

  public void setTotalErrorRecords(long totalErrorRecords) {
    this.totalErrorRecords = totalErrorRecords;
  }

  public Throwable getGlobalError() {
    return globalError;
  }

  public void setGlobalError(Throwable globalError) {
    this.globalError = globalError;
  }

  public void setSuccessRecordKeys(List<String> successRecordKeys) {
    this.successRecordKeys = successRecordKeys;
  }

  @Override
  public String toString() {
    return "PartitionPath " + partitionPath + ", FileID " + fileId + ", Success records "
        + totalRecords + ", errored Rows " + totalErrorRecords
        + ", global error " + (globalError != null);
  }
}
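A short usage sketch of the class above, not part of this PR; the keys, file id, and partition path are made-up values for illustration. It tracks outcomes while writing one file, then reads the summary back.

```java
import org.apache.hudi.client.HoodieInternalWriteStatus;

public class InternalWriteStatusExample {
  public static void main(String[] args) {
    // Track successful record keys, and sample roughly 10% of failures beyond the first one.
    HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 0.1);
    status.setFileId("file-0001");
    status.setPartitionPath("2020/06/01");

    status.markSuccess("key-001");
    status.markFailure("key-002", new RuntimeException("bad record"));

    // Two records seen in total; the first failure is always retained.
    System.out.println(status);            // PartitionPath 2020/06/01, FileID file-0001, ...
    System.out.println(status.hasErrors()); // true
  }
}
```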