diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 5ed5ab179b19..95258a2b320c 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -35,7 +35,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 2; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3; static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4; diff --git a/src/Core/Range.cpp b/src/Core/Range.cpp index 139fb8db76c9..6d037d7e9004 100644 --- a/src/Core/Range.cpp +++ b/src/Core/Range.cpp @@ -151,6 +151,13 @@ bool Range::isInfinite() const return left.isNegativeInfinity() && right.isPositiveInfinity(); } +/// [x, x] +bool Range::isPoint() const +{ + return fullBounded() && left_included && right_included && equals(left, right) + && !left.isNegativeInfinity() && !left.isPositiveInfinity(); +} + bool Range::intersectsRange(const Range & r) const { /// r to the left of me. diff --git a/src/Core/Range.h b/src/Core/Range.h index 6072795db0a9..921e1e6aa3f0 100644 --- a/src/Core/Range.h +++ b/src/Core/Range.h @@ -94,6 +94,8 @@ struct Range bool isBlank() const; + bool isPoint() const; + bool intersectsRange(const Range & r) const; bool containsRange(const Range & r) const; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 009461d08614..bce6eaf3e4a3 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7062,6 +7062,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"( )", EXPERIMENTAL) \ DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"( Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan. +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"( +Allow Iceberg read optimization based on Iceberg metadata. )", EXPERIMENTAL) \ DECLARE(Bool, allow_retries_in_cluster_requests, false, R"( Allow retries in cluster request, when one node goes offline diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index bbee194e29ed..bc2cf3c8f5f4 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -41,11 +41,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// Note: please check if the key already exists to prevent duplicate entries. 
addSettingsChanges(settings_changes_history, "25.8.9.2000", { + {"allow_experimental_iceberg_read_optimization", true, true, "New setting."}, {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid tasks hopping between executors."}, - {"object_storage_cluster", "", "", "Antalya: New setting"}, - {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, - {"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"}, + {"object_storage_cluster", "", "", "New setting"}, + {"object_storage_max_nodes", 0, 0, "New setting"}, + {"allow_retries_in_cluster_requests", false, false, "New setting"}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index 4b8999cafa44..32a607445891 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -101,6 +102,13 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } +RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_) + : metadata(std::move(metadata_)) +{ + relative_path = info.file_path; + file_meta_info = info.file_meta_info; +} + std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const { if (isArchive()) diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 0712ca0452da..942128df541b 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -107,6 +107,11 @@ struct ObjectMetadata ObjectAttributes attributes; }; + +struct DataFileInfo; +class DataFileMetaInfo; +using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>; + struct DataLakeObjectMetadata; struct RelativePathWithMetadata @@ -134,19 +139,24 @@ struct RelativePathWithMetadata std::optional<ObjectMetadata> metadata; /// Delta lake related object metadata. 
std::optional data_lake_metadata; + /// Information about columns + std::optional file_meta_info; /// Retry request after short pause CommandInTaskResponse command; RelativePathWithMetadata() = default; - explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt) - : metadata(std::move(metadata_)) - , command(task_string) + explicit RelativePathWithMetadata(String command_or_path, std::optional metadata_ = std::nullopt) + : relative_path(std::move(command_or_path)) + , metadata(std::move(metadata_)) + , command(relative_path) { - if (!command.is_parsed()) - relative_path = task_string; + if (command.is_parsed()) + relative_path = ""; } + explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_ = std::nullopt); + RelativePathWithMetadata(const RelativePathWithMetadata & other) = default; virtual ~RelativePathWithMetadata() = default; @@ -158,6 +168,9 @@ struct RelativePathWithMetadata virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual std::string getPathOrPathToArchiveIfArchive() const; + void setFileMetaInfo(std::optional file_meta_info_ ) { file_meta_info = file_meta_info_; } + std::optional getFileMetaInfo() const { return file_meta_info; } + const CommandInTaskResponse & getCommand() const { return command; } }; diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index 9a91f8549cfa..9870ef6b65bf 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -29,6 +29,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o if (object->data_lake_metadata.has_value()) data_lake_metadata = object->data_lake_metadata.value(); + file_meta_info = object->file_meta_info; + const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath(); } @@ -45,6 +47,7 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const auto object = std::make_shared(path); object->data_lake_metadata = data_lake_metadata; + object->file_meta_info = file_meta_info; return object; } @@ -61,6 +64,14 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc else ActionsDAG().serialize(out, registry); } + + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) + { + if (file_meta_info.has_value()) + file_meta_info.value()->serialize(out); + else + DataFileMetaInfo().serialize(out); + } } void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in) @@ -87,6 +98,14 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in) data_lake_metadata.transform = std::move(transform); } } + + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) + { + auto info = std::make_shared(DataFileMetaInfo::deserialize(in)); + + if (!path.empty() && !info->empty()) + file_meta_info = info; + } } } diff --git a/src/Interpreters/ClusterFunctionReadTask.h b/src/Interpreters/ClusterFunctionReadTask.h index 21e3238e0fb9..ba0fd799ab80 100644 --- a/src/Interpreters/ClusterFunctionReadTask.h +++ b/src/Interpreters/ClusterFunctionReadTask.h @@ -20,6 +20,8 @@ struct ClusterFunctionReadTaskResponse String path; /// Object metadata path, in case of data lake object. 
DataLakeObjectMetadata data_lake_metadata; + /// File's columns info + std::optional file_meta_info; /// Convert received response into ObjectInfo. ObjectInfoPtr getObjectInfo() const; diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index e27762f53dc4..34402cd58249 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column) void Chunk::addColumn(size_t position, ColumnPtr column) { - if (position >= columns.size()) + if (position == columns.size()) + { + addColumn(column); + return; + } + if (position > columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::addColumn(), max position = {}", position, !columns.empty() ? columns.size() - 1 : 0); diff --git a/src/Processors/Sources/ConstChunkGenerator.h b/src/Processors/Sources/ConstChunkGenerator.h index 8232bd5f0dff..ddb30d2476f6 100644 --- a/src/Processors/Sources/ConstChunkGenerator.h +++ b/src/Processors/Sources/ConstChunkGenerator.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -13,7 +14,7 @@ class ConstChunkGenerator : public ISource public: ConstChunkGenerator(SharedHeader header, size_t total_num_rows, size_t max_block_size_) : ISource(std::move(header)) - , remaining_rows(total_num_rows), max_block_size(max_block_size_) + , generated_rows(0), remaining_rows(total_num_rows), max_block_size(max_block_size_) { } @@ -27,10 +28,14 @@ class ConstChunkGenerator : public ISource size_t num_rows = std::min(max_block_size, remaining_rows); remaining_rows -= num_rows; - return cloneConstWithDefault(Chunk{getPort().getHeader().getColumns(), 0}, num_rows); + auto chunk = cloneConstWithDefault(Chunk{getPort().getHeader().getColumns(), 0}, num_rows); + chunk.getChunkInfos().add(std::make_shared(generated_rows)); + generated_rows += num_rows; + return chunk; } private: + size_t generated_rows; size_t remaining_rows; size_t max_block_size; }; diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index 92c40ae695d7..4a1a09969d99 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -1,9 +1,18 @@ #include #include +#include +#include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +}; + namespace { @@ -87,4 +96,109 @@ ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat( return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns, supports_tuple_elements); } +DataFileMetaInfo::DataFileMetaInfo( + const Iceberg::IcebergSchemaProcessor & schema_processor, + Int32 schema_id, + const std::unordered_map & columns_info_) +{ + + std::vector column_ids; + for (const auto & column : columns_info_) + column_ids.push_back(column.first); + + auto name_and_types = schema_processor.tryGetFieldsCharacteristics(schema_id, column_ids); + std::unordered_map name_by_index; + for (const auto & name_and_type : name_and_types) + { + const auto name = name_and_type.getNameInStorage(); + auto index = schema_processor.tryGetColumnIDByName(schema_id, name); + if (index.has_value()) + name_by_index[index.value()] = name; + } + + for (const auto & column : columns_info_) + { + auto i_name = name_by_index.find(column.first); + if (i_name != name_by_index.end()) + { + columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, 
column.second.hyperrectangle}; + } + } +} + +constexpr size_t FIELD_MASK_ROWS = 0x1; +constexpr size_t FIELD_MASK_NULLS = 0x2; +constexpr size_t FIELD_MASK_RECT = 0x4; +constexpr size_t FIELD_MASK_ALL = 0x7; + +void DataFileMetaInfo::serialize(WriteBuffer & out) const +{ + auto size = columns_info.size(); + writeIntBinary(size, out); + for (const auto & column : columns_info) + { + writeStringBinary(column.first, out); + size_t field_mask = 0; + if (column.second.rows_count.has_value()) + field_mask |= FIELD_MASK_ROWS; + if (column.second.nulls_count.has_value()) + field_mask |= FIELD_MASK_NULLS; + if (column.second.hyperrectangle.has_value()) + field_mask |= FIELD_MASK_RECT; + writeIntBinary(field_mask, out); + + if (column.second.rows_count.has_value()) + writeIntBinary(column.second.rows_count.value(), out); + if (column.second.nulls_count.has_value()) + writeIntBinary(column.second.nulls_count.value(), out); + if (column.second.hyperrectangle.has_value()) + { + writeFieldBinary(column.second.hyperrectangle.value().left, out); + writeFieldBinary(column.second.hyperrectangle.value().right, out); + } + } +} + +DataFileMetaInfo DataFileMetaInfo::deserialize(ReadBuffer & in) +{ + DataFileMetaInfo result; + + size_t size; + readIntBinary(size, in); + + for (size_t i = 0; i < size; ++i) + { + std::string name; + readStringBinary(name, in); + size_t field_mask; + readIntBinary(field_mask, in); + if ((field_mask & FIELD_MASK_ALL) != field_mask) + throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected field mask: {}", field_mask); + + ColumnInfo & column = result.columns_info[name]; + + if (field_mask & FIELD_MASK_ROWS) + { + Int64 value; + readIntBinary(value, in); + column.rows_count = value; + } + if (field_mask & FIELD_MASK_NULLS) + { + Int64 value; + readIntBinary(value, in); + column.nulls_count = value; + } + if (field_mask & FIELD_MASK_RECT) + { + FieldRef left = readFieldBinary(in); + FieldRef right = readFieldBinary(in); + column.hyperrectangle = Range(left, true, right, true); + } + } + + return result; +} + + } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 962e965574b4..498d4f3b5803 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -15,6 +16,8 @@ #include #include +#include + namespace DataLake { @@ -27,7 +30,58 @@ namespace DB namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; -} +}; + +namespace Iceberg +{ +struct ColumnInfo; +}; + +class DataFileMetaInfo +{ +public: + DataFileMetaInfo() = default; + + // Subset of the fields of Iceberg::ColumnInfo + struct ColumnInfo + { + std::optional<Int64> rows_count; + std::optional<Int64> nulls_count; + std::optional<Range> hyperrectangle; + }; + + // Extract metadata from the Iceberg structures + explicit DataFileMetaInfo( + const Iceberg::IcebergSchemaProcessor & schema_processor, + Int32 schema_id, + const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_); + + void serialize(WriteBuffer & out) const; + static DataFileMetaInfo deserialize(ReadBuffer & in); + + bool empty() const { return columns_info.empty(); } + + std::unordered_map<String, ColumnInfo> columns_info; +}; + +using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>; + +struct DataFileInfo +{ + std::string file_path; + std::optional<DataFileMetaInfoPtr> file_meta_info; + + explicit DataFileInfo(const std::string & file_path_) + : file_path(file_path_) {} + + explicit DataFileInfo(std::string && file_path_) + : 
file_path(std::move(file_path_)) {} + + bool operator==(const DataFileInfo & rhs) const + { + return file_path == rhs.file_path; + } +}; class SinkToStorage; using SinkToStoragePtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 4e1d56234010..dddb95d9d568 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -299,6 +299,8 @@ IcebergIterator::IcebergIterator( , callback(std::move(callback_)) , format(configuration_.lock()->getFormat()) , compression_method(configuration_.lock()->getCompressionMethod()) + , persistent_components(persistent_components_) + , table_schema_id(table_snapshot_->schema_id) { auto delete_file = deletes_iterator.next(); while (delete_file.has_value()) @@ -333,6 +335,10 @@ ObjectInfoPtr IcebergIterator::next(size_t) { object_info->addEqualityDeleteObject(equality_delete); } + object_info->setFileMetaInfo(std::make_shared( + *persistent_components.schema_processor, + table_schema_id, /// current schema id to use current column names + manifest_file_entry.columns_infos)); return object_info; } { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 42fe16e437df..03fed84ba6e0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -90,7 +90,7 @@ class IcebergIterator : public IObjectIterator IDataLakeMetadata::FileProgressCallback callback_, Iceberg::IcebergTableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - Iceberg::PersistentTableComponents persistent_components); + Iceberg::PersistentTableComponents persistent_components_); ObjectInfoPtr next(size_t) override; @@ -111,6 +111,8 @@ class IcebergIterator : public IObjectIterator std::vector equality_deletes_files; std::exception_ptr exception; std::mutex exception_mutex; + Iceberg::PersistentTableComponents persistent_components; + Int32 table_schema_id; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index eef717dc37f4..0cadc66c6d76 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -822,12 +822,14 @@ void IcebergMetadata::addDeleteTransformers( if (!iceberg_object_info->position_deletes_objects.empty()) { + LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->position_deletes_objects.size()); builder.addSimpleTransform( [&](const SharedHeader & header) { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context); }); } const auto & delete_files = iceberg_object_info->equality_deletes_objects; - LOG_DEBUG(log, "Constructing filter transform for equality delete, there are {} delete files", delete_files.size()); + if (!delete_files.empty()) + LOG_DEBUG(log, "Constructing filter transform for equality delete, there are {} delete files", delete_files.size()); for (const ManifestFileEntry & delete_file : delete_files) { auto simple_transform_adder = [&](const SharedHeader & header) @@ -969,6 +971,7 @@ ColumnMapperPtr IcebergMetadata::getColumnMapperForCurrentSchema() const 
SharedLockGuard lock(mutex); return persistent_components.schema_processor->getColumnMapperById(relevant_snapshot_schema_id); } + } #endif diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index cbfb37f2802e..f780d9d870cd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -66,6 +66,7 @@ namespace Setting extern const SettingsBool use_iceberg_partition_pruning; extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; extern const SettingsBool table_engine_read_through_distributed_cache; + extern const SettingsBool allow_experimental_iceberg_read_optimization; } namespace ErrorCodes @@ -273,7 +274,6 @@ void StorageObjectStorageSource::lazyInitialize() Chunk StorageObjectStorageSource::generate() { - lazyInitialize(); while (true) @@ -331,6 +331,16 @@ Chunk StorageObjectStorageSource::generate() .data_lake_snapshot_version = file_iterator->getSnapshotVersion()}, read_context); + /// Not empty when allow_experimental_iceberg_read_optimization=true + /// and some columns were removed from the read list because they contain only constant values. + /// Restore the data for these columns here. + for (const auto & constant_column : reader.constant_columns_with_values) + { + chunk.addColumn(constant_column.first, + constant_column.second.name_and_type.type->createColumnConst( + chunk.getNumRows(), constant_column.second.value)); + } + #if USE_PARQUET && USE_AWS_S3 if (chunk_size && chunk.hasColumns()) { @@ -496,9 +506,17 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade QueryPipelineBuilder builder; std::shared_ptr<ISource> source; std::unique_ptr<ReadBuffer> read_buf; + std::optional<Int64> rows_count_from_metadata; auto try_get_num_rows_from_cache = [&]() -> std::optional<size_t> { + if (rows_count_from_metadata.has_value()) + { + /// Must be non-negative here + size_t value = rows_count_from_metadata.value(); + return value; + } + if (!schema_cache) return std::nullopt; @@ -517,6 +535,128 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade return schema_cache->tryGetNumRows(cache_key, get_last_mod_time); }; + /// Columns that have a constant value in the current file, together with those values + std::map<size_t, ConstColumnWithValue> constant_columns_with_values; + std::unordered_set<String> constant_columns; + + NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns; + + std::unordered_map<String, std::pair<size_t, NameAndTypePair>> requested_columns_list; + { + size_t column_index = 0; + for (const auto & column : requested_columns_copy) + requested_columns_list[column.getNameInStorage()] = std::make_pair(column_index++, column); + } + + if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]) + { + auto file_meta_data = object_info->getFileMetaInfo(); + if (file_meta_data.has_value()) + { + bool is_all_rows_count_equals = true; + for (const auto & column : file_meta_data.value()->columns_info) + { + if (is_all_rows_count_equals && column.second.rows_count.has_value()) + { + if (rows_count_from_metadata.has_value()) + { + if (column.second.rows_count.value() != rows_count_from_metadata.value()) + { + LOG_WARNING(log, "Inconsistent row count for file {} in metadata, ignoring it", object_info->getPath()); + is_all_rows_count_equals = false; + rows_count_from_metadata = std::nullopt; + } + } + else if (column.second.rows_count.value() < 0) + { + LOG_WARNING(log, "Negative row count for file {} in metadata, ignoring it", object_info->getPath()); + 
is_all_rows_count_equals = false; + rows_count_from_metadata = std::nullopt; + } + else + rows_count_from_metadata = column.second.rows_count; + } + + if (column.second.hyperrectangle.has_value()) + { + auto column_name = column.first; + + auto i_column = requested_columns_list.find(column_name); + if (i_column == requested_columns_list.end()) + continue; + + if (column.second.hyperrectangle.value().isPoint() && + (!column.second.nulls_count.has_value() || column.second.nulls_count.value() <= 0)) + { + /// isPoint() guarantees that left == right + constant_columns_with_values[i_column->second.first] = + ConstColumnWithValue{ + i_column->second.second, + column.second.hyperrectangle.value().left + }; + constant_columns.insert(column_name); + + LOG_DEBUG(log, "In file {}, constant column '{}' of type '{}' has value '{}'", + object_info->getPath(), + column_name, + i_column->second.second.type, + column.second.hyperrectangle.value().left.dump()); + } + else if (column.second.rows_count.has_value() && column.second.nulls_count.has_value() + && column.second.rows_count.value() == column.second.nulls_count.value() + && i_column->second.second.type->isNullable()) + { + constant_columns_with_values[i_column->second.first] = + ConstColumnWithValue{ + i_column->second.second, + Field() + }; + constant_columns.insert(column_name); + + LOG_DEBUG(log, "In file {}, constant column '{}' of type '{}' has value 'NULL'", + object_info->getPath(), + column_name, + i_column->second.second.type); + } + } + } + for (const auto & column : requested_columns_list) + { + const auto & column_name = column.first; + + if (file_meta_data.value()->columns_info.contains(column_name)) + continue; + + if (!column.second.second.type->isNullable()) + continue; + + /// The column is nullable and absent in the file + constant_columns_with_values[column.second.first] = + ConstColumnWithValue{ + column.second.second, + Field() + }; + constant_columns.insert(column_name); + + LOG_DEBUG(log, "In file {}, constant column '{}' of type '{}' has value 'NULL'", + object_info->getPath(), + column_name, + column.second.second.type); + } + } + + if (!constant_columns.empty()) + { + size_t original_columns = requested_columns_copy.size(); + requested_columns_copy = requested_columns_copy.eraseNames(constant_columns); + if (requested_columns_copy.size() + constant_columns.size() != original_columns) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot correctly remove constant columns for file {}. Found constant columns: [{}]", + object_info->getPath(), constant_columns); + if (requested_columns_copy.empty()) + need_only_count = true; + } + } + std::optional<size_t> num_rows_from_cache = need_only_count && context_->getSettingsRef()[Setting::use_cache_for_count_from_files] ? try_get_num_rows_from_cache() : std::nullopt; @@ -534,6 +674,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade columns.emplace_back(type->createColumn(), type, name); builder.init(Pipe(std::make_shared<ConstChunkGenerator>( std::make_shared<const Block>(columns), *num_rows_from_cache, max_block_size))); + if (!constant_columns.empty()) + configuration->addDeleteTransformers(object_info, builder, format_settings, context_); } else { @@ -635,7 +777,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade /// from chunk read by IInputFormat. 
builder.addSimpleTransform([&](const SharedHeader & header) { - return std::make_shared(header, read_from_format_info.requested_columns); + return std::make_shared(header, requested_columns_copy); }); auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); @@ -644,7 +786,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles); return ReaderHolder( - object_info, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)); + object_info, + std::move(read_buf), + std::move(source), + std::move(pipeline), + std::move(current_reader), + std::move(constant_columns_with_values)); } std::future StorageObjectStorageSource::createReaderAsync() @@ -1036,12 +1183,14 @@ StorageObjectStorageSource::ReaderHolder::ReaderHolder( std::unique_ptr read_buf_, std::shared_ptr source_, std::unique_ptr pipeline_, - std::unique_ptr reader_) + std::unique_ptr reader_, + std::map && constant_columns_with_values_) : object_info(std::move(object_info_)) , read_buf(std::move(read_buf_)) , source(std::move(source_)) , pipeline(std::move(pipeline_)) , reader(std::move(reader_)) + , constant_columns_with_values(std::move(constant_columns_with_values_)) { } @@ -1055,6 +1204,7 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc source = std::move(other.source); read_buf = std::move(other.read_buf); object_info = std::move(other.object_info); + constant_columns_with_values = std::move(other.constant_columns_with_values); return *this; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 8145105a09e2..ead105e44753 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -87,6 +87,12 @@ class StorageObjectStorageSource : public ISource size_t total_rows_in_file = 0; LoggerPtr log = getLogger("StorageObjectStorageSource"); + struct ConstColumnWithValue + { + NameAndTypePair name_and_type; + Field value; + }; + struct ReaderHolder : private boost::noncopyable { public: @@ -95,7 +101,8 @@ class StorageObjectStorageSource : public ISource std::unique_ptr read_buf_, std::shared_ptr source_, std::unique_ptr pipeline_, - std::unique_ptr reader_); + std::unique_ptr reader_, + std::map && constant_columns_with_values_); ReaderHolder() = default; ReaderHolder(ReaderHolder && other) noexcept { *this = std::move(other); } @@ -114,6 +121,9 @@ class StorageObjectStorageSource : public ISource std::shared_ptr source; std::unique_ptr pipeline; std::unique_ptr reader; + + public: + std::map constant_columns_with_values; }; ReaderHolder reader; @@ -247,7 +257,7 @@ class StorageObjectStorageSource::KeysIterator : public IObjectIterator const ObjectStoragePtr object_storage; const NamesAndTypesList virtual_columns; const std::function file_progress_callback; - const std::vector keys; + const Strings keys; std::atomic index = 0; bool ignore_non_existent_files; bool skip_object_metadata; diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index eee2b36562f6..1f2c1969a91d 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -2004,6 +2004,7 @@ def check_validity_and_get_prunned_files(select_expression): == 1 ) + @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def 
test_explicit_metadata_file(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -2048,6 +2049,7 @@ def test_explicit_metadata_file(started_cluster, storage_type): with pytest.raises(Exception): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json") + @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def test_minmax_pruning_with_null(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -3283,3 +3285,207 @@ def verify_result_dictionary(diction : dict, allowed_content_types : set): except: print("Dictionary: {}, Allowed Content Types: {}".format(diction, allowed_content_types)) raise + + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_read_constant_columns_optimization(started_cluster, storage_type, run_on_cluster): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_read_constant_columns_optimization_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + date DATE, + date2 DATE, + name VARCHAR(50), + number BIGINT + ) + USING iceberg + PARTITIONED BY (identity(tag), years(date)) + OPTIONS('format-version'='2') + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasya', 5), + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasilisa', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (2, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 5), + (2, DATE '2025-01-21', DATE '2025-01-20', 'vasilisa', 5) + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} ALTER COLUMN number FIRST; + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (5, 3, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa'), + (5, 3, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa') + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} RENAME COLUMN name TO name_old; + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} + ADD COLUMNS ( + name string + ); + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (5, 4, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 4, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 'iceberg'), + (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa', 'icebreaker'), + (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg') + """ + ) + + # In total there must be 7 files + # Partitioning column 'tag' is constant in each file + # Column 'date' is constant in 6 files, has different values in (2-2025) + # Column 'date2' is constant in 5 files (1-2024, 1-2025, 2-2025, 5-2025, 6-2025) + # Column 'name_old' is constant in 4 files (1-2025, 2-2025 and 3-2025 as 'name', 6-2025 as 'name_old') + # Column 'number' is globally constant + # New column 'name' is present only in 3 files (4-2025, 5-2025, 6-2025), constant in two (4-2025, 6-2025) + # Files 1-2025 and 6-2025 have only constant columns + + creation_expression = get_creation_expression( + storage_type, 
TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster + ) + + # Warm up metadata cache + for replica in started_cluster.instances.values(): + replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0") + + all_data_expected_query_id = str(uuid.uuid4()) + all_data_expected = instance.query( + f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=all_data_expected_query_id, + ) + const_only_expected_query_id = str(uuid.uuid4()) + const_only_expected = instance.query( + f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_only_expected_query_id, + ) + const_partial_expected_query_id = str(uuid.uuid4()) + const_partial_expected = instance.query( + f"SELECT tag, date2, number, name_old FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_partial_expected_query_id, + ) + const_partial2_expected_query_id = str(uuid.uuid4()) + const_partial2_expected = instance.query( + f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_partial2_expected_query_id, + ) + count_expected_query_id = str(uuid.uuid4()) + count_expected = instance.query( + f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=count_expected_query_id, + ) + + all_data_query_id = str(uuid.uuid4()) + all_data_optimized = instance.query( + f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=all_data_query_id, + ) + const_only_query_id = str(uuid.uuid4()) + const_only_optimized = instance.query( + f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_only_query_id, + ) + const_partial_query_id = str(uuid.uuid4()) + const_partial_optimized = instance.query( + f"SELECT tag, date2, number, name_old FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_partial_query_id, + ) + const_partial2_query_id = str(uuid.uuid4()) + const_partial2_optimized = instance.query( + f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_partial2_query_id, + ) + count_query_id = str(uuid.uuid4()) + count_optimized = instance.query( + f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=count_query_id, + ) + + assert all_data_expected == all_data_optimized + assert const_only_expected == const_only_optimized + assert const_partial_expected == const_partial_optimized + assert const_partial2_expected == const_partial2_optimized + assert count_expected == count_optimized + + for replica in started_cluster.instances.values(): + replica.query("SYSTEM FLUSH LOGS") + + def check_events(query_id, event, expected): + res = instance.query( + f""" + SELECT + sum(tupleElement(arrayJoin(ProfileEvents),2)) as value + FROM + clusterAllReplicas('cluster_simple', system.query_log) + WHERE + type='QueryFinish' + AND tupleElement(arrayJoin(ProfileEvents),1)='{event}' + AND initial_query_id='{query_id}' + GROUP BY ALL + 
FORMAT CSV + """) + assert int(res) == expected + + event = "S3GetObject" if storage_type == "s3" else "AzureGetObject" + + # Without optimization clickhouse reads all 7 files + check_events(all_data_expected_query_id, event, 7) + check_events(const_only_expected_query_id, event, 7) + check_events(const_partial_expected_query_id, event, 7) + check_events(const_partial2_expected_query_id, event, 7) + check_events(count_expected_query_id, event, 7) + + # If file has only constant columns it is not read + check_events(all_data_query_id, event, 5) # 1-2025, 6-2025 must not be read + check_events(const_only_query_id, event, 0) # All must not be read + check_events(const_partial_query_id, event, 4) # 1-2025, 6-2025 and 2-2025 must not be read + check_events(const_partial2_query_id, event, 3) # 6-2025 must not be read, 1-2024, 1-2025, 2-2025 don't have new column 'name' + check_events(count_query_id, event, 0) # All must not be read + + def compare_selects(query): + result_expected = instance.query(f"{query} SETTINGS allow_experimental_iceberg_read_optimization=0") + result_optimized = instance.query(f"{query} SETTINGS allow_experimental_iceberg_read_optimization=1") + assert result_expected == result_optimized + + compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL") + compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL") + compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL") diff --git a/tests/queries/0_stateless/03413_experimental_settings_cannot_be_enabled_by_default.sql b/tests/queries/0_stateless/03413_experimental_settings_cannot_be_enabled_by_default.sql index 718eb63ad923..26048ffe5694 100644 --- a/tests/queries/0_stateless/03413_experimental_settings_cannot_be_enabled_by_default.sql +++ b/tests/queries/0_stateless/03413_experimental_settings_cannot_be_enabled_by_default.sql @@ -4,5 +4,9 @@ -- However, some settings in the experimental tier are meant to control another experimental feature, and then they can be enabled as long as the feature itself is disabled. -- These are in the exceptions list inside NOT IN. -SELECT name, value FROM system.settings WHERE tier = 'Experimental' AND type = 'Bool' AND value != '0' AND name NOT IN ('throw_on_unsupported_query_inside_transaction'); +SELECT name, value FROM system.settings WHERE tier = 'Experimental' AND type = 'Bool' AND value != '0' AND name NOT IN ( + 'throw_on_unsupported_query_inside_transaction', +-- turned ON for Altinity Antalya builds specifically + 'allow_experimental_iceberg_read_optimization' +); SELECT name, value FROM system.merge_tree_settings WHERE tier = 'Experimental' AND type = 'Bool' AND value != '0' AND name NOT IN ('remove_rolled_back_parts_immediately');
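Note (illustration, not part of the patch): the sketch below mirrors the presence-mask layout used by DataFileMetaInfo::serialize()/deserialize() in this diff — a column count, then per column a name, a bit mask (FIELD_MASK_ROWS/NULLS/RECT) saying which optional statistics follow, then only the present fields. It deliberately uses plain fixed-width integers, length-prefixed UTF-8 strings, and string-typed min/max bounds instead of ClickHouse's writeIntBinary/writeStringBinary/writeFieldBinary encodings, so it is only a conceptual model of the wire layout, not the real format. It also shows why each mask bit must be derived from its own optional field, so the mask and the payload stay in sync.

```python
import struct
from typing import Dict, Optional, Tuple

FIELD_MASK_ROWS = 0x1
FIELD_MASK_NULLS = 0x2
FIELD_MASK_RECT = 0x4
FIELD_MASK_ALL = 0x7

# (rows_count, nulls_count, (min, max)) -- each part may be absent, as in DataFileMetaInfo::ColumnInfo
ColumnInfo = Tuple[Optional[int], Optional[int], Optional[Tuple[str, str]]]


def _write_str(buf: bytearray, s: str) -> None:
    data = s.encode("utf-8")
    buf += struct.pack("<Q", len(data)) + data  # simplified length-prefixed string


def _read_str(data: bytes, pos: int) -> Tuple[str, int]:
    (length,) = struct.unpack_from("<Q", data, pos)
    pos += 8
    return data[pos:pos + length].decode("utf-8"), pos + length


def serialize(columns: Dict[str, ColumnInfo]) -> bytes:
    buf = bytearray(struct.pack("<Q", len(columns)))  # number of columns
    for name, (rows, nulls, rect) in columns.items():
        _write_str(buf, name)
        # Each mask bit reflects the presence of the corresponding optional field.
        mask = ((FIELD_MASK_ROWS if rows is not None else 0)
                | (FIELD_MASK_NULLS if nulls is not None else 0)
                | (FIELD_MASK_RECT if rect is not None else 0))
        buf += struct.pack("<Q", mask)
        if rows is not None:
            buf += struct.pack("<q", rows)
        if nulls is not None:
            buf += struct.pack("<q", nulls)
        if rect is not None:
            _write_str(buf, rect[0])  # min bound
            _write_str(buf, rect[1])  # max bound
    return bytes(buf)


def deserialize(data: bytes) -> Dict[str, ColumnInfo]:
    (count,) = struct.unpack_from("<Q", data, 0)
    pos = 8
    result: Dict[str, ColumnInfo] = {}
    for _ in range(count):
        name, pos = _read_str(data, pos)
        (mask,) = struct.unpack_from("<Q", data, pos)
        pos += 8
        if (mask & FIELD_MASK_ALL) != mask:  # same validity check as the C++ deserialize()
            raise ValueError(f"Unexpected field mask: {mask}")
        rows = nulls = rect = None
        if mask & FIELD_MASK_ROWS:
            (rows,) = struct.unpack_from("<q", data, pos)
            pos += 8
        if mask & FIELD_MASK_NULLS:
            (nulls,) = struct.unpack_from("<q", data, pos)
            pos += 8
        if mask & FIELD_MASK_RECT:
            lo, pos = _read_str(data, pos)
            hi, pos = _read_str(data, pos)
            rect = (lo, hi)
        result[name] = (rows, nulls, rect)
    return result


if __name__ == "__main__":
    # Round-trip example: a point-range column (candidate for the constant-column optimization)
    # and an all-NULL column with no min/max statistics.
    original = {"tag": (2, 0, ("5", "5")), "name": (2, 2, None)}
    assert deserialize(serialize(original)) == original
```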