Merged

33 commits:
263aeaf  Removing spark-avro bundling from Hudi by default; (Mar 5, 2022)
8d3b39d  Scaffolded Spark 3.2.x hierarchy: (Mar 16, 2022)
433348f  Moved `ExpressionCodeGen`,`ExpressionPayload` into `hudi-spark` module (Mar 16, 2022)
6c1635e  Bootstrapped (Mar 16, 2022)
48942bf  Limited access to `SchemaConverters`, `AvroUtils` to `spark.sql` package (Mar 16, 2022)
f92fe76  Fixing compilation for "hudi-spark3" module (Mar 16, 2022)
8f0383d  Down-scoped `AvroUtils` to be Spark-version specific (Mar 17, 2022)
1fd9363  Bootstrapped Spark 3.1.x Avro serializer/deserializer hierarchy (Mar 17, 2022)
953e442  Bootstrapped Spark 2.4.x Avro serializer/deserializer hierarchy (Mar 17, 2022)
303d532  Removed "spark-avro" dependency from all modules (Mar 17, 2022)
19bb645  Tidying up (Mar 17, 2022)
d51871b  Fixing compilation (Mar 17, 2022)
7807a1b  Missing license (Mar 17, 2022)
736c36d  Reverting accidental change (Mar 17, 2022)
258c601  Fixed `AvroDeserializer` to stay compatible w/ both Spark 3.2.1 and 3… (Mar 17, 2022)
9968b8d  Cleaned up all "spark-avro" refs (Mar 17, 2022)
a60e736  Tidying up (Mar 17, 2022)
5ddb953  Untethered bespoke Column Stats index from the write path (Mar 17, 2022)
af6bcd5  Fixing compilation (Mar 17, 2022)
d1d9359  Fixing tests dependent on `SparkAdapter` (Mar 17, 2022)
5f5c90d  Fixing compilation (Mar 18, 2022)
d621f30  Tidying up (Mar 21, 2022)
d374e8d  Inlined all methods renamed in `DataSourceUtils` (Mar 21, 2022)
81d964f  Tidying up (Mar 21, 2022)
2a0b567  `lint` (Mar 21, 2022)
20ee548  Fixing compilation (Mar 21, 2022)
df5b626  Reverting incorrect change (Mar 22, 2022)
7a72626  Tidying up after rebase (Mar 25, 2022)
c3da8e2  Removing spark-avro from Hudi examples (Mar 25, 2022)
2490c7e  Fixed compilation for Spark 3.2.0 (Mar 25, 2022)
265462f  Sync'd Avro Serializer/Deserializer hierarchy to Spark 3.1.3 (Mar 28, 2022)
4a2280b  Bumped 3.1.x branch to rely on Spark 3.1.3 (Mar 28, 2022)
192fe7f  Relocate "org.apache.spark.sql.avro" to avoid potential collisions w/… (Mar 28, 2022)
16 changes: 2 additions & 14 deletions README.md
@@ -90,21 +90,9 @@ mvn clean package -DskipTests -Dspark3
mvn clean package -DskipTests -Dspark3.1.x
```

### Build without spark-avro module
### What about "spark-avro" module?

The default hudi-jar bundles spark-avro module. To build without spark-avro module, build using `spark-shade-unbundle-avro` profile

```
# Checkout code and build
git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests -Pspark-shade-unbundle-avro

# Start command
spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
--packages org.apache.spark:spark-avro_2.11:2.4.4 \
--jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-*.*.*-SNAPSHOT.jar` \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```
Starting from version 0.11, Hudi no longer requires `spark-avro` to be specified using `--packages`.
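In practice this means the launch command from the removed snippet simply loses the `--packages org.apache.spark:spark-avro_2.11:2.4.4` line; a minimal sketch, reusing the same illustrative Spark 2.4.4 distribution and bundle jar paths as above:

```
# Start command (spark-avro no longer needed via --packages from 0.11 on)
spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-*.*.*-SNAPSHOT.jar` \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```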

## Running Tests

@@ -15,7 +15,6 @@
# limitations under the License.

spark-submit \
--packages org.apache.spark:spark-avro_2.11:2.4.0 \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=1 \
--conf spark.task.maxFailures=100 \
4 changes: 0 additions & 4 deletions hudi-cli/pom.xml
@@ -225,10 +225,6 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.shell</groupId>
@@ -18,6 +18,11 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -72,17 +77,9 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -261,19 +258,6 @@ public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext con
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);

/**
* Updates Metadata Indexes (like Column Stats index)
* TODO rebase onto metadata table (post RFC-27)
*
* @param context instance of {@link HoodieEngineContext}
* @param instantTime instant of the carried operation triggering the update
*/
public abstract void updateMetadataIndexes(
Contributor:
May I know how this change is related to this patch?

Contributor Author:
Some of the tests we have in the "spark-client" module actually require `SparkAdapter` to be loaded, but it lives in the "hudi-spark" module and therefore couldn't be loaded there.

So I had to either move the tests to "hudi-spark" or remove this method (which uses `AvroConversionUtils`, in turn referencing `SparkAdapter`), which I'm removing regardless in another PR.

@Nonnull HoodieEngineContext context,
@Nonnull List<HoodieWriteStat> stats,
@Nonnull String instantTime
) throws Exception;

public HoodieWriteConfig getConfig() {
return config;
}
@@ -34,7 +34,6 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -63,12 +62,9 @@
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -246,11 +242,6 @@ public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String
throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
}

@Override
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) {
throw new HoodieNotSupportedException("update statistics is not supported yet");
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
throw new HoodieNotSupportedException("This method should not be invoked");
@@ -34,7 +34,6 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -66,12 +65,9 @@
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -173,11 +169,6 @@ public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineC
context, config, this, instantTime, records).execute();
}

@Override
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) {
throw new HoodieNotSupportedException("update statistics is not supported yet");
}

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
String instantTime,
5 changes: 0 additions & 5 deletions hudi-client/hudi-spark-client/pom.xml
@@ -53,11 +53,6 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<!-- Parquet -->
<dependency>
@@ -389,9 +389,6 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
finalizeWrite(table, clusteringCommitTime, writeStats);
// Update table's metadata (table)
updateTableMetadata(table, metadata, clusteringInstant);
// Update tables' metadata indexes
// NOTE: This overlaps w/ metadata table (above) and will be reconciled in the future
table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);

LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

@@ -18,8 +18,6 @@

package org.apache.hudi.table;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -38,18 +36,14 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
@@ -78,21 +72,14 @@
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -172,63 +159,6 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> insertOverwriteTable(HoodieE
return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
}

@Override
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) throws Exception {
updateColumnsStatsIndex(context, stats, instantTime);
}

private void updateColumnsStatsIndex(
@Nonnull HoodieEngineContext context,
@Nonnull List<HoodieWriteStat> updatedFilesStats,
@Nonnull String instantTime
) throws Exception {
String sortColsList = config.getClusteringSortColumns();
String basePath = metaClient.getBasePath();
String indexPath = metaClient.getColumnStatsIndexPath();

List<String> touchedFiles =
updatedFilesStats.stream()
.map(s -> new Path(basePath, s.getPath()).toString())
.collect(Collectors.toList());

if (touchedFiles.isEmpty() || StringUtils.isNullOrEmpty(sortColsList) || StringUtils.isNullOrEmpty(indexPath)) {
return;
}

LOG.info(String.format("Updating column-statistics index table (%s)", indexPath));

List<String> sortCols = Arrays.stream(sortColsList.split(","))
.map(String::trim)
.collect(Collectors.toList());

HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context;

// Fetch table schema to appropriately construct col-stats index schema
Schema tableWriteSchema =
HoodieAvroUtils.createHoodieWriteSchema(
new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()
);

List<String> completedCommits =
metaClient.getCommitsTimeline()
.filterCompletedInstants()
.getInstants()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());

ColumnStatsIndexHelper.updateColumnStatsIndexFor(
sparkEngineContext.getSqlContext().sparkSession(),
AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),
touchedFiles,
sortCols,
indexPath,
instantTime,
completedCommits
);

LOG.info(String.format("Successfully updated column-statistics index at instant (%s)", instantTime));
}

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
@@ -17,13 +17,13 @@
*/

package org.apache.hudi

import org.apache.avro.Schema.Type
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
@@ -136,27 +136,36 @@ object AvroConversionUtils {
}

/**
*
* Returns avro schema from spark StructType.
*
* @param structType Dataframe Struct Type.
* @param structName Avro record name.
* @param recordNamespace Avro record namespace.
* @return Avro schema corresponding to given struct type.
*/
*
* Returns avro schema from spark StructType.
*
* @param structType Dataframe Struct Type.
* @param structName Avro record name.
* @param recordNamespace Avro record namespace.
* @return Avro schema corresponding to given struct type.
*/
def convertStructTypeToAvroSchema(structType: DataType,
structName: String,
recordNamespace: String): Schema = {
getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace), structType)
val schemaConverters = sparkAdapter.getAvroSchemaConverters
val avroSchema = schemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
getAvroSchemaWithDefaults(avroSchema, structType)
}

def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
val schemaConverters = sparkAdapter.getAvroSchemaConverters
schemaConverters.toSqlType(avroSchema) match {
case (dataType, _) => dataType.asInstanceOf[StructType]
}
}

/**
*
* Method to add default value of null to nullable fields in given avro schema
*
* @param schema input avro schema
* @return Avro schema with null default set to nullable fields
*/
*
* Method to add default value of null to nullable fields in given avro schema
*
* @param schema input avro schema
* @return Avro schema with null default set to nullable fields
*/
def getAvroSchemaWithDefaults(schema: Schema, dataType: DataType): Schema = {

schema.getType match {
@@ -205,10 +214,6 @@ object AvroConversionUtils {
}
}

def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}

def getAvroRecordNameAndNamespace(tableName: String): (String, String) = {
val name = HoodieAvroUtils.sanitizeName(tableName)
(s"${name}_record", s"hoodie.${name}")
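For context, a minimal usage sketch of the refactored conversion helpers shown above. The method names and signatures follow the diff; the example `StructType` fields, record name, and namespace are invented for illustration, and the snippet assumes it runs with the `hudi-spark` module (which provides the `SparkAdapter`) on the classpath, per the review discussion above.

```scala
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// An illustrative Spark schema
val structType: StructType = StructType(Seq(
  StructField("uuid", StringType, nullable = false),
  StructField("ts", LongType, nullable = true)
))

// StructType -> Avro schema, now routed through the version-specific
// SparkAdapter's schema converters rather than calling spark-avro's
// SchemaConverters directly
val avroSchema: Schema = AvroConversionUtils.convertStructTypeToAvroSchema(
  structType, "example_record", "hoodie.example")

// Avro schema -> StructType round trip
val backToStruct: StructType = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)

assert(backToStruct.fieldNames.sameElements(structType.fieldNames))
```

Routing through `sparkAdapter.getAvroSchemaConverters` is what lets each Spark profile (2.4.x / 3.1.x / 3.2.x) supply its own copy of the converter logic instead of pulling in the external `spark-avro` artifact.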