Skip to content
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2682,26 +2682,45 @@ case class Chr(child: Expression)
""",
since = "1.5.0",
group = "string_funcs")
case class Base64(child: Expression)
case class Base64(child: Expression, chunkBase64: Boolean)
extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {

def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled)

lazy val encoder: JBase64.Encoder = if (chunkBase64) {
JBase64.getMimeEncoder
} else {
JBase64.getMimeEncoder(-1, Array())
}

override def dataType: DataType = SQLConf.get.defaultStringType
override def inputTypes: Seq[DataType] = Seq(BinaryType)

protected override def nullSafeEval(bytes: Any): Any = {
UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]]))
UTF8String.fromBytes(encoder.encode(bytes.asInstanceOf[Array[Byte]]))
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, (child) => {
s"""${ev.value} = UTF8String.fromBytes(
${classOf[JBase64].getName}.getMimeEncoder().encode($child));
"""})
if (chunkBase64) {
s"""${ev.value} = UTF8String.fromBytes(
${classOf[JBase64].getName}.getMimeEncoder().encode($child));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use the encoder directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use the encoder directly?

java.util.Base64$Encoder is not serializable.

-- !query
select base64(c7), base64(c8), base64(v), ascii(s) from char_tbl4
-- !query schema
struct<>
-- !query output
java.io.NotSerializableException
java.util.Base64$Encoder
Serialization stack:
	- object not serializable (class: java.util.Base64$Encoder, value: java.util.Base64$Encoder@423ed07f)
	- element of array (index: 2)
	- array (class [Ljava.lang.Object;, size 5)
	- field (class: org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory, name: org$apache$spark$sql$execution$WholeStageCodegenEvaluatorFactory$$references, type: class [Ljava.lang.Object;)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory, org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory@2fd9633e)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/execution/WholeStageCodegenEvaluatorFactory;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2458/0x000002cf3e949c30, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2458/0x000002cf3e949c30@603a0fa7)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can follow the work I have done for Encode to make it RuntimeReplaceable with StaticInvoke

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can follow the work I have done for Encode to make it RuntimeReplaceable with StaticInvoke

Thanks, I will try it

"""
} else {
s"""${ev.value} = UTF8String.fromBytes(
${classOf[JBase64].getName}.getMimeEncoder(-1, new byte[0]).encode($child));
"""
}
})
}

override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild)
}

object Base64 {
def apply(expr: Expression): Base64 = new Base64(expr)
}

/**
* Converts the argument from a base 64 string to BINARY.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3525,6 +3525,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val CHUNK_BASE_64_STRING_ENABLED = buildConf("spark.sql.chunkBase64String.enabled")
.doc("Whether to truncate string generated by the `Base64` function. When true, base64" +
" strings generated by the base64 function are chunked into lines of at most 76" +
" characters. When false, the base64 strings are not chunked.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val ENABLE_DEFAULT_COLUMNS =
buildConf("spark.sql.defaultColumn.enabled")
.internal()
Expand Down Expand Up @@ -5856,6 +5864,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE)

def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE_64_STRING_ENABLED)

def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
case "TIMESTAMP_LTZ" =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")).replacement :: Nil)
}

test("SPARK-47307: base64 encoding without chunking") {
val longString = "a" * 58
val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
withSQLConf(SQLConf.CHUNK_BASE_64_STRING_ENABLED.key -> "false") {
checkEvaluation(Base64(Literal(longString.getBytes)), encoded)
}
val chunkEncoded =
s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ=="
withSQLConf(SQLConf.CHUNK_BASE_64_STRING_ENABLED.key -> "true") {
checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded)
}
}

test("initcap unit test") {
checkEvaluation(InitCap(Literal.create(null, StringType)), null)
checkEvaluation(InitCap(Literal("a b")), "A B")
Expand Down