Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions presto-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-presto-bundle</artifactId>
<version>1.0.2</version>
</dependency>

<!-- Tests -->
Expand Down Expand Up @@ -263,6 +264,30 @@
<ignoredClassPattern>module-info</ignoredClassPattern>
<ignoredClassPattern>META-INF.versions.9.module-info</ignoredClassPattern>
</ignoredClassPatterns>
<ignoredDependencies>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-presto-bundle</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
</dependency>
</ignoredDependencies>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<!-- TODO: Remove this once fixed -->
<ignoredDependencies>
<ignoredDependency>io.airlift:aircompressor:jar</ignoredDependency>
</ignoredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down
133 changes: 133 additions & 0 deletions presto-hudi/src/main/java/com/facebook/presto/hudi/HudiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.units.DataSize;
import com.facebook.airlift.units.Duration;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.SECONDS;

public class HudiConfig
{
Expand All @@ -34,6 +36,16 @@ public class HudiConfig
private int splitLoaderParallelism = 4;
private int splitGeneratorParallelism = 4;

private boolean isColumnStatsIndexEnabled = true;
private boolean isRecordLevelIndexEnabled = true;
private boolean isSecondaryIndexEnabled = true;
private boolean isPartitionStatsIndexEnabled = true;
private Duration columnStatsWaitTimeout = new Duration(1, SECONDS);
private Duration recordIndexWaitTimeout = new Duration(2, SECONDS);
private Duration secondaryIndexWaitTimeout = new Duration(2, SECONDS);
private boolean metadataPartitionListingEnabled = true;
private boolean resolveColumnNameCasingEnabled;

public boolean isMetadataTableEnabled()
{
return metadataTableEnabled;
Expand Down Expand Up @@ -132,4 +144,125 @@ public HudiConfig setSplitLoaderParallelism(int splitLoaderParallelism)
this.splitLoaderParallelism = splitLoaderParallelism;
return this;
}

@Config("hudi.index.column-stats-index-enabled")
@ConfigDescription("Internal configuration to control whether column stats index is enabled for debugging/testing.")
public HudiConfig setColumnStatsIndexEnabled(boolean isColumnStatsIndexEnabled)
{
this.isColumnStatsIndexEnabled = isColumnStatsIndexEnabled;
return this;
}

public boolean isColumnStatsIndexEnabled()
{
return isColumnStatsIndexEnabled;
}

@Config("hudi.index.record-level-index-enabled")
@ConfigDescription("Internal configuration to control whether record level index is enabled for debugging/testing.")
public HudiConfig setRecordLevelIndexEnabled(boolean isRecordLevelIndexEnabled)
{
this.isRecordLevelIndexEnabled = isRecordLevelIndexEnabled;
return this;
}

public boolean isRecordLevelIndexEnabled()
{
return isRecordLevelIndexEnabled;
}

@Config("hudi.index.secondary-index-enabled")
@ConfigDescription("Internal configuration to control whether secondary index is enabled for debugging/testing.")
public HudiConfig setSecondaryIndexEnabled(boolean isSecondaryIndexEnabled)
{
this.isSecondaryIndexEnabled = isSecondaryIndexEnabled;
return this;
}

public boolean isSecondaryIndexEnabled()
{
return isSecondaryIndexEnabled;
}

@Config("hudi.index.partition-stats-index-enabled")
@ConfigDescription("Internal configuration to control whether partition stats index is enabled for debugging/testing.")
public HudiConfig setPartitionStatsIndexEnabled(boolean isPartitionStatsIndexEnabled)
{
this.isPartitionStatsIndexEnabled = isPartitionStatsIndexEnabled;
return this;
}

public boolean isPartitionStatsIndexEnabled()
{
return isPartitionStatsIndexEnabled;
}

@Config("hudi.index.record-index.wait-timeout")
@ConfigDescription("Maximum timeout to wait for loading record index, e.g. 1000ms, 20s")
public HudiConfig setRecordIndexWaitTimeout(Duration recordIndexWaitTimeout)
{
this.recordIndexWaitTimeout = recordIndexWaitTimeout;
return this;
}

@Config("hudi.index.column-stats.wait-timeout")
@ConfigDescription("Maximum timeout to wait for loading column stats, e.g. 1000ms, 20s")
public HudiConfig setColumnStatsWaitTimeout(Duration columnStatusWaitTimeout)
{
this.columnStatsWaitTimeout = columnStatusWaitTimeout;
return this;
}

@NotNull
public Duration getColumnStatsWaitTimeout()
{
return columnStatsWaitTimeout;
}

@NotNull
public Duration getRecordIndexWaitTimeout()
{
return recordIndexWaitTimeout;
}

@Config("hudi.index.secondary-index.wait-timeout")
@ConfigDescription("Maximum timeout to wait for loading secondary index, e.g. 1000ms, 20s")
public HudiConfig setSecondaryIndexWaitTimeout(Duration secondaryIndexWaitTimeout)
{
this.secondaryIndexWaitTimeout = secondaryIndexWaitTimeout;
return this;
}

@NotNull
public Duration getSecondaryIndexWaitTimeout()
{
return secondaryIndexWaitTimeout;
}

public boolean isMetadataPartitionListingEnabled()
{
return metadataPartitionListingEnabled;
}

@Config("hudi.metadata.partition-listing.enabled")
@ConfigDescription("Enables listing table partitions through the metadata table.")
public HudiConfig setMetadataPartitionListingEnabled(boolean metadataPartitionListingEnabled)
{
this.metadataPartitionListingEnabled = metadataPartitionListingEnabled;
return this;
}


public boolean isResolveColumnNameCasingEnabled()
{
return resolveColumnNameCasingEnabled;
}

@Config("hudi.table.resolve-column-name-casing.enabled")
@ConfigDescription("Reconcile column names between the catalog schema and the Hudi table to handle case differences")
public HudiConfig setResolveColumnNameCasingEnabled(boolean resolveColumnNameCasingEnabled)
{
this.resolveColumnNameCasingEnabled = resolveColumnNameCasingEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public enum HudiErrorCode
HUDI_CANNOT_OPEN_SPLIT(0x41, EXTERNAL),
HUDI_CURSOR_ERROR(0x42, EXTERNAL),
HUDI_CANNOT_GENERATE_SPLIT(0x43, EXTERNAL),
HUDI_PARTITION_NOT_FOUND(0x44, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
Expand All @@ -40,7 +42,11 @@
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.util.Lazy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -51,7 +57,11 @@
import static com.facebook.presto.hive.HiveColumnHandle.MAX_PARTITION_KEY_COLUMN_INDEX;
import static com.facebook.presto.hudi.HudiColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hudi.HudiColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR;
import static com.facebook.presto.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
import static com.facebook.presto.hudi.HudiSessionProperties.isResolveColumnNameCasingEnabled;
import static com.facebook.presto.hudi.util.HudiUtil.buildTableMetaClient;
import static com.facebook.presto.hudi.util.HudiUtil.getLatestTableSchema;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -100,13 +110,33 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
throw new PrestoException(HUDI_UNKNOWN_TABLE_TYPE, "Unknown table type " + inputFormat);
}

String basePath = table.getStorage().getLocation();
ExtendedFileSystem fs = getFileSystem(session, tableName, basePath);
return new HudiTableHandle(
table,
Lazy.lazily(() -> buildTableMetaClient(fs, tableName.getTableName(), basePath)),
table.getDatabaseName(),
table.getTableName(),
table.getStorage().getLocation(),
basePath,
hudiTableType);
}

private ExtendedFileSystem getFileSystem(ConnectorSession session, SchemaTableName table, String basePath)
{
HdfsContext hdfsContext = new HdfsContext(
session,
table.getSchemaName(),
table.getTableName(),
basePath,
false);
try {
return hdfsEnvironment.getFileSystem(hdfsContext, new Path(basePath));
}
catch (IOException e) {
throw new PrestoException(HUDI_FILESYSTEM_ERROR, "Could not open file system for " + table, e);
}
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
Expand All @@ -117,16 +147,22 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl
@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
HudiTableHandle handle = (HudiTableHandle) tableHandle;
HudiTableHandle hudiTableHandle = (HudiTableHandle) tableHandle;
Table table = getTable(session, tableHandle);
List<HudiColumnHandle> partitionColumns = getPartitionColumnHandles(table);
List<HudiColumnHandle> dataColumns = getDataColumnHandles(table);
HudiPredicates predicates = HudiPredicates.from(constraint.getSummary());
Optional<Lazy<Schema>> hudiTableSchema = isResolveColumnNameCasingEnabled(session) ?
Optional.of(Lazy.lazily(() -> getLatestTableSchema(hudiTableHandle.getMetaClient(), hudiTableHandle.getTableName()))) : Optional.empty();

ConnectorTableLayout layout = new ConnectorTableLayout(new HudiTableLayoutHandle(
handle,
hudiTableHandle,
dataColumns,
partitionColumns,
table.getParameters(),
constraint.getSummary()));
predicates.getRegularColumnPredicates(),
predicates.getPartitionColumnPredicates(),
hudiTableSchema));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

