Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public int hashCode() {

@Override
public NamedReference[] references() {
  // A constant (always-false) predicate touches no columns.
  return EMPTY_REFERENCE;
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect data source developers to use it? I thought we will add internal utils to convert between v1 and v2 filters, if it's for internal usage only.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for internal use only. I will remove this and add a method in DataSourceUtils to do the conversion.

return new org.apache.spark.sql.sources.AlwaysFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public int hashCode() {

@Override
public NamedReference[] references() {
  // A constant (always-true) predicate touches no columns.
  return EMPTY_REFERENCE;
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // V1 counterpart of this filter: a predicate that accepts every row.
  return new org.apache.spark.sql.sources.AlwaysTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public And(Filter left, Filter right) {
public String toString() {
  // Renders as "(<left>) AND (<right>)" using each child's describe() output.
  return "(" + left.describe() + ") AND (" + right.describe() + ")";
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Convert both children recursively, then combine with the V1 conjunction.
  org.apache.spark.sql.sources.Filter leftV1 = left.toV1();
  org.apache.spark.sql.sources.Filter rightV1 = right.toV1();
  return new org.apache.spark.sql.sources.And(leftV1, rightV1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions.filter;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand All @@ -37,4 +38,10 @@ public EqualNullSafe(NamedReference column, Literal<?> value) {

@Override
public String toString() {
  // Renders as "<column> <=> <value>" (null-safe equality).
  return this.column.describe() + " <=> " + value.describe();
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Translate the Catalyst literal into the Scala value the V1 API expects.
  Object scalaValue = CatalystTypeConverters.convertToScala(value.value(), value.dataType());
  return new org.apache.spark.sql.sources.EqualNullSafe(column.describe(), scalaValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions.filter;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand All @@ -36,4 +37,10 @@ public EqualTo(NamedReference column, Literal<?> value) {

@Override
public String toString() {
  // Renders as "<column> = <value>".
  return column.describe() + " = " + value.describe();
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Builds the V1 EqualTo filter; the Catalyst literal is converted to the
  // Scala value the V1 API expects.
  // Fix: dropped the redundant parentheses around `column` — every sibling
  // filter class calls `column.describe()` directly.
  return new org.apache.spark.sql.sources.EqualTo(
    column.describe(), CatalystTypeConverters.convertToScala(value.value(), value.dataType()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.connector.expressions.filter;

import java.io.Serializable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
Expand All @@ -27,7 +29,7 @@
* @since 3.3.0
*/
@Evolving
public abstract class Filter implements Expression {
public abstract class Filter implements Expression, Serializable {

protected static final NamedReference[] EMPTY_REFERENCE = new NamedReference[0];

Expand All @@ -38,4 +40,9 @@ public abstract class Filter implements Expression {

@Override
public String describe() { return this.toString(); }

/**
 * Converts this V2 filter into its V1 {@link org.apache.spark.sql.sources.Filter}
 * equivalent. NOTE(review): per the PR discussion this bridge is intended for
 * internal use only — confirm before exposing it to data source developers.
 */
public abstract org.apache.spark.sql.sources.Filter toV1();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions.filter;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand All @@ -36,4 +37,11 @@ public GreaterThan(NamedReference column, Literal<?> value) {

@Override
public String toString() {
  // Renders as "<column> > <value>".
  return String.format("%s > %s", column.describe(), value.describe());
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Builds the V1 GreaterThan filter; the Catalyst literal is converted to the
  // Scala value the V1 API expects.
  // Fix: removed the stray blank line before the class's closing brace, for
  // consistency with the sibling filter classes.
  return new org.apache.spark.sql.sources.GreaterThan(
    column.describe(), CatalystTypeConverters.convertToScala(value.value(), value.dataType()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions.filter;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand All @@ -36,4 +37,10 @@ public GreaterThanOrEqual(NamedReference column, Literal<?> value) {

@Override
public String toString() {
  // Renders as "<column> >= <value>".
  return column.describe() + " >= " + value.describe();
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Translate the Catalyst literal into the Scala value the V1 API expects.
  Object scalaValue = CatalystTypeConverters.convertToScala(value.value(), value.dataType());
  return new org.apache.spark.sql.sources.GreaterThanOrEqual(column.describe(), scalaValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.stream.Collectors;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand Down Expand Up @@ -73,4 +74,15 @@ public String toString() {

@Override
public NamedReference[] references() {
  // The single column this IN-list filter touches.
  NamedReference[] refs = { column };
  return refs;
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Builds the V1 In filter, converting each Catalyst literal to the Scala
  // value the V1 API expects.
  // Fixes: the enhanced-for used the raw type `Literal` (unchecked); an
  // indexed loop also removes the manually maintained `index` counter.
  Object[] scalaValues = new Object[values.length];
  for (int i = 0; i < values.length; i++) {
    Literal<?> literal = values[i];
    scalaValues[i] = CatalystTypeConverters.convertToScala(literal.value(), literal.dataType());
  }
  return new org.apache.spark.sql.sources.In(column.describe(), scalaValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ public int hashCode() {

@Override
public NamedReference[] references() {
  // The single column whose non-nullness is tested.
  NamedReference[] refs = { column };
  return refs;
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Map to the V1 IsNotNull filter, identified by the column's description.
  String columnName = column.describe();
  return new org.apache.spark.sql.sources.IsNotNull(columnName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ public int hashCode() {

@Override
public NamedReference[] references() {
  // The single column whose nullness is tested.
  NamedReference[] refs = { column };
  return refs;
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Map to the V1 IsNull filter, identified by the column's description.
  String columnName = column.describe();
  return new org.apache.spark.sql.sources.IsNull(columnName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions.filter;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand All @@ -36,4 +37,10 @@ public LessThan(NamedReference column, Literal<?> value) {

@Override
public String toString() {
  // Renders as "<column> < <value>".
  return String.format("%s < %s", column.describe(), value.describe());
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Translate the Catalyst literal into the Scala value the V1 API expects.
  Object scalaValue = CatalystTypeConverters.convertToScala(value.value(), value.dataType());
  return new org.apache.spark.sql.sources.LessThan(column.describe(), scalaValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions.filter;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;

Expand All @@ -36,4 +37,10 @@ public LessThanOrEqual(NamedReference column, Literal<?> value) {

@Override
public String toString() {
  // Renders as "<column> <= <value>".
  return String.format("%s <= %s", column.describe(), value.describe());
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Translate the Catalyst literal into the Scala value the V1 API expects.
  Object scalaValue = CatalystTypeConverters.convertToScala(value.value(), value.dataType());
  return new org.apache.spark.sql.sources.LessThanOrEqual(column.describe(), scalaValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public int hashCode() {

@Override
public NamedReference[] references() {
  // NOT references exactly what its negated child references.
  return child.references();
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Convert the child recursively, then wrap it in the V1 negation.
  org.apache.spark.sql.sources.Filter childV1 = child.toV1();
  return new org.apache.spark.sql.sources.Not(childV1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public Or(Filter left, Filter right) {
public String toString() {
  // Renders as "(<left>) OR (<right>)" using each child's describe() output.
  return "(" + left.describe() + ") OR (" + right.describe() + ")";
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // Convert both children recursively, then combine with the V1 disjunction.
  org.apache.spark.sql.sources.Filter leftV1 = left.toV1();
  org.apache.spark.sql.sources.Filter rightV1 = right.toV1();
  return new org.apache.spark.sql.sources.Or(leftV1, rightV1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public StringContains(NamedReference column, UTF8String value) {

@Override
public String toString() {
  // Renders as "STRING_CONTAINS(<column>, <value>)".
  return "STRING_CONTAINS(" + column.describe() + ", " + value + ")";
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // The V1 filter takes a java.lang.String, so unwrap the UTF8String.
  String javaString = value.toString();
  return new org.apache.spark.sql.sources.StringContains(column.describe(), javaString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public StringEndsWith(NamedReference column, UTF8String value) {

@Override
public String toString() {
  // Renders as "STRING_ENDS_WITH(<column>, <value>)".
  return "STRING_ENDS_WITH(" + column.describe() + ", " + value + ")";
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // The V1 filter takes a java.lang.String, so unwrap the UTF8String.
  String javaString = value.toString();
  return new org.apache.spark.sql.sources.StringEndsWith(column.describe(), javaString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ public StringStartsWith(NamedReference column, UTF8String value) {
public String toString() {
  // Renders as "STRING_STARTS_WITH(<column>, <value>)".
  return "STRING_STARTS_WITH(" + column.describe() + ", " + value + ")";
}

@Override
public org.apache.spark.sql.sources.Filter toV1() {
  // The V1 filter takes a java.lang.String, so unwrap the UTF8String.
  String javaString = value.toString();
  return new org.apache.spark.sql.sources.StringStartsWith(column.describe(), javaString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, UnboundFunction}
import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
import org.apache.spark.sql.connector.expressions.filter.{Filter => V2Filter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED, LEGACY_CTE_PRECEDENCE_POLICY}
import org.apache.spark.sql.sources.Filter
Expand Down Expand Up @@ -1136,6 +1137,11 @@ object QueryCompilationErrors {
s"Fail to rebuild expression: missing key $filter in `translatedFilterToExpr`")
}

// V2Filter overload: raised when a pushed-down V2 filter cannot be mapped back
// to its original Catalyst expression.
def failedToRebuildExpressionError(filter: V2Filter): Throwable = {
  val message = s"Fail to rebuild expression: missing key $filter in `translatedFilterToExpr`"
  new AnalysisException(message)
}

def dataTypeUnsupportedByDataSourceError(format: String, field: StructField): Throwable = {
new AnalysisException(
s"$format data source does not support ${field.dataType.catalogString} data type.")
Expand Down Expand Up @@ -2392,4 +2398,8 @@ object QueryCompilationErrors {
errorClass = "INVALID_JSON_SCHEMA_MAPTYPE",
messageParameters = Array(schema.toString))
}

// Raised when a filter value's data type is not supported by the conversion.
def invalidDataTypeForFilterValue(value: Any): Throwable = {
  val message = s"Filter value $value has invalid data type"
  new AnalysisException(message)
}
}
Loading