Skip to content

Commit 18e2264

Browse files
fix: DH-19929: support Avro String[] types (#7291)
Cherry-pick of #7149 Co-authored-by: devonallison <[email protected]>
1 parent f8ee1ac commit 18e2264

4 files changed

Lines changed: 155 additions & 4 deletions

File tree

extensions/kafka/src/main/java/io/deephaven/kafka/ingest/GenericRecordArrayFieldCopier.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.avro.generic.GenericRecord;
1616

1717
import java.lang.reflect.Array;
18+
import java.util.function.Function;
1819
import java.util.regex.Pattern;
1920

2021
import static io.deephaven.util.QueryConstants.*;
@@ -33,14 +34,20 @@ public GenericRecordArrayFieldCopier(
3334

3435
private static <T> T[] convertObjectArray(final GenericArray<?> ga, final T[] emptyArray,
3536
final Class<T> componentType) {
37+
return convertObjectArray(ga, emptyArray, componentType, componentType::cast);
38+
}
39+
40+
private static <X, T> T[] convertObjectArray(final GenericArray<X> ga, final T[] emptyArray,
41+
final Class<T> componentType, final Function<X, T> f) {
3642
final int gaSize = ga.size();
3743
if (gaSize == 0) {
3844
return emptyArray;
3945
}
46+
// noinspection unchecked
4047
final T[] out = (T[]) Array.newInstance(componentType, ga.size());
4148
int i = 0;
42-
for (Object o : ga) {
43-
out[i] = componentType.cast(o);
49+
for (X o : ga) {
50+
out[i] = f.apply(o);
4451
++i;
4552
}
4653
return out;
@@ -131,7 +138,12 @@ static ArrayConverter makeFor(Class<?> componentType) {
131138
return (GenericArray<?> ga) -> convertObjectArray(ga, EMPTY_BOOLEANBOXED_ARRAY, boolean.class);
132139
}
133140
if (componentType.equals(String.class)) {
134-
return (GenericArray<?> ga) -> convertObjectArray(ga, EMPTY_STRING_ARRAY, String.class);
141+
// org.apache.avro.generic.GenericData.StringType
142+
// org.apache.avro.util.Utf8 implements CharSequence so from a reading perspective, we should be able
143+
// to safely cast to a CharSequence. In the case where it's already a String, CharSequence::toString
144+
// will be a no-op.
145+
return ga -> convertObjectArray((GenericArray<CharSequence>) ga, EMPTY_STRING_ARRAY, String.class,
146+
CharSequence::toString);
135147
}
136148
return (GenericArray<?> ga) -> convertObjectArray(ga, EMPTY_OBJECT_ARRAY, Object.class);
137149
}

extensions/kafka/src/main/java/io/deephaven/kafka/ingest/GenericRecordStringFieldCopier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void copyField(
2929
for (int ii = 0; ii < length; ++ii) {
3030
final GenericRecord record = (GenericRecord) inputChunk.get(ii + sourceOffset);
3131
final Object value = GenericRecordUtil.getPath(record, fieldPath);
32-
output.set(ii + destOffset, value == null ? null : Objects.toString(value));
32+
output.set(ii + destOffset, value == null ? null : value.toString());
3333
}
3434
}
3535
}

extensions/kafka/src/test/java/io/deephaven/kafka/ingest/TestAvroAdapter.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import junit.framework.TestCase;
1414
import org.apache.avro.Schema;
1515
import org.apache.avro.generic.GenericData;
16+
import org.apache.avro.util.Utf8;
1617
import org.jetbrains.annotations.NotNull;
1718
import org.junit.Test;
1819

@@ -192,4 +193,116 @@ public void testTimestamp() throws IOException {
192193
}
193194
}
194195
}
196+
197+
@Test
198+
public void testStringArray() throws IOException {
199+
final Schema avroSchema = getSchema("stringarray.avsc");
200+
201+
final String[] names = new String[] {"A", "B", "C", "Result"};
202+
final Class<?>[] types = new Class[] {int.class, String.class, double.class, String[].class};
203+
final TableDefinition definition = TableDefinition.from(Arrays.asList(names), Arrays.asList(types));
204+
205+
final GenericData.Record genericRecord = new GenericData.Record(avroSchema);
206+
genericRecord.put("A", 123);
207+
genericRecord.put("B", "hello");
208+
genericRecord.put("C", 45.67);
209+
genericRecord.put("Result",
210+
new GenericData.Array<>(avroSchema.getField("Result").schema(), Arrays.asList("pass", "ok", "done")));
211+
212+
final Map<String, String> colMap = new HashMap<>();
213+
colMap.put("A", "A");
214+
colMap.put("B", "B");
215+
colMap.put("C", "C");
216+
colMap.put("Result", "Result");
217+
218+
try (final WritableObjectChunk<Object, Values> inputValues =
219+
WritableObjectChunk.makeWritableChunk(1)) {
220+
inputValues.setSize(0);
221+
inputValues.add(genericRecord);
222+
223+
final WritableChunk[] output = new WritableChunk[names.length];
224+
try (final SafeCloseableArray ignored = new SafeCloseableArray(output)) {
225+
output[0] = WritableIntChunk.makeWritableChunk(1);
226+
output[1] = WritableObjectChunk.makeWritableChunk(1);
227+
output[2] = WritableDoubleChunk.makeWritableChunk(1);
228+
output[3] = WritableObjectChunk.makeWritableChunk(1);
229+
230+
for (WritableChunk wc : output) {
231+
wc.setSize(0);
232+
}
233+
234+
final GenericRecordChunkAdapter adapter = GenericRecordChunkAdapter.make(definition,
235+
(idx) -> output[idx].getChunkType(), colMap, Pattern.compile(Pattern.quote(".")), avroSchema,
236+
true);
237+
adapter.handleChunk(inputValues, output);
238+
239+
TestCase.assertEquals(1, output[0].size());
240+
TestCase.assertEquals(1, output[1].size());
241+
TestCase.assertEquals(1, output[2].size());
242+
TestCase.assertEquals(1, output[3].size());
243+
244+
TestCase.assertEquals(123, output[0].asIntChunk().get(0));
245+
TestCase.assertEquals("hello", output[1].asObjectChunk().get(0));
246+
TestCase.assertEquals(45.67, output[2].asDoubleChunk().get(0));
247+
TestCase.assertTrue(Arrays.equals(new String[] {"pass", "ok", "done"},
248+
(String[]) output[3].asObjectChunk().get(0)));
249+
}
250+
}
251+
}
252+
253+
@Test
254+
public void testUtf8Array() throws IOException {
255+
final Schema avroSchema = getSchema("stringarray.avsc");
256+
257+
final String[] names = new String[] {"A", "B", "C", "Result"};
258+
final Class<?>[] types = new Class[] {int.class, String.class, double.class, String[].class};
259+
final TableDefinition definition = TableDefinition.from(Arrays.asList(names), Arrays.asList(types));
260+
261+
final GenericData.Record genericRecord = new GenericData.Record(avroSchema);
262+
genericRecord.put("A", 123);
263+
genericRecord.put("B", "hello");
264+
genericRecord.put("C", 45.67);
265+
genericRecord.put("Result", new GenericData.Array<>(avroSchema.getField("Result").schema(),
266+
Arrays.asList(new Utf8("pass"), new Utf8("ok"), new Utf8("done"))));
267+
268+
final Map<String, String> colMap = new HashMap<>();
269+
colMap.put("A", "A");
270+
colMap.put("B", "B");
271+
colMap.put("C", "C");
272+
colMap.put("Result", "Result");
273+
274+
try (final WritableObjectChunk<Object, Values> inputValues =
275+
WritableObjectChunk.makeWritableChunk(1)) {
276+
inputValues.setSize(0);
277+
inputValues.add(genericRecord);
278+
279+
final WritableChunk[] output = new WritableChunk[names.length];
280+
try (final SafeCloseableArray ignored = new SafeCloseableArray(output)) {
281+
output[0] = WritableIntChunk.makeWritableChunk(1);
282+
output[1] = WritableObjectChunk.makeWritableChunk(1);
283+
output[2] = WritableDoubleChunk.makeWritableChunk(1);
284+
output[3] = WritableObjectChunk.makeWritableChunk(1);
285+
286+
for (WritableChunk wc : output) {
287+
wc.setSize(0);
288+
}
289+
290+
final GenericRecordChunkAdapter adapter = GenericRecordChunkAdapter.make(definition,
291+
(idx) -> output[idx].getChunkType(), colMap, Pattern.compile(Pattern.quote(".")), avroSchema,
292+
true);
293+
adapter.handleChunk(inputValues, output);
294+
295+
TestCase.assertEquals(1, output[0].size());
296+
TestCase.assertEquals(1, output[1].size());
297+
TestCase.assertEquals(1, output[2].size());
298+
TestCase.assertEquals(1, output[3].size());
299+
300+
TestCase.assertEquals(123, output[0].asIntChunk().get(0));
301+
TestCase.assertEquals("hello", output[1].asObjectChunk().get(0));
302+
TestCase.assertEquals(45.67, output[2].asDoubleChunk().get(0));
303+
TestCase.assertTrue(Arrays.equals(new String[] {"pass", "ok", "done"},
304+
(String[]) output[3].asObjectChunk().get(0)));
305+
}
306+
}
307+
}
195308
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"fields": [
3+
{
4+
"name": "A",
5+
"type": "int"
6+
},
7+
{
8+
"name": "B",
9+
"type": "string"
10+
},
11+
{
12+
"name": "C",
13+
"type": "double"
14+
},
15+
{
16+
"name": "Result",
17+
"type": {
18+
"items": "string",
19+
"type": "array"
20+
}
21+
}
22+
],
23+
"name": "stringarray",
24+
"namespace": "com.example.avro",
25+
"type": "record"
26+
}

0 commit comments

Comments
 (0)