diff --git a/src/DataTypes/NestedUtils.cpp b/src/DataTypes/NestedUtils.cpp index e31f23e18c59..d6380d2fdbca 100644 --- a/src/DataTypes/NestedUtils.cpp +++ b/src/DataTypes/NestedUtils.cpp @@ -48,11 +48,8 @@ std::string concatenateName(const std::string & nested_table_name, const std::st */ std::pair splitName(const std::string & name, bool reverse) { - auto idx = (reverse ? name.find_last_of('.') : name.find_first_of('.')); - if (idx == std::string::npos || idx == 0 || idx + 1 == name.size()) - return {name, {}}; - - return {name.substr(0, idx), name.substr(idx + 1)}; + auto res = splitName(std::string_view(name), reverse); + return {std::string(res.first), std::string(res.second)}; } std::pair splitName(std::string_view name, bool reverse) diff --git a/src/DataTypes/NestedUtils.h b/src/DataTypes/NestedUtils.h index 8f2ab20ac879..894af62092ba 100644 --- a/src/DataTypes/NestedUtils.h +++ b/src/DataTypes/NestedUtils.h @@ -17,6 +17,8 @@ namespace Nested std::string concatenateName(const std::string & nested_table_name, const std::string & nested_field_name); /// Splits name of compound identifier by first/last dot (depending on 'reverse' parameter). + /// If the name is not nested (no dot or dot at start/end), + /// returns {name, ""}. std::pair splitName(const std::string & name, bool reverse = false); std::pair splitName(std::string_view name, bool reverse = false); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 365df6585ae0..fb8204b7fcb3 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -178,7 +178,7 @@ class FormatFactory final : private boost::noncopyable UInt64 max_block_size, const std::optional & format_settings = std::nullopt, FormatParserSharedResourcesPtr parser_shared_resources = nullptr, - FormatFilterInfoPtr format_filter_info = std::make_shared(), + FormatFilterInfoPtr format_filter_info = nullptr, // affects things like buffer sizes and parallel reading bool is_remote_fs = false, // allows to do: buf -> parallel read -> decompression, diff --git a/src/Formats/FormatFilterInfo.cpp b/src/Formats/FormatFilterInfo.cpp index 8d25deefb397..ac130771fbb9 100644 --- a/src/Formats/FormatFilterInfo.cpp +++ b/src/Formats/FormatFilterInfo.cpp @@ -15,11 +15,16 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int ICEBERG_SPECIFICATION_VIOLATION; } void ColumnMapper::setStorageColumnEncoding(std::unordered_map && storage_encoding_) { + chassert(storage_encoding.empty()); storage_encoding = std::move(storage_encoding_); + for (const auto & [column_name, field_id] : storage_encoding) + if (!field_id_to_clickhouse_name.emplace(field_id, column_name).second) + throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Duplicate field id {}", field_id); } std::pair, std::unordered_map> ColumnMapper::makeMapping( diff --git a/src/Formats/FormatFilterInfo.h b/src/Formats/FormatFilterInfo.h index 3d37a95c93b2..024c4e5321aa 100644 --- a/src/Formats/FormatFilterInfo.h +++ b/src/Formats/FormatFilterInfo.h @@ -18,14 +18,20 @@ class ColumnMapper { public: /// clickhouse_column_name -> field_id + /// For tuples, the map contains both the tuple itself and all its elements, e.g. {t, t.x, t.y}. + /// Note that parquet schema reader has to apply the mapping to all tuple fields recursively + /// even if the whole tuple was requested, because the names of the fields may be different. 
void setStorageColumnEncoding(std::unordered_map && storage_encoding_); const std::unordered_map & getStorageColumnEncoding() const { return storage_encoding; } + const std::unordered_map & getFieldIdToClickHouseName() const { return field_id_to_clickhouse_name; } + /// clickhouse_column_name -> format_column_name (just join the maps above by field_id). std::pair, std::unordered_map> makeMapping(const std::unordered_map & format_encoding); private: std::unordered_map storage_encoding; + std::unordered_map field_id_to_clickhouse_name; }; using ColumnMapperPtr = std::shared_ptr; diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index 3ed775e5d5c1..ac983fad9d15 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -1556,7 +1556,18 @@ Chunk ArrowColumnToCHColumn::arrowTableToCHChunk( auto arrow_field = table->schema()->GetFieldByName(column_name); if (parquet_columns_to_clickhouse) - column_name = parquet_columns_to_clickhouse->at(column_name); + { + auto column_name_it = parquet_columns_to_clickhouse->find(column_name); + if (column_name_it == parquet_columns_to_clickhouse->end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Column '{}' is not present in input data. Column name mapping has {} columns", + column_name, + parquet_columns_to_clickhouse->size()); + } + column_name = column_name_it->second; + } if (case_insensitive_matching) boost::to_lower(column_name); diff --git a/src/Processors/Formats/Impl/Parquet/Decoding.cpp b/src/Processors/Formats/Impl/Parquet/Decoding.cpp index 40f4efcf68c9..814033c451dd 100644 --- a/src/Processors/Formats/Impl/Parquet/Decoding.cpp +++ b/src/Processors/Formats/Impl/Parquet/Decoding.cpp @@ -745,7 +745,7 @@ struct ByteStreamSplitDecoder : public PageDecoder bool PageDecoderInfo::canReadDirectlyIntoColumn(parq::Encoding::type encoding, size_t num_values, IColumn & col, std::span & out) const { - if (encoding == parq::Encoding::PLAIN && fixed_size_converter && fixed_size_converter->isTrivial()) + if (encoding == parq::Encoding::PLAIN && fixed_size_converter && physical_type != parq::Type::BOOLEAN && fixed_size_converter->isTrivial()) { chassert(col.sizeOfValueIfFixed() == fixed_size_converter->input_size); out = col.insertRawUninitialized(num_values); @@ -1417,7 +1417,7 @@ void GeoConverter::convertColumn(std::span chars, const UInt64 * off { col.reserve(col.size() + num_values); chassert(chars.size() >= offsets[num_values - 1]); - for (size_t i = 0; i < num_values; ++i) + for (ssize_t i = 0; i < ssize_t(num_values); ++i) { char * ptr = const_cast(chars.data() + offsets[i - 1]); size_t length = offsets[i] - offsets[i - 1] - separator_bytes; diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 43dc7c9a606a..0cea3ce8e407 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -356,6 +356,8 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro } case ReadStage::MainData: { + row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed); + /// Must add to delivery_queue before advancing read_ptr to deliver subgroups in order. 
/// (If we advanced read_ptr first, another thread could start and finish reading the /// next subgroup before we add this one to delivery_queue, and ReadManager::read could @@ -368,7 +370,6 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro size_t prev = row_group.read_ptr.exchange(row_subgroup_idx + 1); chassert(prev == row_subgroup_idx); advanced_read_ptr = prev + 1; - row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed); delivery_cv.notify_one(); break; // proceed to advancing read_ptr } diff --git a/src/Processors/Formats/Impl/Parquet/Reader.cpp b/src/Processors/Formats/Impl/Parquet/Reader.cpp index cc2b0662c7a8..fdd0a433d789 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.cpp +++ b/src/Processors/Formats/Impl/Parquet/Reader.cpp @@ -274,6 +274,7 @@ void Reader::prefilterAndInitRowGroups() SchemaConverter schemer(file_metadata, options, &extended_sample_block); if (prewhere_info && !prewhere_info->remove_prewhere_column) schemer.external_columns.push_back(prewhere_info->prewhere_column_name); + schemer.column_mapper = format_filter_info->column_mapper.get(); schemer.prepareForReading(); primitive_columns = std::move(schemer.primitive_columns); total_primitive_columns_in_file = schemer.primitive_column_idx; @@ -281,24 +282,26 @@ void Reader::prefilterAndInitRowGroups() /// Precalculate some column index mappings. - sample_block_to_output_columns_idx.resize(extended_sample_block.columns(), UINT64_MAX); + sample_block_to_output_columns_idx.resize(extended_sample_block.columns()); for (size_t i = 0; i < output_columns.size(); ++i) { const auto & idx = output_columns[i].idx_in_output_block; if (idx.has_value()) { - chassert(sample_block_to_output_columns_idx.at(*idx) == UINT64_MAX); + chassert(!sample_block_to_output_columns_idx.at(*idx).has_value()); sample_block_to_output_columns_idx.at(*idx) = i; } } - chassert(std::all_of(sample_block_to_output_columns_idx.begin(), sample_block_to_output_columns_idx.end(), [](size_t x) { return x != UINT64_MAX; })); if (format_filter_info->key_condition) { for (size_t idx_in_output_block : format_filter_info->key_condition->getUsedColumns()) { - size_t output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block); - const OutputColumnInfo & output_info = output_columns[output_idx]; + const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block); + if (!output_idx.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "KeyCondition uses PREWHERE output"); + const OutputColumnInfo & output_info = output_columns[output_idx.value()]; + if (output_info.is_primitive) primitive_columns[output_info.primitive_start].used_by_key_condition = idx_in_output_block; } @@ -363,7 +366,11 @@ void Reader::prefilterAndInitRowGroups() const auto & column_conditions = static_cast(format_filter_info->opaque.get())->column_conditions; for (const auto & [idx_in_output_block, key_condition] : column_conditions) { - const OutputColumnInfo & output_info = output_columns[sample_block_to_output_columns_idx.at(idx_in_output_block)]; + const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block); + if (!output_idx.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Column condition uses PREWHERE output"); + const OutputColumnInfo & output_info = output_columns[output_idx.value()]; + if (!output_info.is_primitive) continue; primitive_columns[output_info.primitive_start].column_index_condition = key_condition.get(); @@ -602,44 +609,47 @@ void 
Reader::initializePrefetches() void Reader::preparePrewhere() { PrewhereInfoPtr prewhere_info = format_filter_info->prewhere_info; - if (!prewhere_info) - return; + if (prewhere_info) + { + /// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row + /// subgroup, in one thread per row group. Instead, we could extract single-column conditions + /// and run them after decoding the corresponding columns, in parallel. + /// (Still run multi-column conditions, like `col1 = 42 or col2 = 'yes'`, after reading all columns.) + /// Probably reuse tryBuildPrewhereSteps from MergeTree for splitting the expression. - /// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row - /// subgroup, in one thread per row group. Instead, we could extract single-column conditions - /// and run them after decoding the corresponding columns, in parallel. - /// (Still run multi-column conditions, like `col1 = 42 or col2 = 'yes'`, after reading all columns.) - /// Probably reuse tryBuildPrewhereSteps from MergeTree for splitting the expression. - /// Convert ActionsDAG to ExpressionActions. - ExpressionActionsSettings actions_settings; - if (prewhere_info->row_level_filter.has_value()) - { - ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings); + /// Convert ActionsDAG to ExpressionActions. + ExpressionActionsSettings actions_settings; + if (prewhere_info->row_level_filter.has_value()) + { + ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings); + prewhere_steps.push_back(PrewhereStep + { + .actions = std::move(actions), + .result_column_name = prewhere_info->row_level_column_name, + }); + } + ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings); prewhere_steps.push_back(PrewhereStep { .actions = std::move(actions), - .result_column_name = prewhere_info->row_level_column_name + .result_column_name = prewhere_info->prewhere_column_name, + .need_filter = prewhere_info->need_filter, }); + if (!prewhere_info->remove_prewhere_column) + prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name); } - ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings); - prewhere_steps.push_back(PrewhereStep - { - .actions = std::move(actions), - .result_column_name = prewhere_info->prewhere_column_name, - .need_filter = prewhere_info->need_filter, - }); - if (!prewhere_info->remove_prewhere_column) - prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name); - /// Look up expression inputs in extended_sample_block. 
for (PrewhereStep & step : prewhere_steps) { for (const auto & col : step.actions.getRequiredColumnsWithTypes()) { size_t idx_in_output_block = extended_sample_block.getPositionByName(col.name, /* case_insensitive= */ false); - size_t output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block); - OutputColumnInfo & output_info = output_columns[output_idx]; + const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block); + if (!output_idx.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input"); + OutputColumnInfo & output_info = output_columns[output_idx.value()]; + output_info.use_prewhere = true; bool only_for_prewhere = idx_in_output_block >= sample_block->columns(); @@ -649,7 +659,21 @@ void Reader::preparePrewhere() primitive_columns[primitive_idx].only_for_prewhere = only_for_prewhere; } - step.input_column_idxs.push_back(output_idx); + step.input_column_idxs.push_back(output_idx.value()); + } + } + + /// Assert that sample_block_to_output_columns_idx is valid. + for (size_t i = 0; i < sample_block_to_output_columns_idx.size(); ++i) + { + /// (`prewhere_steps` has at most two elements) + size_t is_prewhere_output = std::count_if(prewhere_steps.begin(), prewhere_steps.end(), + [&](const PrewhereStep & step) { return step.idx_in_output_block == i; }); + if (is_prewhere_output > 1 || + /// Column must appear in exactly one of {output_columns, prewhere output}. + sample_block_to_output_columns_idx[i].has_value() != !is_prewhere_output) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected column in sample block: {}", extended_sample_block.getByPosition(i).name); } } } @@ -974,7 +998,8 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group) bytes_per_row += estimateColumnMemoryBytesPerRow(row_group.columns.at(i), row_group, primitive_columns.at(i)); size_t n = size_t(options.format.parquet.prefer_block_bytes / std::max(bytes_per_row, 1.)); - rows_per_subgroup = std::min(rows_per_subgroup, std::max(n, 1ul)); + n = std::max(n, size_t(128)); // avoid super tiny blocks if something is wrong with stats + rows_per_subgroup = std::min(rows_per_subgroup, n); } chassert(rows_per_subgroup > 0); diff --git a/src/Processors/Formats/Impl/Parquet/Reader.h b/src/Processors/Formats/Impl/Parquet/Reader.h index 7b5875a7f93a..899d4371b0c4 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.h +++ b/src/Processors/Formats/Impl/Parquet/Reader.h @@ -24,12 +24,10 @@ namespace DB::Parquet { // TODO [parquet]: -// * column_mapper -// * allow_geoparquet_parser +// * either multistage PREWHERE or make query optimizer selectively move parts of the condition to prewhere instead of the whole condition // * test on files from https://github.com/apache/parquet-testing // * check fields for false sharing, add cacheline padding as needed // * make sure userspace page cache read buffer supports readBigAt -// * assert that memory usage is zero at the end, the reset()s are easy to miss // * support newer parquet versions: https://github.com/apache/parquet-format/blob/master/CHANGES.md // * make writer write DataPageV2 // * make writer write PageEncodingStats @@ -37,8 +35,6 @@ namespace DB::Parquet // * try adding [[unlikely]] to all ifs // * try adding __restrict to pointers on hot paths // * support or deprecate the preserve-order setting -// * stats (reuse the ones from the other PR?) 
-// - number of row groups that were split into chunks // * add comments everywhere // * progress indication and estimating bytes to read; allow negative total_bytes_to_read? // * cache FileMetaData in something like SchemaCache @@ -156,7 +152,7 @@ struct Reader size_t column_idx; /// Index in parquet `schema` (in FileMetaData). size_t schema_idx; - String name; + String name; // possibly mapped by ColumnMapper (e.g. using iceberg metadata) PageDecoderInfo decoder; DataTypePtr raw_decoded_type; // not Nullable @@ -192,7 +188,7 @@ struct Reader struct OutputColumnInfo { - String name; + String name; // possibly mapped by ColumnMapper /// Range in primitive_columns. size_t primitive_start = 0; size_t primitive_end = 0; @@ -455,8 +451,10 @@ struct Reader size_t total_primitive_columns_in_file = 0; std::vector output_columns; /// Maps idx_in_output_block to index in output_columns. I.e.: - /// sample_block_to_output_columns_idx[output_columns[i].idx_in_output_block] = i - std::vector sample_block_to_output_columns_idx; + /// sample_block_to_output_columns_idx[output_columns[i].idx_in_output_block] = i + /// nullopt if the column is produced by PREWHERE expression: + /// prewhere_steps[?].idx_in_output_block == i + std::vector> sample_block_to_output_columns_idx; /// sample_block with maybe some columns added at the end. /// The added columns are used as inputs to prewhere expression, then discarded. diff --git a/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp b/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp index f174701c746e..3b1429c5231e 100644 --- a/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp +++ b/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp @@ -12,6 +12,8 @@ #include #include #include +#include +#include #include #include @@ -25,6 +27,7 @@ namespace DB::ErrorCodes extern const int TOO_DEEP_RECURSION; extern const int NOT_IMPLEMENTED; extern const int THERE_IS_NO_COLUMN; + extern const int ICEBERG_SPECIFICATION_VIOLATION; } namespace DB::Parquet @@ -65,7 +68,10 @@ void SchemaConverter::prepareForReading() /// DFS the schema tree. size_t top_level_columns = size_t(file_metadata.schema.at(0).num_children); for (size_t i = 0; i < top_level_columns; ++i) - processSubtree("", /*requested*/ false, /*type_hint*/ nullptr, SchemaContext::None); + { + TraversalNode node; + processSubtree(node); + } /// Check that all requested columns were found. 
std::vector found_columns(sample_block->columns()); @@ -109,33 +115,47 @@ NamesAndTypesList SchemaConverter::inferSchema() NamesAndTypesList res; for (size_t i = 0; i < top_level_columns; ++i) { - std::optional idx = processSubtree("", /*requested*/ true, /*type_hint*/ nullptr, SchemaContext::None); - if (idx.has_value()) + TraversalNode node; + node.requested = true; + processSubtree(node); + if (node.output_idx.has_value()) { - const OutputColumnInfo & col = output_columns.at(idx.value()); + const OutputColumnInfo & col = output_columns.at(node.output_idx.value()); res.emplace_back(col.name, col.type); } } return res; } -std::optional SchemaConverter::processSubtree(String name, bool requested, DataTypePtr type_hint, SchemaContext schema_context) +std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element) const +{ + if (!column_mapper) + return element.name; + const auto & map = column_mapper->getFieldIdToClickHouseName(); + if (!element.__isset.field_id) + throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Missing field_id for column {}", element.name); + auto it = map.find(element.field_id); + if (it == map.end()) + throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id); + auto split = Nested::splitName(std::string_view(it->second), /*reverse=*/ true); + return split.second.empty() ? split.first : split.second; +} + +void SchemaConverter::processSubtree(TraversalNode & node) { - if (type_hint) - chassert(requested); + if (node.type_hint) + chassert(node.requested); if (schema_idx >= file_metadata.schema.size()) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid parquet schema tree"); - const parq::SchemaElement & element = file_metadata.schema.at(schema_idx); + node.element = &file_metadata.schema.at(schema_idx); schema_idx += 1; std::optional idx_in_output_block; size_t wrap_in_arrays = 0; - if (schema_context == SchemaContext::None) + if (node.schema_context == SchemaContext::None) { - if (!name.empty()) - name += "."; - name += element.name; + node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element)); if (sample_block) { @@ -143,24 +163,24 @@ std::optional SchemaConverter::processSubtree(String name, bool requeste /// E.g.: /// insert into function file('t.parquet') select [(10,20,30)] as x; /// select * from file('t.parquet', Parquet, '`x.2` Array(UInt8)'); -- outputs [20] - std::optional pos = sample_block->findPositionByName(name, options.format.parquet.case_insensitive_column_matching); + std::optional pos = sample_block->findPositionByName(node.name, options.format.parquet.case_insensitive_column_matching); if (pos.has_value()) { - if (requested) - throw Exception(ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE, "Requested column {} is part of another requested column", name); + if (node.requested) + throw Exception(ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE, "Requested column {} is part of another requested column", node.getNameForLogging()); - requested = true; - name = sample_block->getByPosition(pos.value()).name; // match case - type_hint = sample_block->getByPosition(pos.value()).type; + node.requested = true; + node.name = sample_block->getByPosition(pos.value()).name; // match case + node.type_hint = sample_block->getByPosition(pos.value()).type; for (size_t i = 1; i < levels.size(); ++i) { if (levels[i].is_array) { - const DataTypeArray * array = typeid_cast(type_hint.get()); + const 
DataTypeArray * array = typeid_cast(node.type_hint.get()); if (!array) - throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of nested column {} doesn't match parquet schema: parquet type is Array, requested type is {}", name, type_hint->getName()); - type_hint = array->getNestedType(); + throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of nested column {} doesn't match parquet schema: parquet type is Array, requested type is {}", node.getNameForLogging(), node.type_hint->getName()); + node.type_hint = array->getNestedType(); wrap_in_arrays += 1; } } @@ -178,62 +198,60 @@ std::optional SchemaConverter::processSubtree(String name, bool requeste levels.resize(prev_levels_size); }); - if (element.repetition_type != parq::FieldRepetitionType::REQUIRED) + if (node.element->repetition_type != parq::FieldRepetitionType::REQUIRED) { LevelInfo prev = levels.back(); if (prev.def == UINT8_MAX) - throw Exception(ErrorCodes::TOO_DEEP_RECURSION, "Parquet column {} has extremely deeply nested (>255 levels) arrays or nullables", name); + throw Exception(ErrorCodes::TOO_DEEP_RECURSION, "Parquet column {} has extremely deeply nested (>255 levels) arrays or nullables", node.getNameForLogging()); auto level = LevelInfo {.def = UInt8(prev.def + 1), .rep = prev.rep}; - if (element.repetition_type == parq::FieldRepetitionType::REPEATED) + if (node.element->repetition_type == parq::FieldRepetitionType::REPEATED) { level.rep += 1; // no overflow, rep <= def level.is_array = true; /// We'll first process schema for array element type, then wrap it in Array type. - if (type_hint) + if (node.type_hint) { - const DataTypeArray * array_type = typeid_cast(type_hint.get()); + const DataTypeArray * array_type = typeid_cast(node.type_hint.get()); if (!array_type) - throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Array, requested type is {}", name, type_hint->getName()); - type_hint = array_type->getNestedType(); + throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Array, requested type is {}", node.getNameForLogging(), node.type_hint->getName()); + node.type_hint = array_type->getNestedType(); } } chassert(level.def == levels.size()); levels.push_back(level); } - std::optional output_idx; // index in output_columns - /// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - if (!processSubtreePrimitive(name, requested, type_hint, schema_context, element, output_idx) && - !processSubtreeMap(name, requested, type_hint, schema_context, element, output_idx) && - !processSubtreeArrayOuter(name, requested, type_hint, schema_context, element, output_idx) && - !processSubtreeArrayInner(name, requested, type_hint, schema_context, element, output_idx)) + if (!processSubtreePrimitive(node) && + !processSubtreeMap(node) && + !processSubtreeArrayOuter(node) && + !processSubtreeArrayInner(node)) { - processSubtreeTuple(name, requested, type_hint, schema_context, element, output_idx); + processSubtreeTuple(node); } - if (!output_idx.has_value()) - return std::nullopt; - if (!requested) - return std::nullopt; // we just needed to recurse to children, not interested in output_idx + if (!node.output_idx.has_value()) + return; + if (!node.requested) + return; // we just needed to recurse to children, not interested in output_idx auto make_array = [&](UInt8 rep) { size_t array_idx = output_columns.size(); OutputColumnInfo & array = 
output_columns.emplace_back(); - const OutputColumnInfo & array_element = output_columns.at(*output_idx); - array.name = name; + const OutputColumnInfo & array_element = output_columns.at(node.output_idx.value()); + array.name = node.name; array.primitive_start = array_element.primitive_start; array.primitive_end = primitive_columns.size(); array.type = std::make_shared(array_element.type); - array.nested_columns = {*output_idx}; + array.nested_columns = {*node.output_idx}; array.rep = rep; - output_idx = array_idx; + node.output_idx = array_idx; }; - if (element.repetition_type == parq::FieldRepetitionType::REPEATED) + if (node.element->repetition_type == parq::FieldRepetitionType::REPEATED) { /// Array of some kind. Can be a child of List or Map, or a standalone repeated field. /// We dispatch all 3 cases to this one code path to minimize probability of bugs. @@ -248,30 +266,28 @@ std::optional SchemaConverter::processSubtree(String name, bool requeste for (size_t i = 0; i < wrap_in_arrays; ++i) make_array(levels[prev_levels_size - 1].rep - i); - output_columns[*output_idx].idx_in_output_block = idx_in_output_block; + output_columns[node.output_idx.value()].idx_in_output_block = idx_in_output_block; } - - return output_idx; } -bool SchemaConverter::processSubtreePrimitive(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx) +bool SchemaConverter::processSubtreePrimitive(TraversalNode & node) { /// `parquet.thrift` says "[num_children] is not set when the element is a primitive type". /// If it's set but has value 0, logically it would make sense to interpret it as empty tuple/struct. /// But in practice some writers are sloppy about it and set this field to 0 (rather than unset) /// for primitive columns. E.g. /// tests/queries/0_stateless/data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet - bool is_primitive = !element.__isset.num_children || (element.num_children == 0 && element.__isset.type); + bool is_primitive = !node.element->__isset.num_children || (node.element->num_children == 0 && node.element->__isset.type); if (!is_primitive) return false; primitive_column_idx += 1; - if (!requested) + if (!node.requested) return true; - if (!element.__isset.type) - throw Exception(ErrorCodes::INCORRECT_DATA, "Parquet metadata is missing physical type for column {}", element.name); + if (!node.element->__isset.type) + throw Exception(ErrorCodes::INCORRECT_DATA, "Parquet metadata is missing physical type for column {}", node.getNameForLogging()); - DataTypePtr primitive_type_hint = type_hint; + DataTypePtr primitive_type_hint = node.type_hint; bool output_nullable = false; bool output_nullable_if_not_json = false; if (primitive_type_hint) @@ -287,7 +303,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste } } /// Force map key to be non-nullable because clickhouse Map doesn't support nullable map key. - else if (!options.schema_inference_force_not_nullable && schema_context != SchemaContext::MapKey) + else if (!options.schema_inference_force_not_nullable && node.schema_context != SchemaContext::MapKey) { if (levels.back().is_array == false) { @@ -306,7 +322,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste } } - auto geo_it = geo_columns.find(name); + auto geo_it = geo_columns.find(node.getParquetName()); auto geo_metadata = geo_it == geo_columns.end() ? 
std::nullopt : std::optional(geo_it->second); DataTypePtr inferred_type; @@ -314,7 +330,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste PageDecoderInfo decoder; try { - processPrimitiveColumn(element, primitive_type_hint, decoder, raw_decoded_type, inferred_type, geo_metadata); + processPrimitiveColumn(*node.element, primitive_type_hint, decoder, raw_decoded_type, inferred_type, geo_metadata); } catch (Exception & e) { @@ -325,7 +341,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste } else { - e.addMessage("column '" + name + "'"); + e.addMessage("column '" + node.getNameForLogging() + "'"); throw; } } @@ -341,7 +357,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste PrimitiveColumnInfo & primitive = primitive_columns.emplace_back(); primitive.column_idx = primitive_column_idx - 1; primitive.schema_idx = schema_idx - 1; - primitive.name = name; + primitive.name = node.name; primitive.levels = levels; primitive.output_nullable = output_nullable || (output_nullable_if_not_json && !typeid_cast(inferred_type.get())); primitive.decoder = std::move(decoder); @@ -350,9 +366,9 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste if (level.is_array) primitive.max_array_def = level.def; - output_idx = output_columns.size(); + node.output_idx = output_columns.size(); OutputColumnInfo & output = output_columns.emplace_back(); - output.name = name; + output.name = node.name; output.primitive_start = primitive_idx; output.primitive_end = primitive_idx + 1; output.is_primitive = true; @@ -367,7 +383,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste inferred_type = std::make_shared(inferred_type); } - primitive.final_type = type_hint ? type_hint : inferred_type; + primitive.final_type = node.type_hint ? node.type_hint : inferred_type; primitive.needs_cast = !primitive.final_type->equals(*primitive.intermediate_type); output.type = primitive.final_type; @@ -375,7 +391,7 @@ bool SchemaConverter::processSubtreePrimitive(const String & name, bool requeste return true; } -bool SchemaConverter::processSubtreeMap(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx) +bool SchemaConverter::processSubtreeMap(TraversalNode & node) { /// Map, aka Array(Tuple(2)). /// required group `name` (MAP or MAP_KEY_VALUE): @@ -383,18 +399,18 @@ bool SchemaConverter::processSubtreeMap(const String & name, bool requested, Dat /// reqiured "key" /// "value" - if (element.converted_type != parq::ConvertedType::MAP && element.converted_type != parq::ConvertedType::MAP_KEY_VALUE && !element.logicalType.__isset.MAP) + if (node.element->converted_type != parq::ConvertedType::MAP && node.element->converted_type != parq::ConvertedType::MAP_KEY_VALUE && !node.element->logicalType.__isset.MAP) return false; /// If an element is declared as MAP, but doesn't have the expected structure of children /// and grandchildren, we fall back to interpreting it as array of tuples, as if there were /// no MAP annotation on it. Also fall back if Tuple type was requested /// (presumably `Tuple(Array(Tuple(key, value))` - a literal interpretation of the schema tree) /// (not to be confused with the case when `Array(Tuple(key, value))` was requested). 
- if (schema_context != SchemaContext::None && schema_context != SchemaContext::ListElement) + if (node.schema_context != SchemaContext::None && node.schema_context != SchemaContext::ListElement) return false; - if (typeid_cast(type_hint.get())) + if (typeid_cast(node.type_hint.get())) return false; - if (element.num_children != 1) + if (node.element->num_children != 1) return false; const parq::SchemaElement & child = file_metadata.schema.at(schema_idx); if (child.repetition_type != parq::FieldRepetitionType::REPEATED || child.num_children != 2) @@ -402,52 +418,54 @@ bool SchemaConverter::processSubtreeMap(const String & name, bool requested, Dat DataTypePtr array_type_hint; bool no_map = false; // return plain Array(Tuple) instead of Map - if (type_hint) + if (node.type_hint) { - if (const DataTypeMap * map_type = typeid_cast(type_hint.get())) + if (const DataTypeMap * map_type = typeid_cast(node.type_hint.get())) { array_type_hint = map_type->getNestedType(); } - else if (typeid_cast(type_hint.get())) + else if (typeid_cast(node.type_hint.get())) { - array_type_hint = type_hint; + array_type_hint = node.type_hint; no_map = true; } else { - throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Map, requested type is {}", name, type_hint->getName()); + throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Map, requested type is {}", node.getNameForLogging(), node.type_hint->getName()); } } /// (MapTupleAsPlainTuple is needed to skip a level in the column name: it changes /// `my_map.key_value.key` to `my_map.key`. - auto array_idx = processSubtree(name, requested, array_type_hint, no_map ? SchemaContext::MapTupleAsPlainTuple : SchemaContext::MapTuple); + TraversalNode subnode = node.prepareToRecurse(no_map ? SchemaContext::MapTupleAsPlainTuple : SchemaContext::MapTuple, array_type_hint); + processSubtree(subnode); - if (!requested || !array_idx.has_value()) + if (!node.requested || !subnode.output_idx.has_value()) return true; + size_t array_idx = subnode.output_idx.value(); /// Support explicitly requesting Array(Tuple) type for map columns. Useful e.g. if the map /// key type is something that's not allowed as Map key in clickhouse. if (no_map) { - output_idx = array_idx; + node.output_idx = array_idx; } else { - output_idx = output_columns.size(); + node.output_idx = output_columns.size(); OutputColumnInfo & output = output_columns.emplace_back(); - const OutputColumnInfo & array = output_columns.at(array_idx.value()); + const OutputColumnInfo & array = output_columns.at(array_idx); - output.name = name; + output.name = node.name; output.primitive_start = array.primitive_start; output.primitive_end = array.primitive_end; output.type = std::make_shared(array.type); - output.nested_columns = {array_idx.value()}; + output.nested_columns = {array_idx}; } return true; } -bool SchemaConverter::processSubtreeArrayOuter(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx) +bool SchemaConverter::processSubtreeArrayOuter(TraversalNode & node) { /// Array: /// required group `name` (List): @@ -458,46 +476,48 @@ bool SchemaConverter::processSubtreeArrayOuter(const String & name, bool request /// across two levels of recursion: processSubtreeArrayOuter for the outer wrapper, /// processSubtreeArrayInner for the inner wrapper. 
- if (element.converted_type != parq::ConvertedType::LIST && !element.logicalType.__isset.LIST) + if (node.element->converted_type != parq::ConvertedType::LIST && !node.element->logicalType.__isset.LIST) return false; - if (schema_context != SchemaContext::None && schema_context != SchemaContext::ListElement) + if (node.schema_context != SchemaContext::None && node.schema_context != SchemaContext::ListElement) return false; - if (element.num_children != 1) + if (node.element->num_children != 1) return false; const parq::SchemaElement & child = file_metadata.schema.at(schema_idx); if (child.repetition_type != parq::FieldRepetitionType::REPEATED || child.num_children != 1) return false; - auto array_idx = processSubtree(name, requested, type_hint, SchemaContext::ListTuple); + TraversalNode subnode = node.prepareToRecurse(SchemaContext::ListTuple, node.type_hint); + processSubtree(subnode); - if (!requested || !array_idx.has_value()) + if (!node.requested || !subnode.output_idx.has_value()) return true; - output_idx = array_idx; + node.output_idx = subnode.output_idx; return true; } -bool SchemaConverter::processSubtreeArrayInner(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx) +bool SchemaConverter::processSubtreeArrayInner(TraversalNode & node) { - if (schema_context != SchemaContext::ListTuple) + if (node.schema_context != SchemaContext::ListTuple) return false; /// Array (middle schema element). - chassert(element.repetition_type == parq::FieldRepetitionType::REPEATED && - element.num_children == 1); // caller checked this + chassert(node.element->repetition_type == parq::FieldRepetitionType::REPEATED && + node.element->num_children == 1); // caller checked this /// (type_hint is already unwrapped to be element type, because of REPEATED) - auto element_idx = processSubtree(name, requested, type_hint, SchemaContext::ListElement); + TraversalNode subnode = node.prepareToRecurse(SchemaContext::ListElement, node.type_hint); + processSubtree(subnode); - if (!requested || !element_idx.has_value()) + if (!node.requested || !subnode.output_idx.has_value()) return true; - output_idx = element_idx; + node.output_idx = subnode.output_idx; return true; } -void SchemaConverter::processSubtreeTuple(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx) +void SchemaConverter::processSubtreeTuple(TraversalNode & node) { /// Tuple (possibly a Map key_value tuple): /// (required|optional) group `name`: @@ -505,9 +525,9 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D /// `name2` /// ... 
- const DataTypeTuple * tuple_type_hint = typeid_cast(type_hint.get()); - if (type_hint && !tuple_type_hint) - throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Tuple, requested type is {}", name, type_hint->getName()); + const DataTypeTuple * tuple_type_hint = typeid_cast(node.type_hint.get()); + if (node.type_hint && !tuple_type_hint) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Tuple, requested type is {}", node.getNameForLogging(), node.type_hint->getName()); /// 3 modes: /// * If type_hint has element names, we match elements from parquet to elements from type @@ -521,10 +541,10 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D bool lookup_by_name = false; std::vector elements; - if (type_hint) + if (node.type_hint) { if (tuple_type_hint->hasExplicitNames() && !tuple_type_hint->getElements().empty() && - schema_context != SchemaContext::MapTuple) + node.schema_context != SchemaContext::MapTuple) { /// Allow reading a subset of tuple elements, matched by name, possibly reordered. lookup_by_name = true; @@ -532,16 +552,16 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D } else { - if (tuple_type_hint->getElements().size() != size_t(element.num_children)) - throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Tuple with {} elements, requested type is Tuple with {} elements", name, element.num_children, tuple_type_hint->getElements().size()); + if (tuple_type_hint->getElements().size() != size_t(node.element->num_children)) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Requested type of column {} doesn't match parquet schema: parquet type is Tuple with {} elements, requested type is Tuple with {} elements", node.getNameForLogging(), node.element->num_children, tuple_type_hint->getElements().size()); } } - if (!lookup_by_name && requested) - elements.resize(size_t(element.num_children), UINT64_MAX); + if (!lookup_by_name && node.requested) + elements.resize(size_t(node.element->num_children), UINT64_MAX); Strings names; DataTypes types; - if (!type_hint && requested) + if (!node.type_hint && node.requested) { names.resize(elements.size()); types.resize(elements.size()); @@ -551,36 +571,37 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D size_t output_start = output_columns.size(); size_t skipped_unsupported_columns = 0; std::vector element_names_in_file; - for (size_t i = 0; i < size_t(element.num_children); ++i) + for (size_t i = 0; i < size_t(node.element->num_children); ++i) { - const String & element_name = file_metadata.schema.at(schema_idx).name; - element_names_in_file.push_back(element_name); + const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx))); std::optional idx_in_output_tuple = i - skipped_unsupported_columns; if (lookup_by_name) { idx_in_output_tuple = tuple_type_hint->tryGetPositionByName(element_name, options.format.parquet.case_insensitive_column_matching); if (idx_in_output_tuple.has_value() && elements.at(idx_in_output_tuple.value()) != UINT64_MAX) - throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Parquet tuple {} has multiple elements with name `{}`", name, element_name); + throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Parquet tuple {} has multiple elements with name `{}`", 
node.getNameForLogging(), element_name); } DataTypePtr element_type_hint; - if (type_hint && idx_in_output_tuple.has_value()) + if (node.type_hint && idx_in_output_tuple.has_value()) element_type_hint = tuple_type_hint->getElement(idx_in_output_tuple.value()); - bool element_requested = requested && idx_in_output_tuple.has_value(); + const bool element_requested = node.requested && idx_in_output_tuple.has_value(); - SchemaContext child_context = SchemaContext::None; - if (schema_context == SchemaContext::MapTuple && idx_in_output_tuple == 0) - child_context = SchemaContext::MapKey; + TraversalNode subnode = node.prepareToRecurse(SchemaContext::None, element_type_hint); + subnode.requested = element_requested; + if (node.schema_context == SchemaContext::MapTuple && idx_in_output_tuple == 0) + subnode.schema_context = SchemaContext::MapKey; - auto element_idx = processSubtree(name, element_requested, element_type_hint, child_context); + processSubtree(subnode); + auto element_idx = subnode.output_idx; if (element_requested) { if (!element_idx.has_value()) { - if (type_hint || schema_context == SchemaContext::MapTuple) + if (node.type_hint || node.schema_context == SchemaContext::MapTuple) { /// If one of the elements is skipped, skip the whole tuple. /// Remove previous elements. @@ -601,7 +622,7 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D elements.at(idx_in_output_tuple.value()) = element_idx.value(); const auto & type = output_columns.at(element_idx.value()).type; - if (type_hint) + if (node.type_hint) { chassert(type->equals(*element_type_hint)); } @@ -613,16 +634,16 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D } } - if (!requested) + if (!node.requested) return; /// Map tuple in parquet has elements: {"key" , "value" }, /// but DataTypeMap requires: {"keys", "values"}. - if (schema_context == SchemaContext::MapTuple) + if (node.schema_context == SchemaContext::MapTuple) names = {"keys", "values"}; DataTypePtr output_type; - if (type_hint) + if (node.type_hint) { chassert(elements.size() == tuple_type_hint->getElements().size()); for (size_t i = 0; i < elements.size(); ++i) @@ -630,24 +651,24 @@ void SchemaConverter::processSubtreeTuple(const String & name, bool requested, D if (elements[i] != UINT64_MAX) continue; if (!options.format.parquet.allow_missing_columns) - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Requested tuple element {} of column {} was not found in parquet schema ({})", tuple_type_hint->getNameByPosition(i + 1), name, element_names_in_file); + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Requested tuple element {} of column {} was not found in parquet schema ({})", tuple_type_hint->getNameByPosition(i + 1), node.getNameForLogging(), element_names_in_file); elements[i] = output_columns.size(); OutputColumnInfo & missing_output = output_columns.emplace_back(); - missing_output.name = name + "." + (tuple_type_hint->hasExplicitNames() ? tuple_type_hint->getNameByPosition(i + 1) : std::to_string(i + 1)); + missing_output.name = node.name + "." + (tuple_type_hint->hasExplicitNames() ? 
tuple_type_hint->getNameByPosition(i + 1) : std::to_string(i + 1)); missing_output.type = tuple_type_hint->getElement(i); missing_output.is_missing_column = true; } - output_type = type_hint; + output_type = node.type_hint; } else { output_type = std::make_shared(types, names); } - output_idx = output_columns.size(); + node.output_idx = output_columns.size(); OutputColumnInfo & output = output_columns.emplace_back(); - output.name = name; + output.name = node.name; output.primitive_start = primitive_start; output.primitive_end = primitive_columns.size(); output.type = std::move(output_type); @@ -790,7 +811,9 @@ void SchemaConverter::processPrimitiveColumn( /// GeoParquet. /// Spec says "Geometry columns MUST be at the root of the schema", but we allow them to be - /// nested in tuples etc, why not. + /// nested in tuples etc, why not. (Though nesting in arrays/maps probably currently wouldn't + /// work because our names omit the wrapper SchemaElement-s. That would be easy to fix by + /// including them in parquet_name.) /// If type hint is String, ignore geoparquet and return raw bytes. if (geo_metadata.has_value() && (!type_hint || !typeid_cast(type_hint.get()))) { diff --git a/src/Processors/Formats/Impl/Parquet/SchemaConverter.h b/src/Processors/Formats/Impl/Parquet/SchemaConverter.h index eb8bf807b350..e89e7f75d6f0 100644 --- a/src/Processors/Formats/Impl/Parquet/SchemaConverter.h +++ b/src/Processors/Formats/Impl/Parquet/SchemaConverter.h @@ -2,6 +2,13 @@ #include +namespace DB +{ + +class ColumnMapper; + +} + namespace DB::Parquet { @@ -16,6 +23,7 @@ struct SchemaConverter const parq::FileMetaData & file_metadata; const ReadOptions & options; const Block * sample_block; + const ColumnMapper * column_mapper = nullptr; std::vector external_columns; std::vector primitive_columns; @@ -25,6 +33,7 @@ struct SchemaConverter size_t primitive_column_idx = 0; std::vector levels; + /// The key is the parquet column name, without ColumnMapper. std::unordered_map geo_columns; SchemaConverter(const parq::FileMetaData &, const ReadOptions &, const Block *); @@ -49,23 +58,87 @@ struct SchemaConverter ListElement, }; + /// Parameters of a recursive call that traverses a subtree, corresponding to a parquet SchemaElement. + struct TraversalNode + { + /// Assigned by the caller. + SchemaContext schema_context = SchemaContext::None; + + /// These fields are assigned by the caller, then updated by the callee. + /// E.g. name is initially the parent element's name, then the callee appends a path + /// component to it. + /// + /// If there's ColumnMapper, `name` is the mapped name (clickhouse column name), while + /// `parquet_name` is the name according to the parquet schema. + /// If `parquet_name` is nullopt, the clickhouse and parquet names are equal. + String name; + std::optional parquet_name; + DataTypePtr type_hint; + bool requested = false; + + /// These are assigned by the callee. + const parq::SchemaElement * element = nullptr; + std::optional output_idx; // index in output_columns + + const String & getParquetName() const + { + return parquet_name.has_value() ? 
*parquet_name : name; + } + + String getNameForLogging() const + { + if (parquet_name.has_value() && *parquet_name != name) + return fmt::format("{} (mapped to {})", *parquet_name, name); + return name; + } + + void appendNameComponent(const String & parquet_field_name, std::string_view mapped_field_name) + { + if (!name.empty()) + name += "."; + name += mapped_field_name; + if (parquet_name.has_value() || mapped_field_name != parquet_field_name) + { + if (parquet_name.has_value()) + *parquet_name += "."; + else + parquet_name.emplace(); + *parquet_name += parquet_field_name; + } + } + + TraversalNode prepareToRecurse(SchemaContext schema_context_, DataTypePtr type_hint_) + { + TraversalNode res = *this; + res.schema_context = schema_context_; + res.type_hint = std::move(type_hint_); + res.element = nullptr; + res.output_idx.reset(); + return res; + } + }; + void checkHasColumns(); - std::optional processSubtree(String name, bool requested, DataTypePtr type_hint, SchemaContext); + void processSubtree(TraversalNode & node); /// These functions are used by processSubtree for different kinds of SchemaElement. /// Return true if the schema element was recognized as the corresponding kind, /// even if no output column needs to be produced. - bool processSubtreePrimitive(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx); - bool processSubtreeMap(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx); - bool processSubtreeArrayOuter(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx); - bool processSubtreeArrayInner(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx); - void processSubtreeTuple(const String & name, bool requested, DataTypePtr type_hint, SchemaContext schema_context, const parq::SchemaElement & element, std::optional & output_idx); + bool processSubtreePrimitive(TraversalNode & node); + bool processSubtreeMap(TraversalNode & node); + bool processSubtreeArrayOuter(TraversalNode & node); + bool processSubtreeArrayInner(TraversalNode & node); + void processSubtreeTuple(TraversalNode & node); void processPrimitiveColumn( const parq::SchemaElement & element, DataTypePtr type_hint, PageDecoderInfo & out_decoder, DataTypePtr & out_decoded_type, DataTypePtr & out_inferred_type, std::optional geo_metadata) const; + + /// Returns element.name or a corresponding name from ColumnMapper. + /// For tuple elements, that's just the element name like `x`, not the whole path like `t.x`. 
+ std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element) const; }; } diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index 16ceae4819b0..fca6fc38b89d 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -37,6 +37,9 @@ ParquetV3BlockInputFormat::ParquetV3BlockInputFormat( { read_options.min_bytes_for_seek = min_bytes_for_seek; read_options.bytes_per_read_task = min_bytes_for_seek * 4; + + if (!format_filter_info) + format_filter_info = std::make_shared(); } void ParquetV3BlockInputFormat::initializeIfNeeded() diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 6e5f6c8bd2ac..740c3b4d1efb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -252,7 +252,7 @@ bool StorageObjectStorage::canMoveConditionsToPrewhere() const std::optional StorageObjectStorage::supportedPrewhereColumns() const { - return getInMemoryMetadataPtr()->getColumnsWithoutDefaultExpressions(); + return getInMemoryMetadataPtr()->getColumnsWithoutDefaultExpressions(/*exclude=*/ hive_partition_columns_to_read_from_file_path); } IStorage::ColumnSizeByName StorageObjectStorage::getColumnSizes() const diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 28e9da937032..f0270cd78dba 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1029,8 +1029,9 @@ bool StorageFile::canMoveConditionsToPrewhere() const std::optional StorageFile::supportedPrewhereColumns() const { - /// Currently don't support prewhere for virtual columns and columns with default expressions. - return getInMemoryMetadataPtr()->getColumnsWithoutDefaultExpressions(); + /// Currently don't support prewhere for virtual columns, columns with default expressions, + /// and columns taken from file path (hive partitioning). + return getInMemoryMetadataPtr()->getColumnsWithoutDefaultExpressions(/*exclude=*/ hive_partition_columns_to_read_from_file_path); } IStorage::ColumnSizeByName StorageFile::getColumnSizes() const diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index 5e9ec5ae98e4..cd010a280a68 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -788,11 +788,12 @@ std::unordered_map StorageInMemoryMetadata::getFakeColu return sizes; } -NameSet StorageInMemoryMetadata::getColumnsWithoutDefaultExpressions() const +NameSet StorageInMemoryMetadata::getColumnsWithoutDefaultExpressions(const NamesAndTypesList & exclude) const { + auto exclude_map = exclude.getNameToTypeMap(); NameSet names; for (const auto & col : columns) - if (!col.default_desc.expression) + if (!col.default_desc.expression && !exclude_map.contains(col.name)) names.insert(col.name); return names; } diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 43af71ed7713..8fcb419fd376 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -289,7 +289,7 @@ struct StorageInMemoryMetadata std::unordered_map getFakeColumnSizes() const; /// Elements of `columns` that have `default_desc.expression == nullptr`. 
-    NameSet getColumnsWithoutDefaultExpressions() const;
+    NameSet getColumnsWithoutDefaultExpressions(const NamesAndTypesList & exclude) const;
 };
 
 using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp
index 5c80831421f4..e0d7158ed44b 100644
--- a/src/Storages/StorageURL.cpp
+++ b/src/Storages/StorageURL.cpp
@@ -1098,7 +1098,7 @@ bool IStorageURLBase::canMoveConditionsToPrewhere() const
 
 std::optional<NameSet> IStorageURLBase::supportedPrewhereColumns() const
 {
-    return getInMemoryMetadataPtr()->getColumnsWithoutDefaultExpressions();
+    return getInMemoryMetadataPtr()->getColumnsWithoutDefaultExpressions(/*exclude=*/ hive_partition_columns_to_read_from_file_path);
 }
 
 IStorage::ColumnSizeByName IStorageURLBase::getColumnSizes() const
diff --git a/tests/clickhouse-test b/tests/clickhouse-test
index b0003e1403e8..077abaa0cdb6 100755
--- a/tests/clickhouse-test
+++ b/tests/clickhouse-test
@@ -1146,6 +1146,8 @@ class SettingsRandomizer:
         "use_query_condition_cache": lambda: random.randint(0, 1),
         "secondary_indices_enable_bulk_filtering": lambda: random.randint(0, 1),
         "use_skip_indexes_if_final": lambda: random.randint(0, 1),
+        # Use the new reader most of the time.
+        "input_format_parquet_use_native_reader_v3": lambda: min(1, random.randint(0, 5)),
         **randomize_external_sort_group_by(),
     }
diff --git a/tests/queries/0_stateless/02168_avro_bug.sql b/tests/queries/0_stateless/02168_avro_bug.sql
index ac98119845f5..338b5ef8b0b4 100644
--- a/tests/queries/0_stateless/02168_avro_bug.sql
+++ b/tests/queries/0_stateless/02168_avro_bug.sql
@@ -1,5 +1,5 @@
 -- Tags: no-fasttest, no-parallel
-insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10);
-insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
-insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
+insert into table function file('02168_avro_bug.avro', 'Parquet', 'x UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
+insert into table function file('02168_avro_bug.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
+insert into table function file('02168_avro_bug.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
 select 'OK';
diff --git a/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.reference b/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.reference
index 877bb5f390f8..b7211a9f2526 100644
--- a/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.reference
+++ b/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.reference
@@ -1,2 +1,5 @@
 424242424242424242424242424242424242424242424242424242
 22707864971053448441042714569797161695738549521977760418632926980540162388532
+42424242424242424242424242424242
+22707864971053448441042714569797161695738549521977760418632926980540162388532
+42424242424242424242424242424242
diff --git a/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.sh b/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.sh
index 0f590027f194..2c5a79e36430 100755
--- a/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.sh
+++ b/tests/queries/0_stateless/02786_parquet_big_integer_compatibility.sh
@@ -8,5 +8,13 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 
 # This is parsed as text.
 $CLICKHOUSE_LOCAL -q "select toString(424242424242424242424242424242424242424242424242424242::UInt256) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' -q "select * from table"
-# But this is parsed as binary because text length happens to be 32 bytes. Not ideal.
-$CLICKHOUSE_LOCAL -q "select toString(42424242424242424242424242424242::UInt256) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' -q "select * from table"
+# FIXED_LEN_BYTE_ARRAY(32) is parsed as binary.
+$CLICKHOUSE_LOCAL -q "select toFixedString(42424242424242424242424242424242::UInt256::String, 32) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' -q "select * from table"
+
+# FIXED_LEN_BYTE_ARRAY(not 32) is parsed as text by the new reader, throws exception in the old reader.
+$CLICKHOUSE_LOCAL -q "select toFixedString(42424242424242424242424242424242::UInt256::String, 50) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' --input_format_parquet_use_native_reader_v3=1 -q "select * from table"
+
+# BYTE_ARRAY of length 32 is interpreted as binary by the old parquet reader, as text by the new one.
+$CLICKHOUSE_LOCAL -q "select toString(42424242424242424242424242424242::UInt256) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' --input_format_parquet_use_native_reader_v3=0 -q "select * from table"
+$CLICKHOUSE_LOCAL -q "select toString(42424242424242424242424242424242::UInt256) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' --input_format_parquet_use_native_reader_v3=1 -q "select * from table"
+
diff --git a/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
index e206e6a6f84e..204f849db91c 100755
--- a/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
+++ b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
@@ -18,100 +18,102 @@ DATA_FILE_USER_PATH="${WORKING_DIR}/multi_column_bf.gz.parquet"
 
 cp ${DATA_FILE} ${DATA_FILE_USER_PATH}
 
+CLICKHOUSE_CLIENT="${CLICKHOUSE_CLIENT} --input_format_parquet_filter_push_down=false --input_format_parquet_page_filter_push_down=false --optimize_move_to_prewhere=false --input_format_parquet_enable_row_group_prefetch=false"
+
 ${CLICKHOUSE_CLIENT} --query="select count(*) from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS use_cache_for_count_from_files=false;"
 
 echo "bloom filter is off, all row groups should be read"
 echo "expect rows_read = select count()"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "bloom filter is on, some row groups should be skipped"
 echo "expect rows_read much less than select count()"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "bloom filter is on, but where predicate contains data from 2 row groups out of 3."
 echo "Rows read should be less than select count, but greater than previous selects"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "bloom filter is on, but where predicate contains data from all row groups"
 echo "expect rows_read = select count()"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' or uint64_logical=18441251162536403933 order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' or uint64_logical=18441251162536403933 order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "IN check"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string in ('PFJH', 'ZHZK') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string in ('PFJH', 'ZHZK') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "tuple in case, bf is off."
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "tuple in case, bf is on."
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "complex tuple in case, bf is off"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "complex tuple in case, bf is on"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "complex tuple in case, bf is on. Non existent"
-${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('NON2', 'NON2'), ('NON3', 'NON3')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('NON2', 'NON2'), ('NON3', 'NON3')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Bloom filter for json column. BF is off"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}'::JSON order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}'::JSON order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Bloom filter for json column. BF is on"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}'::JSON order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}'::JSON order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Bloom filter for ipv4 column. BF is off"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Bloom filter for ipv4 column. BF is on"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Bloom filter for ipv4 column. BF is on. Specified in the schema"
-${CLICKHOUSE_CLIENT} --query="select ipv4 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv4 IPv4') where ipv4 = toIPv4('0.0.1.143') order by ipv4 asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select ipv4 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv4 IPv4') where ipv4 = toIPv4('0.0.1.143') order by ipv4 asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Bloom filter on 64 bit column read as ipv4. We explicitly deny it, should read all rg"
-${CLICKHOUSE_CLIENT} --query="select uint64_logical from file ('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical IPv4') where uint64_logical = toIPv4(5552715629697883300) order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select uint64_logical from file ('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical IPv4') where uint64_logical = toIPv4(5552715629697883300) order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "BF off for parquet uint64 logical type. Should read everything"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "BF on for parquet uint64 logical type. Uint64 is stored as a signed int 64, but with logical annotation. Make sure a value greater than int64 can be queried"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Uint16 is stored as physical type int32 with bidwidth = 16 and sign = false. Make sure a value greater than int16 can be queried. BF is on."
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint16_logical=65528 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint16_logical=65528 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint16_logical=65528 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint16_logical=65528 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "BF off for parquet int8 logical type. Should read everything"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "BF on for parquet int8 logical type. Should skip row groups"
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=false;" | jq 'del(.meta,.statistics.elapsed)'
 
-${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_enable_json_parsing=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Invalid column conversion with in operation. String type can not be hashed against parquet int64 physical type. Should read everything"
-${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical String') where uint64_logical in ('5') order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical String') where uint64_logical in ('5') order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Transformations on key column shall not be allowed (=). Should read everything"
-${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) where negate(uint64_logical) = -7711695863945021976 order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) where negate(uint64_logical) = -7711695863945021976 order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 echo "Transformations on key column shall not be allowed (IN). Should read everything"
-${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) where negate(uint64_logical) in (-7711695863945021976) order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false, input_format_parquet_enable_row_group_prefetch=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) where negate(uint64_logical) in (-7711695863945021976) order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
 
 rm -rf ${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/*
diff --git a/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference
index ef9b07ba955f..8db1c1f7db07 100644
--- a/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference
+++ b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.reference
@@ -1,4 +1,7 @@
-65409
-16
+25000
+25000
+64
 128
-2363
+2048
+1024
+2048
diff --git a/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql
index e6b13510301e..402deba4f8ff 100644
--- a/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql
+++ b/tests/queries/0_stateless/03164_adapting_parquet_reader_output_size.sql
@@ -1,25 +1,25 @@
--- Tags: no-fasttest, no-random-settings
+-- Tags: no-fasttest, no-random-settings, no-parallel
 
 set max_insert_threads=1;
+set schema_inference_make_columns_nullable=0;
+set engine_file_truncate_on_insert=1;
 
-DROP TABLE IF EXISTS test_parquet;
-CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet);
-INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
-SELECT max(blockSize()) FROM test_parquet;
+-- Average string lengths, approximately: 2, 200, 200, 200
+INSERT INTO FUNCTION file('03164_adapting_parquet_reader_output_size.parquet', Parquet, 'short String, long1 String, long2 String, long_low_cardinality String') SELECT number%100, range(cityHash64(number), cityHash64(number)+10), repeat(cityHash64(number)::String, 6+number%10), repeat((number%10)::String, 200+number%10) FROM numbers(25000);
 
-DROP TABLE IF EXISTS test_parquet;
-CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_max_block_size=16;
-INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
-SELECT max(blockSize()) FROM test_parquet;
+-- Default limits are high, everything goes in one block.
+SELECT max(blockSize())+sum(ignore(short, long2)) FROM file('03164_adapting_parquet_reader_output_size.parquet');
+-- Small column doesn't take a lot of bytes, everything goes in one block.
+SELECT max(blockSize())+sum(ignore(short)) FROM file('03164_adapting_parquet_reader_output_size.parquet') settings input_format_parquet_prefer_block_bytes=100000;
+-- Specific number of rows requested.
+SELECT max(blockSize())+sum(ignore(short, long2)) FROM file('03164_adapting_parquet_reader_output_size.parquet') settings input_format_parquet_max_block_size=64;
+-- Tiny byte limit, reader bumps block size to 128 rows instead of 1 row.
+SELECT max(blockSize())+sum(ignore(short, long2)) FROM file('03164_adapting_parquet_reader_output_size.parquet') settings input_format_parquet_prefer_block_bytes=30;
 
-DROP TABLE IF EXISTS test_parquet;
-CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30;
-INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
-SELECT max(blockSize()) FROM test_parquet;
+-- Intermediate byte limit. The two parquet reader implementations estimate row byte sizes slightly
+-- differently and don't match exactly, so we round the result.
+SELECT roundToExp2(max(blockSize())+sum(ignore(short, long2))) FROM file('03164_adapting_parquet_reader_output_size.parquet') settings input_format_parquet_prefer_block_bytes=700000;
+SELECT roundToExp2(max(blockSize())+sum(ignore(short, long1, long2))) FROM file('03164_adapting_parquet_reader_output_size.parquet') settings input_format_parquet_prefer_block_bytes=700000;
 
-DROP TABLE IF EXISTS test_parquet;
-CREATE TABLE test_parquet (col1 String, col2 String, col3 String, col4 String, col5 String, col6 String, col7 String) ENGINE=File(Parquet) settings input_format_parquet_prefer_block_bytes=30720;
-INSERT INTO test_parquet SELECT rand(),rand(),rand(),rand(),rand(),rand(),rand() FROM numbers(100000);
-SELECT max(blockSize()) FROM test_parquet;
-
-DROP TABLE IF EXISTS test_parquet;
+-- Only the new parquet reader uses a correct length estimate for dictionary-encoded strings.
+SELECT roundToExp2(max(blockSize())+sum(ignore(short, long_low_cardinality))) FROM file('03164_adapting_parquet_reader_output_size.parquet') settings input_format_parquet_prefer_block_bytes=700000, input_format_parquet_use_native_reader_v3=1;
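Editor's note on the expected block sizes above: the reference values 128, 2048 and 1024 can be reproduced with a rough back-of-the-envelope calculation. The sketch below is hypothetical Python, not the reader's actual code; the clamping floor of 128 rows and the ~65409-row default cap are assumptions inferred from the test comments and the old reference value, and the per-row byte figures come from the "Average string lengths, approximately: 2, 200, 200, 200" comment in the test.

# Hypothetical illustration only: approximates how a Parquet reader could pick an
# output block size from input_format_parquet_prefer_block_bytes. The min/max
# bounds and byte-per-row figures are assumptions, not the ClickHouse implementation.

def rows_per_block(prefer_block_bytes: int, est_bytes_per_row: int,
                   max_block_size: int = 65409, min_block_size: int = 128) -> int:
    """Target roughly prefer_block_bytes per block, clamped to [min, max] rows."""
    rows = prefer_block_bytes // max(est_bytes_per_row, 1)
    return max(min_block_size, min(rows, max_block_size))

# prefer_block_bytes=30 with ~200-byte rows asks for less than one row per block;
# the assumed floor bumps it to 128, matching the "128" line in the reference file.
print(rows_per_block(30, 200))       # -> 128

# prefer_block_bytes=700000 reading (short, long2), roughly 200 bytes per row:
# ~3500 rows, which roundToExp2() in the test rounds down to 2048.
print(rows_per_block(700000, 200))   # -> 3500

# Adding long1 roughly doubles the estimated row size: ~1750 rows, rounded to 1024.
print(rows_per_block(700000, 400))   # -> 1750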