Expand Down Expand Up @@ -241,7 +277,7 @@ static List<HudiColumnHandle> fromPartitionColumns(List<Column> partitionColumns
return builder.build();
}

static List<HudiColumnHandle> fromDataColumns(List<Column> dataColumns)
public static List<HudiColumnHandle> fromDataColumns(List<Column> dataColumns)
{
ImmutableList.Builder<HudiColumnHandle> builder = ImmutableList.builder();
int id = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.facebook.presto.hive.metastore.Storage;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.storage.StoragePath;

import java.util.List;
import java.util.Map;
Expand All @@ -32,20 +34,23 @@ public class HudiPartition
// TODO: storage and dataColumns is required from MOR record cursor, might be able to remove later
private final Storage storage;
private final List<HudiColumnHandle> dataColumns;
private final String relativePartitionPath;

@JsonCreator
public HudiPartition(
@JsonProperty("name") String name,
@JsonProperty("values") List<String> values,
@JsonProperty("keyValues") Map<String, String> keyValues,
@JsonProperty("storage") Storage storage,
@JsonProperty("dataColumns") List<HudiColumnHandle> dataColumns)
@JsonProperty("dataColumns") List<HudiColumnHandle> dataColumns,
@JsonProperty("relativePartitionPath") String relativePartitionPath)
{
this.name = requireNonNull(name, "name is null");
this.values = requireNonNull(values, "values is null");
this.keyValues = requireNonNull(keyValues, "keyValues is null");
this.storage = requireNonNull(storage, "storage is null");
this.dataColumns = requireNonNull(dataColumns, "dataColumns is null");
this.relativePartitionPath = requireNonNull(relativePartitionPath, "relativePartitionPath is null");
}

@JsonProperty
Expand Down Expand Up @@ -78,6 +83,12 @@ public List<HudiColumnHandle> getDataColumns()
return dataColumns;
}

@JsonProperty
public String getRelativePartitionPath()
{
return relativePartitionPath;
}

@Override
public boolean equals(Object o)
{
Expand Down
Loading
Loading