@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Experimental;

/**
* Catalog methods for working with Partitions.
*/
@Experimental
public interface SupportsPartitions extends TableCatalog {
Contributor:

What is the reason to extend TableCatalog instead of Table? I think it would be better to support partitions at a table level.

Doing it this way creates more complexity for implementations because they need to handle more cases. For example, if the table doesn't exist, this should throw NoSuchTableException just like loadTable. It would be simpler for the API if these methods were used to manipulate a table, not to load and manipulate a table. Loading should be orthogonal to partition operations.

Another issue is that this assumes a table catalog contains tables that support partitions, or tables that do not. But Spark's built-in catalog supports some sources that don't expose partitions and some that do. This would cause more work for many catalogs, which would need to detect whether a table has support and throw UnsupportedOperationException if it does not. That also makes integration more difficult for Spark because it can't check a table in the analyzer to determine whether it supports the operation or not. Instead, Spark would need to handle exceptions at runtime.
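(For illustration, a minimal sketch of the table-level alternative being suggested here; the interface shape is hypothetical and not part of this PR:)

import java.util.Map;

import org.apache.spark.sql.connector.catalog.Table;

// Hypothetical sketch of a table-level mix-in. Spark calls loadTable(ident)
// first, so a missing table already fails there with NoSuchTableException,
// and the analyzer can check `table instanceof SupportsPartitions` before
// planning a partition operation instead of catching
// UnsupportedOperationException at runtime.
public interface SupportsPartitions extends Table {
  void createPartitions(TablePartition[] partitions, boolean ignoreIfExists);
  void dropPartitions(Map<String, String>[] partitions, boolean ignoreIfNotExists);
  TablePartition getPartition(Map<String, String> partition);
  TablePartition[] listPartitions(Map<String, String> partition);
}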

Contributor Author (@jackylee-ch, Jul 24, 2020):

Sounds reasonable to me.
The reason I wanted to define it as a Catalog API is that I think the Catalog API is meant to manage metadata for partitions, while the Table API is used for the actual data operations.
However, as you said, some sources, such as MySQL or FileTable, will use the partition API to manage partition data. So making the partition API part of the Table API is a better way.
Thanks


  /**
   * Create partitions in an existing table, assuming it exists.
   *
   * @param ident a table identifier
   * @param partitions an array of partitions to create in the table
   * @param ignoreIfExists
   */
  void createPartitions(
      Identifier ident,
      TablePartition[] partitions,
      Boolean ignoreIfExists);

Reviewer:

Nit: should this be boolean? Also, please add this parameter to the javadoc.
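(For reference, the signature with that nit addressed might read as follows; the javadoc wording is illustrative:)

  /**
   * Create partitions in an existing table, assuming it exists.
   *
   * @param ident a table identifier
   * @param partitions an array of partitions to create
   * @param ignoreIfExists if true, silently skip partitions that already exist;
   *                       if false, fail when a partition already exists
   */
  void createPartitions(
      Identifier ident,
      TablePartition[] partitions,
      boolean ignoreIfExists);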

Contributor Author (@jackylee-ch, Jul 25, 2020):

This can be renamed to createPartition(identifier, properties)


  /**
   * Drop partitions from a table, assuming they exist.
   *
   * @param ident a table identifier
   * @param partitions a list of string map for existing partitions
   * @param ignoreIfNotExists
   */
  void dropPartitions(
Contributor:

In other places in the v2 API, we leave ifNotExists to Spark so that we don't need to pass it. Instead of passing an extra parameter, the method returns a boolean to indicate whether the partition was dropped or if no action was taken. Either way, the partition should not exist after the method call so it is idempotent.

Then, Spark decides whether to throw an exception because the partition did not exist. We prefer that pattern of doing more in Spark to make behavior standard across sources and to make the requirements as simple as we can.
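(For illustration, a sketch of that pattern applied here; the single-partition shape and names are hypothetical:)

import java.util.Map;

// Hypothetical sketch: no ignoreIfNotExists flag. The call is idempotent:
// after it returns, the partition does not exist either way.
interface IdempotentPartitionDrop {
  /**
   * Drop a partition if it exists.
   *
   * @return true if a partition was dropped, false if no action was taken
   */
  boolean dropPartition(Map<String, String> partition);
}

Spark, not the source, would then turn a false return into an exception for ALTER TABLE ... DROP PARTITION without IF EXISTS, keeping the behavior uniform across sources.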

Contributor Author:

Good point, I will change it.

Contributor Author:

This can be renamed to boolean dropPartition(identifier)

      Identifier ident,
      Map<String, String>[] partitions,
Member:

There are a few cases here where partitions are referred to as Map<String, String> and a few times where they use the TablePartition class. I think it would probably make more sense if they were all TablePartition (the class) unless there is a significant reason for them not to be.

Contributor Author:

TablePartition contains the partition metadata; it's too heavy for this. As for Transform, it may be a good choice if it can pass partition values.

Member:

Then is this partitionSpec and not "partition"?

Contributor (@rdblue, Jul 23, 2020):

I think we need to decide how to pass the data that identifies a partition.

There have been a lot of problems over the years working with Hive partitions because values are coerced to and from String. Often, people get the conversions slightly wrong. I think a better approach is to use a row of values to pass partition data between Spark and a source. We already pass typed rows in the read and write APIs, so it would be reasonable to do so here as well.

One benefit of using a typed row to represent partition data is that we can directly use a listPartitions call for metadata queries.

This would also align more closely with how Spark handles partitions internally. From PartitioningUtils:

/**
 * Holds a directory in a partitioned collection of files as well as the partition values
 * in the form of a Row.  Before scanning, the files at `path` need to be enumerated.
 */
case class PartitionPath(values: InternalRow, path: Path)

case class PartitionSpec(
    partitionColumns: StructType,
    partitions: Seq[PartitionPath])

A table that implements SupportsPartitions could return a partitionType(): StructType that describes the partition rows it accepts and produces.
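(For illustration, a sketch of that suggestion; the interface and method names are hypothetical:)

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch: the table declares the schema of its partition rows,
// and each partition is identified by a typed InternalRow matching that
// schema, so no values are coerced to or from String.
interface PartitionedTableSketch {
  /** The schema of the rows this table accepts and produces for partitions. */
  StructType partitionType();

  /** Identifier rows of all partitions; each row matches partitionType(). */
  InternalRow[] listPartitionIdentifiers();
}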

Contributor:

One more thing: using a row to pass a tuple would make it possible to also get the partition that an input split (e.g., file) belongs to. That would be useful for storage-partitioned joins.

Contributor Author (@jackylee-ch, Jul 24, 2020):

Good point! The partition identifiers can be written like PartitionSpec.

Contributor:

I was actually thinking that partitions would be identified more like PartitionPath. In this API, I'm not sure if the Path part of PartitionPath is needed, since sources may not need to expose it to Spark. (In Iceberg, for example, there is no partition path.)

I think just using an InternalRow to identify a partition is a good idea.

Contributor Author:

Yep, I have changed it to InternalRow. The definition in PartitioningUtils is a very good idea, thanks.

Contributor Author:

Thinking about this further: InternalRow only contains the partition identifier values, not the partition column names. That means the user must always supply the partition data in order. Is that reasonable for users? @rdblue

Contributor:

I think it is reasonable. While this is a public API, the user here is a connector developer and they are expected to be able to produce InternalRow in other places. I think this is actually a good thing because we don't need to pass the names every time.
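(To make the ordering point concrete: for a hypothetical partitionType() of struct&lt;dt: string, hour: int&gt;, a connector developer would build the identifier row positionally, e.g.:)

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

// Identifier row for partition (dt='2020-07-24', hour=10). Values are
// positional, so they must follow the declared partition column order;
// strings are passed as UTF8String, as in the other InternalRow APIs.
InternalRow partitionIdent = new GenericInternalRow(new Object[] {
    UTF8String.fromString("2020-07-24"), 10});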

      Boolean ignoreIfNotExists);

  /**
   * Override the specs of one or more existing table partitions, assuming they exist.
   *
   * @param ident a table identifier
   * @param oldPartitions a list of string map for existing partitions to be renamed
   * @param newPartitions a list of string map for new partitions
   */
  void renamePartitions(
      Identifier ident,
      Map<String, String>[] oldPartitions,
      Map<String, String>[] newPartitions);

  /**
   * Alter one or more table partitions whose specs match those specified in `partitions`,
   * assuming the partitions exist.
   *
   * @param ident a table identifier
   * @param partitions an array of partitions holding the new metadata to apply
   */
  void alterPartitions(
      Identifier ident,
      TablePartition[] partitions);

  /**
   * Retrieve the metadata of a table partition, assuming it exists.

Reviewer:

What happens if it doesn't exist? Is an exception thrown? I don't know if this documentation style is consistent with how Spark does it, but I would expect something like:

Retrieve the metadata of a table partition.

@throws PartitionNotFoundException ...

Contributor Author:

Sorry, I will rewrite the documentation. NoSuchPartitionException should be thrown.

   *
   * @param ident a table identifier
   * @param partition a list of string map for existing partitions

Reviewer:

It isn't exactly clear what the keys and values of the map are here. It also does not appear to be a list, which I find confusing.

Contributor Author:

Sorry, I will rewrite the documentation. Thanks for paying attention to this PR.

   */
  TablePartition getPartition(
      Identifier ident,
      Map<String, String> partition);

  /**
   * List the names of all partitions that belong to the specified table, assuming it exists.
   *
   * @param ident a table identifier
   * @param partition a list of string map for existing partitions
   */
  String[] listPartitionNames(
      Identifier ident,
      Map<String, String> partition);
Contributor:

It isn't clear what this does. What is a partition name? Are you referring to the "key" in Hive?

Contributor Author (@jackylee-ch, Jul 24, 2020):

It will return the partition identifiers in this table, optionally filtered by a partial partition identifier. Not just the key in Hive.

Contributor Author:

This can be changed to Row[] listPartitionIdentifiers(identifier). The identifier parameter is used to find the partition identifiers that it is a part of.


  /**
   * List the metadata of all partitions that belong to the specified table, assuming it exists.
   *
   * @param ident a table identifier
   * @param partition a list of string map for existing partitions
   */
  TablePartition[] listPartitions(
      Identifier ident,
      Map<String, String> partition);
}
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog;

import java.util.HashMap;
import java.util.Map;

public class TablePartition {
Member:

Partitions are already referred to in other parts of the catalog with the Transform class, do we need this as well?

Contributor Author:

Thanks for your CR.
Transform does not contain actual partition values or partition metadata. In many cases, we need to know the metadata of a partition.
Please correct me if there is something wrong.

Reviewer:

This class and its methods need documentation; it might also help clarify how this differs from Transform.

Contributor Author:

Thanks for the reply, I will do it.

  private Map<String, String> partitionSpec;
  private Map<String, String> parametes;

Reviewer:

nit: typo "parameters"?

Contributor Author:

Ah, thanks


  public TablePartition(
      Map<String, String> partitionSpec) {
    this.partitionSpec = partitionSpec;
    this.parametes = new HashMap<String, String>();
  }

  public TablePartition(
      Map<String, String> partitionSpec,
      Map<String, String> parametes) {
    this.partitionSpec = partitionSpec;
    this.parametes = parametes;
  }

  public Map<String, String> getPartitionSpec() {
    return partitionSpec;
  }

  public void setPartitionSpec(Map<String, String> partitionSpec) {
    this.partitionSpec = partitionSpec;
  }

  public Map<String, String> getParametes() {
    return parametes;
  }

  public void setParameters(Map<String, String> parametes) {
    this.parametes = parametes;
  }
}
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector

import java.{lang, util}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitions, TablePartition}


/**
 * This class is used to test the SupportsPartitions API.
 */
class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartitions {

  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  protected val memoryTablePartitions: util.Map[Identifier, mutable.HashSet[TablePartition]] =
    new ConcurrentHashMap[Identifier, mutable.HashSet[TablePartition]]()

  def createPartitions(
      ident: Identifier,
      partitions: Array[TablePartition],
      ignoreIfExists: lang.Boolean = false): Unit = {
    assert(tableExists(ident))
    val table = loadTable(ident).asInstanceOf[InMemoryTable]
    val tableSchema = table.schema.map(_.name)
    checkPartitionKeysExists(tableSchema, partitions.map(_.getPartitionSpec))
    val tablePartitions =
      memoryTablePartitions.getOrDefault(ident, new mutable.HashSet[TablePartition]())
    partitions.foreach(tablePartitions.add)
    memoryTablePartitions.put(ident, tablePartitions)
  }

  def dropPartitions(
      ident: Identifier,
      partitions: Array[util.Map[String, String]],
      ignoreIfNotExists: lang.Boolean = false): Unit = {
    assert(tableExists(ident))
    val table = loadTable(ident).asInstanceOf[InMemoryTable]
    val tableSchema = table.schema.map(_.name)
    checkPartitionKeysExists(tableSchema, partitions)
    if (memoryTablePartitions.containsKey(ident)) {
      val tablePartitions = memoryTablePartitions.get(ident)
      tablePartitions.filter { tablePartition =>
        partitions.contains(tablePartition.getPartitionSpec)
      }.foreach(tablePartitions.remove)
      memoryTablePartitions.put(ident, tablePartitions)
    }
  }

  def renamePartitions(
      ident: Identifier,
      oldPartitions: Array[util.Map[String, String]],
      newPartitions: Array[util.Map[String, String]]): Unit = {
    assert(tableExists(ident))
    val table = loadTable(ident).asInstanceOf[InMemoryTable]
    val tableSchema = table.schema.map(_.name)
    checkPartitionKeysExists(tableSchema, oldPartitions)
    checkPartitionKeysExists(tableSchema, newPartitions)
    if (memoryTablePartitions.containsKey(ident)) {
      val tablePartitions = memoryTablePartitions.get(ident)
      // Pair each old spec with its replacement, rather than crossing the
      // two arrays, so oldPartitions(i) is renamed to newPartitions(i).
      for ((oldPartition, newPartition) <- oldPartitions.zip(newPartitions)) {
        tablePartitions.filter { tablePartition =>
          oldPartition == tablePartition.getPartitionSpec
        }.foreach { tablePartition =>
          tablePartition.setPartitionSpec(newPartition)
        }
      }
      memoryTablePartitions.put(ident, tablePartitions)
    }
  }

  def alterPartitions(
      ident: Identifier,
      partitions: Array[TablePartition]): Unit = {
    assert(tableExists(ident))
    val table = loadTable(ident).asInstanceOf[InMemoryTable]
    val tableSchema = table.schema.map(_.name)
    checkPartitionKeysExists(tableSchema, partitions.map(_.getPartitionSpec))
    if (memoryTablePartitions.containsKey(ident)) {
      val tablePartitions = memoryTablePartitions.get(ident)
      partitions.foreach { partition =>
        tablePartitions.filter(_.getPartitionSpec == partition.getPartitionSpec)
          .foreach(tablePartitions.remove)
        tablePartitions.add(partition)
      }
    }
  }

  def getPartition(
      ident: Identifier,
      partition: util.Map[String, String]): TablePartition = {
    assert(tableExists(ident))
    val table = loadTable(ident).asInstanceOf[InMemoryTable]
    val tableSchema = table.schema.map(_.name)
    checkPartitionKeysExists(tableSchema, Array(partition))
    memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition])
      .find(_.getPartitionSpec == partition)
      .getOrElse {
        throw new NoSuchPartitionException(
          ident.namespace().quoted,
          ident.name(),
          partition.asScala.toMap)
      }
  }

  def listPartitionNames(
      ident: Identifier,
      partition: util.Map[String, String] = new util.HashMap()): Array[String] = {
    assert(tableExists(ident))
    memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition])
      .filter { tablePartition =>
        partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet)
      }.map { tablePartition =>
        tablePartition.getPartitionSpec.asScala.map { kv =>
          s"${kv._1}=${kv._2}"
        }.mkString("/")
      }.toArray
  }

  def listPartitions(
      ident: Identifier,
      partition: util.Map[String, String] = new util.HashMap()): Array[TablePartition] = {
    assert(tableExists(ident))
    memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition])
      .filter { tablePartition =>
        partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet)
      }.toArray
  }

  private def checkPartitionKeysExists(
      tableSchema: Seq[String],
      partitions: Array[util.Map[String, String]]): Unit = {
    partitions.foreach { partition =>
      val errorPartitionKeys = partition.keySet().asScala.filterNot(tableSchema.contains)
      if (errorPartitionKeys.nonEmpty) {
        throw new IllegalArgumentException(
          s"Partition keys ${errorPartitionKeys.mkString("{", ",", "}")} do not exist " +
            s"in table schema: ${tableSchema.mkString("{", ",", "}")}")
      }
    }
  }
}