diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index eebb65879695..1ae6e9b6652c 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -281,14 +281,31 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr &>(*offsets_column).getData(); offsets_data.reserve(arrow_column->length()); - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i) + uint64_t start_offset = 0u; + + for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i) { arrow::ListArray & list_chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); auto arrow_offsets_array = list_chunk.offsets(); auto & arrow_offsets = dynamic_cast(*arrow_offsets_array); - auto start = offsets_data.back(); + + /* + * It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks. + * When it is shared, the offsets will be monotonically increasing. Otherwise, the offsets will be zero based. + * In order to account for both cases, the starting offset is updated whenever a zero-based offset is found. 
+ * More info can be found in: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and + * https://github.com/ClickHouse/ClickHouse/pull/43297 + * */ + if (list_chunk.offset() == 0) + { + start_offset = offsets_data.back(); + } + for (int64_t i = 1; i < arrow_offsets.length(); ++i) - offsets_data.emplace_back(start + arrow_offsets.Value(i)); + { + auto offset = arrow_offsets.Value(i); + offsets_data.emplace_back(start_offset + offset); + } } return offsets_column; } @@ -316,8 +333,23 @@ static std::shared_ptr getNestedArrowColumn(std::shared_ptr for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i) { arrow::ListArray & list_chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); - std::shared_ptr chunk = list_chunk.values(); - array_vector.emplace_back(std::move(chunk)); + + /* + * It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks. + * Therefore, simply appending arrow::ListArray::values() could lead to duplicated data being appended. + * To properly handle this, arrow::ListArray::values() needs to be sliced based on the chunk offsets. + * arrow::ListArray::Flatten does that. 
More info on: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and + * https://github.com/ClickHouse/ClickHouse/pull/43297 + * */ + auto flatten_result = list_chunk.Flatten(); + if (flatten_result.ok()) + { + array_vector.emplace_back(flatten_result.ValueOrDie()); + } + else + { + throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to flatten chunk '{}' of column of type '{}' ", chunk_i, arrow_column->type()->id()); + } } return std::make_shared(array_vector); } diff --git a/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.reference b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.reference new file mode 100644 index 000000000000..285856e363a1 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.reference @@ -0,0 +1,3 @@ +Parquet +3d94071a2fe62a3b3285f170ca6f42e5 - +70000 diff --git a/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.sh b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.sh new file mode 100755 index 000000000000..c2c6f6898510 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +echo "Parquet" + +# File generated with the below script + +#import pyarrow as pa +#import pyarrow.parquet as pq +#import random +# +# +#def gen_array(offset): +# array = [] +# array_length = random.randint(0, 9) +# for i in range(array_length): +# array.append(i + offset) +# +# return array +# +# +#def gen_arrays(number_of_arrays): +# list_of_arrays = [] +# for i in range(number_of_arrays): +# list_of_arrays.append(gen_array(i)) +# return list_of_arrays +# +#arr = pa.array(gen_arrays(70000)) +#table = pa.table([arr], ["arr"]) +#pq.write_table(table, "int-list-zero-based-chunked-array.parquet") + +DATA_FILE=$CUR_DIR/data_parquet/int-list-zero-based-chunked-array.parquet +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (arr Array(Int64)) ENGINE = Memory" +cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet" +${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum +${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load" +${CLICKHOUSE_CLIENT} --query="drop table parquet_load" \ No newline at end of file diff --git a/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.reference b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.reference new file mode 100644 index 000000000000..2db066c0f875 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.reference @@ -0,0 +1,3 @@ +Parquet +e1cfe4265689ead763b18489b363344d - +39352 diff --git a/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.sh b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.sh new file mode 100755 index 000000000000..47245eeb9401 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + 
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +echo "Parquet" + +DATA_FILE=$CUR_DIR/data_parquet/list_monotonically_increasing_offsets.parquet +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (list Array(Int64), json Nullable(String)) ENGINE = Memory" +cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet" +${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum +${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load" +${CLICKHOUSE_CLIENT} --query="drop table parquet_load" \ No newline at end of file diff --git a/tests/queries/0_stateless/data_parquet/int-list-zero-based-chunked-array.parquet b/tests/queries/0_stateless/data_parquet/int-list-zero-based-chunked-array.parquet new file mode 100644 index 000000000000..2eb3ba3ab150 Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/int-list-zero-based-chunked-array.parquet differ diff --git a/tests/queries/0_stateless/data_parquet/list_monotonically_increasing_offsets.parquet b/tests/queries/0_stateless/data_parquet/list_monotonically_increasing_offsets.parquet new file mode 100644 index 000000000000..1c23e27db658 Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/list_monotonically_increasing_offsets.parquet differ