From 38db997af550b3cccfb636c497203f0a0390fee2 Mon Sep 17 00:00:00 2001 From: Ted Jenks Date: Fri, 12 Jul 2024 18:33:20 +0800 Subject: [PATCH 1/3] [SPARK-47307][SQL] Add a config to optionally chunk base64 strings Follow up #45408 ### What changes were proposed in this pull request? [[SPARK-47307](https://issues.apache.org/jira/browse/SPARK-47307)] Add a config to optionally chunk base64 strings ### Why are the changes needed? In #35110, it was incorrectly asserted that: > ApacheCommonBase64 obeys http://www.ietf.org/rfc/rfc2045.txt This is not true as the previous code called: ```java public static byte[] encodeBase64(byte[] binaryData) ``` Which states: > Encodes binary data using the base64 algorithm but does not chunk the output. However, the RFC 2045 (MIME) base64 encoder does chunk by default. This now means that any Spark encoded base64 strings cannot be decoded by encoders that do not implement RFC 2045. The docs state RFC 4648. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test suite. ### Was this patch authored or co-authored using generative AI tooling? No Closes #47303 from wForget/SPARK-47307. Lead-authored-by: Ted Jenks Co-authored-by: wforget <643348094@qq.com> Co-authored-by: Kent Yao Co-authored-by: Ted Chester Jenks Signed-off-by: Kent Yao (cherry picked from commit 8d3d4f9b900dadede3b8e33af830e5ef66682923) --- .../expressions/stringExpressions.scala | 40 +++++++++++++------ .../apache/spark/sql/internal/SQLConf.scala | 11 +++++ .../expressions/StringExpressionsSuite.scala | 13 ++++++ 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 46f8e1a9d673d..98a3b71a28627 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -2419,24 +2419,40 @@ case class Chr(child: Expression) """, since = "1.5.0", group = "string_funcs") -case class Base64(child: Expression) - extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant { +case class Base64(child: Expression, chunkBase64: Boolean) + extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes { + + def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled) override def dataType: DataType = StringType override def inputTypes: Seq[DataType] = Seq(BinaryType) - protected override def nullSafeEval(bytes: Any): Any = { - UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]])) - } + override def replacement: Expression = StaticInvoke( + classOf[Base64], + dataType, + "encode", + Seq(child, Literal(chunkBase64, BooleanType)), + Seq(BinaryType, BooleanType)) - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, (child) => { - s"""${ev.value} = UTF8String.fromBytes( - ${classOf[JBase64].getName}.getMimeEncoder().encode($child)); - """}) - } + override def toString: String = s"$prettyName($child)" - override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild) + override protected def withNewChildInternal(newChild: Expression): Expression = + copy(child = newChild) +} + +object Base64 { + def apply(expr: Expression): Base64 = new Base64(expr) + + private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array()) + + def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = { + val encoder = if (chunkBase64) { + JBase64.getMimeEncoder + } else { + nonChunkEncoder + } + UTF8String.fromBytes(encoder.encode(input)) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 275ec71cb0615..55f80645228db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3229,6 +3229,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val CHUNK_BASE64_STRING_ENABLED = buildConf("spark.sql.legacy.chunkBase64String.enabled") + .internal() + .doc("Whether to truncate string generated by the `Base64` function. When true, base64" + + " strings generated by the base64 function are chunked into lines of at most 76" + + " characters. When false, the base64 strings are not chunked.") + .version("3.5.2") + .booleanConf + .createWithDefault(false) + val ENABLE_DEFAULT_COLUMNS = buildConf("spark.sql.defaultColumn.enabled") .internal() @@ -5111,6 +5120,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE) + def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE64_STRING_ENABLED) + def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match { case "TIMESTAMP_LTZ" => // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index 006c4a7805688..75224bf33f53e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -506,6 +506,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")) :: Nil) } + test("SPARK-47307: base64 encoding without chunking") { + val longString = "a" * 58 + val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ==" + withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "false") { + checkEvaluation(Base64(Literal(longString.getBytes)), encoded) + } + val chunkEncoded = + s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ==" + withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "true") { + checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded) + } + } + test("initcap unit test") { checkEvaluation(InitCap(Literal.create(null, StringType)), null) checkEvaluation(InitCap(Literal("a b")), "A B") From 0662d662bdacc6155ff8e60c1feac14b99fb5bc0 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 12 Jul 2024 19:42:00 +0800 Subject: [PATCH 2/3] regenerate golden files --- .../query-tests/explain-results/function_base64.explain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain index bc3c6e4bb2bcf..99d842189c659 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain @@ -1,2 +1,2 @@ -Project [base64(cast(g#0 as binary)) AS base64(g)#0] +Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.Base64, StringType, encode, cast(g#0 as binary), false, BinaryType, BooleanType, true, true, true) AS base64(g)#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] From f1da1fba4044abbb90e331cd626cc394d33970d2 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 15 Jul 2024 10:45:25 +0800 Subject: [PATCH 3/3] fix --- .../sql/catalyst/expressions/ExpressionEvalHelper.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 5be0cae4a22f1..84520846a8d48 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -71,10 +71,15 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB new ArrayBasedMapData(keyArray, valueArray) } + protected def replace(expr: Expression): Expression = expr match { + case r: RuntimeReplaceable => replace(r.replacement) + case _ => expr.mapChildren(replace) + } + private def prepareEvaluation(expression: Expression): Expression = { val serializer = new JavaSerializer(new SparkConf()).newInstance val resolver = ResolveTimeZone - val expr = resolver.resolveTimeZones(expression) + val expr = resolver.resolveTimeZones(replace(expression)) assert(expr.resolved) serializer.deserialize(serializer.serialize(expr)) }