@@ -118,6 +118,11 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Implementation to use, for mapping a skeleton base file to a bootstrap base file.");

public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
.key("hoodie.bootstrap.data.queries.only")
.defaultValue("true")
.withDocumentation("Improves query performance, but queries cannot use hudi metadata fields");

/**
* @deprecated Use {@link #BASE_PATH} and its methods instead
*/
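For context on the new hoodie.bootstrap.data.queries.only flag above, a minimal usage sketch (not part of this diff; the spark session and table path are assumed) showing how it could be passed as a read option:

// Default ("true"): the bootstrap relation reads only the bootstrap source data files,
// which is faster but leaves the Hudi meta fields unavailable to the query.
val fastDf = spark.read
  .format("hudi")
  .option("hoodie.bootstrap.data.queries.only", "true")
  .load("/tmp/hudi/bootstrap_table")  // assumed table base path

// "false" restores the merged skeleton + source-file view, so _hoodie_commit_time
// and the other meta fields can be projected and filtered on again.
val fullDf = spark.read
  .format("hudi")
  .option("hoodie.bootstrap.data.queries.only", "false")
  .load("/tmp/hudi/bootstrap_table")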
@@ -53,6 +53,7 @@
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieBootstrapFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -332,8 +333,13 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext
List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
try {
boolean isBootstrapSkeleton = !clusteringOp.getBootstrapFilePath().isEmpty();
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
if (isBootstrapSkeleton) {
Reviewer (Member): So, for full bootstrap mode, it still goes through the usual base file reader, correct?

Author (Contributor): Yes.

HoodieFileReader dataFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf.get(), new Path(clusteringOp.getBootstrapFilePath()));
baseFileReader = new HoodieBootstrapFileReader(baseFileReader, dataFileReader, writeConfig.isConsistentLogicalTimestampEnabled());
}
Option<BaseKeyGenerator> keyGeneratorOp =
writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
@@ -368,9 +374,6 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
.stream()
.map(op -> {
ArrayList<String> readPaths = new ArrayList<>();
if (op.getBootstrapFilePath() != null) {
readPaths.add(op.getBootstrapFilePath());
}
if (op.getDataFilePath() != null) {
readPaths.add(op.getDataFilePath());
}
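A rough sketch (assumptions: df, basePath, and spark already exist; table name, key field, and option values are illustrative) of one way to drive this clustering read path outside the SQL procedure used in the test at the bottom of this diff: inline clustering with the row writer disabled, so records flow through readRecordsForGroupBaseFiles rather than the row-based path.

// Assumed knobs only; the PR's own test uses the run_clustering SQL procedure instead.
df.write
  .format("hudi")
  .option("hoodie.table.name", "bootstrap_tbl")                  // assumed table name
  .option("hoodie.datasource.write.recordkey.field", "id")       // assumed key field
  .option("hoodie.datasource.write.row.writer.enable", "false")  // record-based clustering path, as in the test
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  .mode("append")
  .save(basePath)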
@@ -143,7 +143,7 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
// Group by partition for efficient updates for both InMemory and DiskBased structures.
fileGroups.stream().collect(Collectors.groupingBy(HoodieFileGroup::getPartitionPath)).forEach((partition, value) -> {
if (!isPartitionAvailableInStore(partition)) {
if (bootstrapIndex.useIndex()) {
if (!partition.isEmpty() && bootstrapIndex.useIndex()) {
try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
LOG.info("Bootstrap Index available for partition " + partition);
List<BootstrapFileMapping> sourceFileMappings =
@@ -0,0 +1,88 @@
package org.apache.hudi.io.storage;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.collection.ClosableIterator;

import org.apache.avro.Schema;

import java.io.IOException;
import java.util.Set;


public class HoodieBootstrapFileReader<T> implements HoodieFileReader<T> {

private HoodieFileReader<T> skeletonFileReader;
private HoodieFileReader<T> dataFileReader;
private Boolean isConsistentLogicalTimestampEnabled;

public HoodieBootstrapFileReader(HoodieFileReader<T> skeletonFileReader, HoodieFileReader<T> dataFileReader, Boolean isConsistentLogicalTimestampEnabled) {
this.skeletonFileReader = skeletonFileReader;
this.dataFileReader = dataFileReader;
this.isConsistentLogicalTimestampEnabled = isConsistentLogicalTimestampEnabled;
}
@Override
public String[] readMinMaxRecordKeys() {
return skeletonFileReader.readMinMaxRecordKeys();
}

@Override
public BloomFilter readBloomFilter() {
return skeletonFileReader.readBloomFilter();
}

@Override
public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
return skeletonFileReader.filterRowKeys(candidateRowKeys);
}

@Override
public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(readerSchema, requestedSchema);
ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema), requestedSchema);

return new ClosableIterator<HoodieRecord<T>>() {
@Override
public void close() {
skeletonIterator.close();
dataFileIterator.close();
}

@Override
public boolean hasNext() {
return skeletonIterator.hasNext() && dataFileIterator.hasNext();
}

@Override
public HoodieRecord<T> next() {
HoodieRecord<T> dataRecord = dataFileIterator.next();
HoodieRecord<T> skeletonRecord = skeletonIterator.next();
HoodieRecord<T> ret = dataRecord.prependMetaFields(readerSchema, readerSchema, new MetadataValues()
.setCommitTime(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
.setCommitSeqno(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
.setRecordKey(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
.setPartitionPath(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
Reviewer (Member): Is the skeleton record giving the partition path?
.setFileName(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
return ret;
}
};
}

@Override
public Schema getSchema() {
return skeletonFileReader.getSchema();
}

@Override
public void close() {
skeletonFileReader.close();
dataFileReader.close();
}

@Override
public long getTotalRecords() {
return Math.min(skeletonFileReader.getTotalRecords(), dataFileReader.getTotalRecords());
}
}
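The heart of the reader above is a positional zip: the skeleton file contributes only the Hudi meta columns, the bootstrap source file contributes the user columns, and records at the same position belong to the same row. A toy Scala sketch of that idea using plain collections (illustrative only, no Hudi types):

case class SkeletonRow(commitTime: String, seqNo: String, recordKey: String, partitionPath: String, fileName: String)
case class DataRow(columns: Map[String, Any])
case class FullRow(meta: SkeletonRow, columns: Map[String, Any])

// Mirrors getRecordIterator: advance both iterators in lockstep and prepend the
// meta fields taken from the skeleton record onto the matching data record.
def mergeBootstrap(skeleton: Iterator[SkeletonRow], data: Iterator[DataRow]): Iterator[FullRow] =
  skeleton.zip(data).map { case (s, d) => FullRow(s, d.columns) }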
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
@@ -480,7 +481,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
try {
val tableConfig = metaClient.getTableConfig
if (extractPartitionValuesFromPartitionPath) {
val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(metaClient.getBasePathV2)
val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent)
val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
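A note on why both paths are stripped of scheme and authority before relativizing (values below are illustrative, not taken from the PR): java.net.URI.relativize returns the second URI unchanged whenever the scheme or authority components differ, so the "relative" partition path would silently come back as an absolute path and be misparsed.

import java.net.URI

// Scheme mismatch: relativize gives the argument back untouched.
new URI("s3a://bucket/warehouse/tbl").relativize(new URI("/warehouse/tbl/dt=2023-01-01"))
// => /warehouse/tbl/dt=2023-01-01   (not a relative partition path)

// With scheme and authority stripped from both sides, it behaves as intended.
new URI("/warehouse/tbl").relativize(new URI("/warehouse/tbl/dt=2023-01-01"))
// => dt=2023-01-01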
@@ -24,17 +24,17 @@ import org.apache.hudi.HoodieBootstrapRelation.validate
import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile] = None) extends HoodieFileSplit

case class HoodieBootstrapSplit(filePartition: FilePartition, dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile] = None) extends HoodieFileSplit
/**
* This is a Spark relation that can be used for querying metadata-only/fully bootstrapped Hudi tables, as well as
* non-bootstrapped tables. It implements the PrunedFilteredScan interface in order to support column pruning and filter
@@ -58,16 +58,19 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val prunedDataSchema: Option[StructType] = None)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema)
with SparkAdapterSupport {

override type FileSplit = HoodieBootstrapSplit
override type Relation = HoodieBootstrapRelation

private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema

override val mandatoryFields: Seq[String] = Seq.empty
private val dataOnly = !optParams.contains(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key()) ||
(optParams.get(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key).get == "true")

protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
private def collectBootstrapFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
fileSlices.map { fileSlice =>
val baseFile = fileSlice.getBaseFile.get()
@@ -78,27 +78,63 @@
val dataFile = PartitionedFile(partitionValues, baseFile.getBootstrapBaseFile.get().getPath, 0, baseFile.getBootstrapBaseFile.get().getFileLen)
val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))

HoodieBootstrapSplit(dataFile, skeletonFile)
HoodieBootstrapSplit(null, dataFile, skeletonFile)
} else {
val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
HoodieBootstrapSplit(dataFile)
HoodieBootstrapSplit(null, dataFile)
}
}
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBootstrapSplit] = {
if (!dataOnly) {
return collectBootstrapFileSplits(partitionFilters, dataFilters)
}
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
val fileSplits = fileSlices.flatMap { fileSlice =>
val baseFile = fileSlice.getBaseFile.get()
val (fs, partitionValues) = if (baseFile.getBootstrapBaseFile.isPresent) {
(baseFile.getBootstrapBaseFile.get.getFileStatus, getPartitionColumnsAsInternalRowInternal(baseFile.getFileStatus, extractPartitionValuesFromPartitionPath = true))
} else {
(baseFile.getFileStatus, getPartitionColumnsAsInternalRow(baseFile.getFileStatus))
}
// TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = fs,
partitionValues = partitionValues
)
}
// NOTE: It's important to order the splits in the reverse order of their
// size so that we can subsequently bucket them in an efficient manner
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes

sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
.map(s => HoodieBootstrapSplit.apply(s, null))
}

protected override def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): RDD[InternalRow] = {


val regularFileReader = createRegularFileReader(tableSchema, requiredSchema, filters)

if (dataOnly) {
return sparkAdapter.createHoodieFileScanRDD(sparkSession, regularFileReader.apply,
fileSplits.map(_.filePartition), requiredSchema.structTypeSchema).asInstanceOf[HoodieUnsafeRDD]
}

val requiredSkeletonFileSchema =
StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col))))

val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters)

val regularFileReader = createRegularFileReader(tableSchema, requiredSchema, filters)

new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader,
requiredSchema, fileSplits)
}
@@ -83,6 +83,10 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
assertResult(10) {
result.length
}
spark.sql("set hoodie.datasource.write.row.writer.enable = false")
spark.sql(
s"""call run_clustering(table => '$tableName')""".stripMargin)
spark.sql(s"select * from $tableName").show(false)
}
}
