Skip to content

Commit 80e0b11

Browse files
author
Alexey Kudinkin
committed
Cloned UTF8StringBuilder to accommodate for Spark 2
1 parent d92c91d commit 80e0b11

3 files changed

Lines changed: 103 additions & 3 deletions

File tree

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.spark.sql.Row;
3030
import org.apache.spark.sql.catalyst.InternalRow;
3131
import org.apache.spark.sql.types.StructType;
32-
import org.apache.spark.unsafe.UTF8StringBuilder;
32+
import org.apache.hudi.unsafe.UTF8StringBuilder;
3333
import org.apache.spark.unsafe.types.UTF8String;
3434
import scala.Function1;
3535

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.spark.sql.Row;
2424
import org.apache.spark.sql.catalyst.InternalRow;
2525
import org.apache.spark.sql.types.StructType;
26-
import org.apache.spark.unsafe.UTF8StringBuilder;
26+
import org.apache.hudi.unsafe.UTF8StringBuilder;
2727
import org.apache.spark.unsafe.types.UTF8String;
2828

2929
import java.util.Arrays;
@@ -54,7 +54,7 @@ public ComplexKeyGenerator(TypedProperties props) {
5454
.map(String::trim)
5555
.filter(s -> !s.isEmpty())
5656
.collect(Collectors.toList());
57-
complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
57+
this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
5858
}
5959

6060
@Override
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hudi.unsafe;
19+
20+
import org.apache.spark.unsafe.Platform;
21+
import org.apache.spark.unsafe.array.ByteArrayMethods;
22+
import org.apache.spark.unsafe.types.UTF8String;
23+
24+
/**
25+
* A helper class to write {@link UTF8String}s to an internal buffer and build the concatenated
26+
* {@link UTF8String} at the end.
27+
*/
28+
public class UTF8StringBuilder {
29+
30+
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
31+
32+
private byte[] buffer;
33+
private int cursor = Platform.BYTE_ARRAY_OFFSET;
34+
35+
public UTF8StringBuilder() {
36+
// Since initial buffer size is 16 in `StringBuilder`, we set the same size here
37+
this(16);
38+
}
39+
40+
public UTF8StringBuilder(int initialSize) {
41+
if (initialSize < 0) {
42+
throw new IllegalArgumentException("Size must be non-negative");
43+
}
44+
if (initialSize > ARRAY_MAX) {
45+
throw new IllegalArgumentException(
46+
"Size " + initialSize + " exceeded maximum size of " + ARRAY_MAX);
47+
}
48+
this.buffer = new byte[initialSize];
49+
}
50+
51+
// Grows the buffer by at least `neededSize`
52+
private void grow(int neededSize) {
53+
if (neededSize > ARRAY_MAX - totalSize()) {
54+
throw new UnsupportedOperationException(
55+
"Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
56+
"exceeds size limitation " + ARRAY_MAX);
57+
}
58+
final int length = totalSize() + neededSize;
59+
if (buffer.length < length) {
60+
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
61+
final byte[] tmp = new byte[newLength];
62+
Platform.copyMemory(
63+
buffer,
64+
Platform.BYTE_ARRAY_OFFSET,
65+
tmp,
66+
Platform.BYTE_ARRAY_OFFSET,
67+
totalSize());
68+
buffer = tmp;
69+
}
70+
}
71+
72+
private int totalSize() {
73+
return cursor - Platform.BYTE_ARRAY_OFFSET;
74+
}
75+
76+
public void append(UTF8String value) {
77+
grow(value.numBytes());
78+
value.writeToMemory(buffer, cursor);
79+
cursor += value.numBytes();
80+
}
81+
82+
public void append(String value) {
83+
append(UTF8String.fromString(value));
84+
}
85+
86+
public void appendBytes(Object base, long offset, int length) {
87+
grow(length);
88+
Platform.copyMemory(
89+
base,
90+
offset,
91+
buffer,
92+
cursor,
93+
length);
94+
cursor += length;
95+
}
96+
97+
public UTF8String build() {
98+
return UTF8String.fromBytes(buffer, 0, totalSize());
99+
}
100+
}

0 commit comments

Comments
 (0)