diff --git a/src/Core/Range.cpp b/src/Core/Range.cpp index 139fb8db76c9..2f941d2e5840 100644 --- a/src/Core/Range.cpp +++ b/src/Core/Range.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -9,6 +10,12 @@ namespace DB { +namespace ErrorCodes +{ +extern const int INCORRECT_DATA; +}; + + FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_) : Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_) { @@ -151,6 +158,13 @@ bool Range::isInfinite() const return left.isNegativeInfinity() && right.isPositiveInfinity(); } +/// [x, x] +bool Range::isPoint() const +{ + return fullBounded() && left_included && right_included && equals(left, right) + && !left.isNegativeInfinity() && !left.isPositiveInfinity(); +} + bool Range::intersectsRange(const Range & r) const { /// r to the left of me. @@ -332,6 +346,29 @@ String Range::toString() const return str.str(); } +String Range::serialize() const +{ + WriteBufferFromOwnString str; + + str << left_included << right_included; + writeFieldBinary(left, str); + writeFieldBinary(right, str); + + return str.str(); +} + +void Range::deserialize(const String & range) +{ + if (range.empty()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump"); + + ReadBufferFromOwnString str(range); + + str >> left_included >> right_included; + left = readFieldBinary(str); + right = readFieldBinary(str); +} + Hyperrectangle intersect(const Hyperrectangle & a, const Hyperrectangle & b) { size_t result_size = std::min(a.size(), b.size()); diff --git a/src/Core/Range.h b/src/Core/Range.h index 6072795db0a9..12463d448945 100644 --- a/src/Core/Range.h +++ b/src/Core/Range.h @@ -94,6 +94,8 @@ struct Range bool isBlank() const; + bool isPoint() const; + bool intersectsRange(const Range & r) const; bool containsRange(const Range & r) const; @@ -114,6 +116,9 @@ struct Range bool nearByWith(const Range & r) const; String toString() const; + + String serialize() const; + void deserialize(const String & range); }; Range intersect(const Range & a, const Range & b); diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 023c7951fc91..71fb52bff1c4 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6908,6 +6908,9 @@ Allow retries in cluster request, when one node goes offline )", EXPERIMENTAL) \ DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"( +Allow Iceberg read optimization based on Iceberg metadata. )", EXPERIMENTAL) \ \ /** Experimental timeSeries* aggregate functions. 
*/ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 714ac3796000..0d1dd9cf0835 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,6 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"allow_experimental_iceberg_read_optimization", true, true, "New setting."}, {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index 2e674d41e9a3..d126bae13da4 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -8,6 +8,9 @@ #include #include +/// TODO: move DataFileInfo into separate file +#include + #include #include #include @@ -101,6 +104,28 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } +RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, std::optional metadata_) + : metadata(std::move(metadata_)) + , command(task_string) +{ + if (!command.isParsed()) + relative_path = task_string; + else + { + auto file_path = command.getFilePath(); + if (file_path.has_value()) + relative_path = file_path.value(); + file_meta_info = command.getFileMetaInfo(); + } +} + +RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_) + : metadata(std::move(metadata_)) +{ + relative_path = info.file_path; + file_meta_info = info.file_meta_info; +} + void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) { if (!metadata) @@ -129,8 +154,12 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std successfully_parsed = true; + if (json->has("file_path")) + file_path = json->getValue("file_path"); if (json->has("retry_after_us")) retry_after_us = json->getValue("retry_after_us"); + if (json->has("meta_info")) + file_meta_info = std::make_shared(json->getObject("meta_info")); } catch (const Poco::JSON::JSONException &) { /// Not a JSON @@ -138,11 +167,16 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std } } -std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const +std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const { Poco::JSON::Object json; + + if (file_path.has_value()) + json.set("file_path", file_path.value()); if (retry_after_us.has_value()) json.set("retry_after_us", retry_after_us.value()); + if (file_meta_info.has_value()) + json.set("meta_info", file_meta_info.value()->toJson()); std::ostringstream oss; oss.exceptions(std::ios::failbit); diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 12f041234c46..c455bdd449cd 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -101,6 +102,10 @@ struct ObjectMetadata ObjectAttributes attributes; }; +struct DataFileInfo; +class DataFileMetaInfo; +using DataFileMetaInfoPtr = std::shared_ptr; + struct RelativePathWithMetadata { class 
CommandInTaskResponse @@ -109,31 +114,35 @@ struct RelativePathWithMetadata CommandInTaskResponse() = default; explicit CommandInTaskResponse(const std::string & task); - bool is_parsed() const { return successfully_parsed; } - void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + bool isParsed() const { return successfully_parsed; } + void setFilePath(const std::string & file_path_ ) { file_path = file_path_; } + void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; } + + std::string toString() const; + + std::optional getFilePath() const { return file_path; } - std::string to_string() const; + std::optional getRetryAfterUs() const { return retry_after_us; } - std::optional get_retry_after_us() const { return retry_after_us; } + std::optional getFileMetaInfo() const { return file_meta_info; } private: bool successfully_parsed = false; + std::optional file_path; std::optional retry_after_us; + std::optional file_meta_info; }; String relative_path; std::optional metadata; CommandInTaskResponse command; + std::optional file_meta_info; RelativePathWithMetadata() = default; - explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt) - : metadata(std::move(metadata_)) - , command(task_string) - { - if (!command.is_parsed()) - relative_path = task_string; - } + explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt); + explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_ = std::nullopt); virtual ~RelativePathWithMetadata() = default; @@ -145,6 +154,10 @@ struct RelativePathWithMetadata virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } + void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; } + void setFileMetaInfo(std::optional file_meta_info_ ) { file_meta_info = file_meta_info_; } + std::optional getFileMetaInfo() const { return file_meta_info; } + void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file); const CommandInTaskResponse & getCommand() const { return command; } }; diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index e27762f53dc4..34402cd58249 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column) void Chunk::addColumn(size_t position, ColumnPtr column) { - if (position >= columns.size()) + if (position == columns.size()) + { + addColumn(column); + return; + } + if (position > columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::addColumn(), max position = {}", position, !columns.empty() ? 
columns.size() - 1 : 0); diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 4768936e3627..d70a05e6a325 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -104,7 +104,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return std::nullopt; auto data_files = current_metadata->getDataFiles(); if (!data_files.empty()) - return data_files[0]; + return data_files[0].file_path; return std::nullopt; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index e640e467c6d9..8a6ce493e203 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -158,7 +158,7 @@ struct DeltaLakeMetadataImpl struct DeltaLakeMetadata { NamesAndTypesList schema; - Strings data_files; + DataFileInfos data_files; DeltaLakePartitionColumns partition_columns; }; @@ -195,7 +195,7 @@ struct DeltaLakeMetadataImpl processMetadataFile(key, current_schema, current_partition_columns, result_files); } - return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns}; + return DeltaLakeMetadata{current_schema, DataFileInfos(result_files.begin(), result_files.end()), current_partition_columns}; } /** diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index a3cf16cd6673..c46e91d9dcbe 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -35,7 +35,7 @@ class DeltaLakeMetadata final : public IDataLakeMetadata DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); - Strings getDataFiles() const override { return data_files; } + DataFileInfos getDataFiles() const override { return data_files; } NamesAndTypesList getTableSchema() const override { return schema; } @@ -67,12 +67,12 @@ class DeltaLakeMetadata final : public IDataLakeMetadata ContextPtr context) const override; private: - mutable Strings data_files; + mutable DataFileInfos data_files; NamesAndTypesList schema; DeltaLakePartitionColumns partition_columns; ObjectStoragePtr object_storage; - Strings getDataFiles(const ActionsDAG *) const { return data_files; } + DataFileInfos getDataFiles(const ActionsDAG *) const { return data_files; } }; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index e54bc89e3a39..838ec22b4547 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -35,7 +35,7 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &) return table_snapshot->update(); } -Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const +DataFileInfos DeltaLakeMetadataDeltaKernel::getDataFiles() const { throwNotImplemented("getDataFiles()"); } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index cfc57b791040..3bee4a028414 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ 
b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -33,7 +33,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata bool update(const ContextPtr & context) override; - Strings getDataFiles() const override; + DataFileInfos getDataFiles() const override; NamesAndTypesList getTableSchema() const override; diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index 6398ad34a4b6..a1f97a00038a 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -40,7 +40,7 @@ namespace ErrorCodes * hoodie.parquet.max.file.size option. Once a single Parquet file is too large, Hudi creates a second file group. * Each file group is identified by File Id. */ -Strings HudiMetadata::getDataFilesImpl() const +DataFileInfos HudiMetadata::getDataFilesImpl() const { auto configuration_ptr = configuration.lock(); auto log = getLogger("HudiMetadata"); @@ -76,12 +76,12 @@ Strings HudiMetadata::getDataFilesImpl() const } } - Strings result; + DataFileInfos result; for (auto & [partition, partition_data] : files) { LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size()); for (auto & [file_id, file_data] : partition_data) - result.push_back(std::move(file_data.key)); + result.push_back(DataFileInfo(std::move(file_data.key))); } return result; } @@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv { } -Strings HudiMetadata::getDataFiles() const +DataFileInfos HudiMetadata::getDataFiles() const { if (data_files.empty()) data_files = getDataFilesImpl(); diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index 2c23269b928a..589f3906d7f5 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -19,7 +19,7 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); - Strings getDataFiles() const override; + DataFileInfos getDataFiles() const override; NamesAndTypesList getTableSchema() const override { return {}; } @@ -49,9 +49,9 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext private: const ObjectStoragePtr object_storage; const ConfigurationObserverPtr configuration; - mutable Strings data_files; + mutable DataFileInfos data_files; - Strings getDataFilesImpl() const; + DataFileInfos getDataFilesImpl() const; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index df4f5ed3a45b..1e36e24c3a53 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -1,5 +1,8 @@ #include "IDataLakeMetadata.h" #include +#include + +#include namespace DB { @@ -11,7 +14,7 @@ class KeysIterator : public IObjectIterator { public: KeysIterator( - Strings && data_files_, + DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) : data_files(data_files_) @@ -33,7 +36,7 @@ class KeysIterator : public IObjectIterator if (current_index >= data_files.size()) return nullptr; - auto key = data_files[current_index]; + auto key = data_files[current_index].file_path; if (callback) { @@ -43,12 
+46,14 @@ class KeysIterator : public IObjectIterator callback(FileProgress(0, 1)); } - return std::make_shared(key, std::nullopt); + auto result = std::make_shared(key, std::nullopt); + result->setFileMetaInfo(data_files[current_index].file_meta_info); + return result; } } private: - Strings data_files; + DataFileInfos data_files; ObjectStoragePtr object_storage; std::atomic index = 0; IDataLakeMetadata::FileProgressCallback callback; @@ -57,7 +62,7 @@ class KeysIterator : public IObjectIterator } ObjectIterator IDataLakeMetadata::createKeysIterator( - Strings && data_files_, + DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const { @@ -73,4 +78,111 @@ DB::ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat( return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns); } +DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) +{ + if (!file_info) + return; + + auto log = getLogger("DataFileMetaInfo"); + + if (file_info->has("columns")) + { + auto columns = file_info->getArray("columns"); + for (size_t i = 0; i < columns->size(); ++i) + { + auto column = columns->getObject(static_cast(i)); + + std::string name; + if (column->has("name")) + name = column->get("name").toString(); + else + { + LOG_WARNING(log, "Can't read column name, ignored"); + continue; + } + + DB::DataFileMetaInfo::ColumnInfo column_info; + if (column->has("rows")) + column_info.rows_count = column->get("rows"); + if (column->has("nulls")) + column_info.nulls_count = column->get("nulls"); + if (column->has("range")) + { + Range range(""); + std::string r = column->get("range"); + try + { + range.deserialize(r); + column_info.hyperrectangle = std::move(range); + } + catch (const Exception & e) + { + LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", name, r, e.what()); + } + } + + columns_info[name] = column_info; + } + } +} + +DataFileMetaInfo::DataFileMetaInfo( + const IcebergSchemaProcessor & schema_processor, + Int32 schema_id, + const std::unordered_map & columns_info_) +{ + std::vector column_ids; + for (const auto & column : columns_info_) + column_ids.push_back(column.first); + auto name_and_types = schema_processor.tryGetFieldsCharacteristics(schema_id, column_ids); + std::unordered_map name_by_index; + for (const auto & name_and_type : name_and_types) + { + const auto name = name_and_type.getNameInStorage(); + auto index = schema_processor.tryGetColumnIDByName(schema_id, name); + if (index.has_value()) + name_by_index[index.value()] = name; + } + + for (const auto & column : columns_info_) + { + auto i_name = name_by_index.find(column.first); + if (i_name != name_by_index.end()) + { + if (column.second.nulls_count.has_value() && column.second.nulls_count.value() >= 0) + columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle}; + else + columns_info[i_name->second] = {column.second.rows_count, std::nullopt, column.second.hyperrectangle}; + } + } +} + +Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const +{ + Poco::JSON::Object::Ptr file_info = new Poco::JSON::Object(); + + if (!columns_info.empty()) + { + Poco::JSON::Array::Ptr columns = new Poco::JSON::Array(); + + for (const auto & column : columns_info) + { + Poco::JSON::Object::Ptr column_info = new Poco::JSON::Object(); + column_info->set("name", column.first); + if (column.second.rows_count.has_value()) + 
column_info->set("rows", column.second.rows_count.value()); + if (column.second.nulls_count.has_value()) + column_info->set("nulls", column.second.nulls_count.value()); + if (column.second.hyperrectangle.has_value()) + column_info->set("range", column.second.hyperrectangle.value().serialize()); + + columns->add(column_info); + } + + file_info->set("columns", columns); + } + + return file_info; +} + } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index b0c049ea9b7f..c4228f57cc98 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -1,10 +1,21 @@ #pragma once #include #include +#include #include -#include "Interpreters/ActionsDAG.h" +#include #include #include +#include + +#include + +namespace Iceberg +{ + +struct ColumnInfo; + +}; namespace DB { @@ -12,7 +23,55 @@ namespace DB namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; -} +}; + +class DataFileMetaInfo +{ +public: + DataFileMetaInfo() = default; + + // Extract metadata from Iceberg structure + explicit DataFileMetaInfo( + const IcebergSchemaProcessor & schema_processor, + Int32 schema_id, + const std::unordered_map & columns_info_); + + // Deserialize from json in distributed requests + explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info); + + // Serialize to json in distributed requests + Poco::JSON::Object::Ptr toJson() const; + + struct ColumnInfo + { + std::optional rows_count; + std::optional nulls_count; + std::optional hyperrectangle; + }; + + std::unordered_map columns_info; +}; + +using DataFileMetaInfoPtr = std::shared_ptr; + +struct DataFileInfo +{ + std::string file_path; + std::optional file_meta_info; + + explicit DataFileInfo(const std::string & file_path_) + : file_path(file_path_) {} + + explicit DataFileInfo(std::string && file_path_) + : file_path(std::move(file_path_)) {} + + bool operator==(const DataFileInfo & rhs) const + { + return file_path == rhs.file_path; + } +}; + +using DataFileInfos = std::vector; class IDataLakeMetadata : boost::noncopyable { @@ -23,7 +82,7 @@ class IDataLakeMetadata : boost::noncopyable /// List all data files. /// For better parallelization, iterate() method should be used. - virtual Strings getDataFiles() const = 0; + virtual DataFileInfos getDataFiles() const = 0; /// Return iterator to `data files`. 
using FileProgressCallback = std::function; virtual ObjectIterator iterate( @@ -65,7 +124,7 @@ class IDataLakeMetadata : boost::noncopyable protected: ObjectIterator createKeysIterator( - Strings && data_files_, + DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 2af04ec743f0..19c6ac74be91 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -60,6 +60,7 @@ extern const SettingsInt64 iceberg_timestamp_ms; extern const SettingsInt64 iceberg_snapshot_id; extern const SettingsBool use_iceberg_metadata_files_cache; extern const SettingsBool use_iceberg_partition_pruning; +extern const SettingsBool allow_experimental_iceberg_read_optimization; } @@ -1083,9 +1084,10 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const return create_fn(); } -Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const +DataFileInfos IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const { bool use_partition_pruning = filter_dag && local_context->getSettingsRef()[Setting::use_iceberg_partition_pruning]; + bool use_iceberg_read_optimization = local_context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]; { std::lock_guard cache_lock(cached_unprunned_files_for_last_processed_snapshot_mutex); @@ -1093,7 +1095,7 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, Context return cached_unprunned_files_for_last_processed_snapshot.value(); } - Strings data_files; + DataFileInfos data_files; { SharedLockGuard lock(mutex); @@ -1115,7 +1117,14 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, Context if (!pruner.canBePruned(manifest_file_entry)) { if (std::holds_alternative(manifest_file_entry.file)) - data_files.push_back(std::get(manifest_file_entry.file).file_name); + { + data_files.push_back(DataFileInfo(std::get(manifest_file_entry.file).file_name)); + if (use_iceberg_read_optimization) + data_files.back().file_meta_info = std::make_shared( + schema_processor, + relevant_snapshot_schema_id, + manifest_file_entry.columns_infos); + } } } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index c3ada72354e7..670e8d6638e9 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -48,7 +48,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. /// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed /// without changing metadata file). Drops on every snapshot update. - Strings getDataFiles() const override { return getDataFilesImpl(nullptr, getContext()); } + DataFileInfos getDataFiles() const override { return getDataFilesImpl(nullptr, getContext()); } /// Get table schema parsed from metadata. 
NamesAndTypesList getTableSchema() const override; @@ -115,11 +115,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext Int64 relevant_snapshot_id TSA_GUARDED_BY(mutex) {-1}; const String table_location; - mutable std::optional cached_unprunned_files_for_last_processed_snapshot TSA_GUARDED_BY(cached_unprunned_files_for_last_processed_snapshot_mutex); + mutable std::optional cached_unprunned_files_for_last_processed_snapshot TSA_GUARDED_BY(cached_unprunned_files_for_last_processed_snapshot_mutex); mutable std::mutex cached_unprunned_files_for_last_processed_snapshot_mutex; void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object, bool metadata_file_changed) TSA_REQUIRES(mutex); - Strings getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const; + DataFileInfos getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const; void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex); ManifestFileCacheKeys getManifestList(ContextPtr local_context, const String & filename) const; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 66c03d1c43b3..3e124b3c6f8a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -66,6 +66,7 @@ namespace Setting extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; extern const SettingsBool table_engine_read_through_distributed_cache; extern const SettingsBool use_object_storage_list_objects_cache; + extern const SettingsBool allow_experimental_iceberg_read_optimization; } namespace ErrorCodes @@ -277,9 +278,10 @@ void StorageObjectStorageSource::lazyInitialize() Chunk StorageObjectStorageSource::generate() { - lazyInitialize(); + bool use_iceberg_read_optimization = read_context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]; + while (true) { if (isCancelled() || !reader) @@ -334,6 +336,16 @@ Chunk StorageObjectStorageSource::generate() .etag = &(object_info->metadata->etag)}, read_context); + if (use_iceberg_read_optimization) + { + for (const auto & constant_column : reader.constant_columns_with_values) + { + chunk.addColumn(constant_column.first, + constant_column.second.name_and_type.type->createColumnConst( + chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); + } + } + if (chunk_size && chunk.hasColumns()) { const auto * object_with_partition_columns_info = dynamic_cast(object_info.get()); @@ -482,9 +494,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (!object_info) return {}; - if (object_info->getCommand().is_parsed()) + if (object_info->getCommand().isParsed()) { - auto retry_after_us = object_info->getCommand().get_retry_after_us(); + auto retry_after_us = object_info->getCommand().getRetryAfterUs(); if (retry_after_us.has_value()) { not_a_path = true; @@ -506,9 +518,17 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade QueryPipelineBuilder builder; std::shared_ptr source; std::unique_ptr read_buf; + std::optional rows_count_from_metadata; auto try_get_num_rows_from_cache = [&]() -> std::optional { + if (rows_count_from_metadata.has_value()) + { + /// Must be non negative here + size_t value = rows_count_from_metadata.value(); + return value; + } + if (!schema_cache) return std::nullopt; @@ 
-527,6 +547,140 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     return schema_cache->tryGetNumRows(cache_key, get_last_mod_time);
 };
+    /// List of columns with constant value in current file, and values
+    std::map constant_columns_with_values;
+    std::unordered_set constant_columns;
+
+    NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns;
+
+    std::unordered_map> requested_columns_list;
+    {
+        size_t column_index = 0;
+        for (const auto & column : requested_columns_copy)
+            requested_columns_list[column.getNameInStorage()] = std::make_pair(column_index++, column);
+    }
+
+    if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization])
+    {
+        auto file_meta_data = object_info->getFileMetaInfo();
+        if (file_meta_data.has_value())
+        {
+            bool is_all_rows_count_equals = true;
+            for (const auto & column : file_meta_data.value()->columns_info)
+            {
+                if (is_all_rows_count_equals && column.second.rows_count.has_value())
+                {
+                    if (rows_count_from_metadata.has_value())
+                    {
+                        if (column.second.rows_count.value() != rows_count_from_metadata.value())
+                        {
+                            LOG_WARNING(log, "Inconsistent rows count for file {} in metadata, ignored", object_info->getPath());
+                            is_all_rows_count_equals = false;
+                            rows_count_from_metadata = std::nullopt;
+                        }
+                    }
+                    else if (column.second.rows_count.value() < 0)
+                    {
+                        LOG_WARNING(log, "Negative rows count for file {} in metadata, ignored", object_info->getPath());
+                        is_all_rows_count_equals = false;
+                        rows_count_from_metadata = std::nullopt;
+                    }
+                    else
+                        rows_count_from_metadata = column.second.rows_count;
+                }
+                if (column.second.hyperrectangle.has_value())
+                {
+                    if (column.second.hyperrectangle.value().isPoint() &&
+                        (!column.second.nulls_count.has_value() || !column.second.nulls_count.value()))
+                    {
+                        auto column_name = column.first;
+
+                        auto i_column = requested_columns_list.find(column_name);
+                        if (i_column == requested_columns_list.end())
+                            continue;
+
+                        /// isPoint() method checks that left==right
+                        constant_columns_with_values[i_column->second.first] =
+                            ConstColumnWithValue{
+                                i_column->second.second,
+                                column.second.hyperrectangle.value().left
+                            };
+                        constant_columns.insert(column_name);
+
+                        LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value '{}'",
+                            object_info->getPath(),
+                            column_name,
+                            i_column->second.first,
+                            i_column->second.second.type,
+                            column.second.hyperrectangle.value().left.dump());
+                    }
+                    else if (column.second.rows_count.has_value() && column.second.nulls_count.has_value()
+                        && column.second.rows_count.value() == column.second.nulls_count.value())
+                    {
+                        auto column_name = column.first;
+
+                        auto i_column = requested_columns_list.find(column_name);
+                        if (i_column == requested_columns_list.end())
+                            continue;
+
+                        if (!i_column->second.second.type->isNullable())
+                            continue;
+
+                        constant_columns_with_values[i_column->second.first] =
+                            ConstColumnWithValue{
+                                i_column->second.second,
+                                Field()
+                            };
+                        constant_columns.insert(column_name);
+
+                        LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value 'NULL'",
+                            object_info->getPath(),
+                            column_name,
+                            i_column->second.first,
+                            i_column->second.second.type);
+                    }
+                }
+            }
+/*
+            for (const auto & column : requested_columns_list)
+            {
+                const auto & column_name = column.first;
+
+                if (file_meta_data.value()->columns_info.contains(column_name))
+                    continue;
+
+                if (!column.second.second.type->isNullable())
+                    continue;
+
+                /// Column is nullable and absent in file
+                constant_columns_with_values[column.second.first] =
+                    ConstColumnWithValue{
+                        column.second.second,
+                        Field()
+                    };
+                constant_columns.insert(column_name);
+
+                LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value 'NULL'",
+                    object_info->getPath(),
+                    column_name,
+                    column.second.first,
+                    column.second.second.type);
+            }
+*/
+        }
+
+        if (!constant_columns.empty())
+        {
+            size_t original_columns = requested_columns_copy.size();
+            requested_columns_copy = requested_columns_copy.eraseNames(constant_columns);
+            if (requested_columns_copy.size() + constant_columns.size() != original_columns)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't remove constant columns for file {} correctly, fallback to read. Found constant columns: [{}]",
+                    object_info->getPath(), constant_columns);
+            if (requested_columns_copy.empty())
+                need_only_count = true;
+        }
+    }
+
     std::optional num_rows_from_cache = need_only_count && context_->getSettingsRef()[Setting::use_cache_for_count_from_files] ? try_get_num_rows_from_cache() : std::nullopt;
@@ -567,7 +721,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     initial_header = sample_header;
     }
-
     auto input_format = FormatFactory::instance().getInput( configuration->getFormat(), *read_buf,
@@ -603,7 +756,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     });
     }
-
     if (read_from_format_info.columns_description.hasDefaults()) { builder.addSimpleTransform(
@@ -620,7 +772,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     /// from chunk read by IInputFormat.
     builder.addSimpleTransform([&](const Block & header)
     {
-        return std::make_shared(header, read_from_format_info.requested_columns);
+        return std::make_shared(header, requested_columns_copy);
     });
     auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder)));
@@ -629,7 +781,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
     return ReaderHolder(
-        object_info, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader));
+        object_info,
+        std::move(read_buf),
+        std::move(source),
+        std::move(pipeline),
+        std::move(current_reader),
+        std::move(constant_columns_with_values));
 }
 std::future StorageObjectStorageSource::createReaderAsync()
@@ -968,6 +1125,24 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
     return object_infos[index++];
 }
+StorageObjectStorageSource::KeysIterator::KeysIterator(
+    const DataFileInfos & file_infos_,
+    ObjectStoragePtr object_storage_,
+    const NamesAndTypesList & virtual_columns_,
+    ObjectInfos * read_keys_,
+    bool ignore_non_existent_files_,
+    bool skip_object_metadata_,
+    std::function file_progress_callback_)
+    : object_storage(object_storage_)
+    , virtual_columns(virtual_columns_)
+    , file_progress_callback(file_progress_callback_)
+    , file_infos(file_infos_)
+    , ignore_non_existent_files(ignore_non_existent_files_)
+    , skip_object_metadata(skip_object_metadata_)
+{
+    fillKeys(read_keys_);
+}
+
 StorageObjectStorageSource::KeysIterator::KeysIterator(
     const Strings & keys_,
     ObjectStoragePtr object_storage_,
@@ -979,16 +1154,21 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
     : object_storage(object_storage_)
     , virtual_columns(virtual_columns_)
     , file_progress_callback(file_progress_callback_)
-    , keys(keys_)
+    , file_infos(keys_.begin(), keys_.end())
     ,
ignore_non_existent_files(ignore_non_existent_files_) , skip_object_metadata(skip_object_metadata_) +{ + fillKeys(read_keys_); +} + +void StorageObjectStorageSource::KeysIterator::fillKeys(ObjectInfos * read_keys_) { if (read_keys_) { /// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed? - for (auto && key : keys) + for (auto && file_info : file_infos) { - auto object_info = std::make_shared(key); + auto object_info = std::make_shared(file_info.file_path); read_keys_->emplace_back(object_info); } } @@ -999,29 +1179,29 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::KeysIterator::ne while (true) { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - if (current_index >= keys.size()) + if (current_index >= file_infos.size()) return nullptr; - auto key = keys[current_index]; + auto file_info = file_infos[current_index]; ObjectMetadata object_metadata{}; if (!skip_object_metadata) { if (ignore_non_existent_files) { - auto metadata = object_storage->tryGetObjectMetadata(key); + auto metadata = object_storage->tryGetObjectMetadata(file_info.file_path); if (!metadata) continue; object_metadata = *metadata; } else - object_metadata = object_storage->getObjectMetadata(key); + object_metadata = object_storage->getObjectMetadata(file_info.file_path); } if (file_progress_callback) file_progress_callback(FileProgress(0, object_metadata.size_bytes)); - return std::make_shared(key, object_metadata); + return std::make_shared(file_info, object_metadata); } } @@ -1030,12 +1210,14 @@ StorageObjectStorageSource::ReaderHolder::ReaderHolder( std::unique_ptr read_buf_, std::shared_ptr source_, std::unique_ptr pipeline_, - std::unique_ptr reader_) + std::unique_ptr reader_, + std::map && constant_columns_with_values_) : object_info(std::move(object_info_)) , read_buf(std::move(read_buf_)) , source(std::move(source_)) , pipeline(std::move(pipeline_)) , reader(std::move(reader_)) + , constant_columns_with_values(std::move(constant_columns_with_values_)) { } @@ -1049,6 +1231,7 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc source = std::move(other.source); read_buf = std::move(other.read_buf); object_info = std::move(other.object_info); + constant_columns_with_values = std::move(other.constant_columns_with_values); return *this; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 5e4345f3ad37..c8284ab7d997 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -96,6 +96,12 @@ class StorageObjectStorageSource : public SourceWithKeyCondition size_t total_rows_in_file = 0; LoggerPtr log = getLogger("StorageObjectStorageSource"); + struct ConstColumnWithValue + { + NameAndTypePair name_and_type; + Field value; + }; + struct ReaderHolder : private boost::noncopyable { public: @@ -104,7 +110,8 @@ class StorageObjectStorageSource : public SourceWithKeyCondition std::unique_ptr read_buf_, std::shared_ptr source_, std::unique_ptr pipeline_, - std::unique_ptr reader_); + std::unique_ptr reader_, + std::map && constant_columns_with_values_); ReaderHolder() = default; ReaderHolder(ReaderHolder && other) noexcept { *this = std::move(other); } @@ -123,6 +130,9 @@ class StorageObjectStorageSource : public SourceWithKeyCondition std::shared_ptr source; std::unique_ptr pipeline; std::unique_ptr reader; + + public: + std::map constant_columns_with_values; }; 
ReaderHolder reader; @@ -240,6 +250,15 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon class StorageObjectStorageSource::KeysIterator : public IObjectIterator { public: + KeysIterator( + const DataFileInfos & file_infos_, + ObjectStoragePtr object_storage_, + const NamesAndTypesList & virtual_columns_, + ObjectInfos * read_keys_, + bool ignore_non_existent_files_, + bool skip_object_metadata_, + std::function file_progress_callback = {}); + KeysIterator( const Strings & keys_, ObjectStoragePtr object_storage_, @@ -253,13 +272,15 @@ class StorageObjectStorageSource::KeysIterator : public IObjectIterator ObjectInfoPtr next(size_t processor) override; - size_t estimatedKeysCount() override { return keys.size(); } + size_t estimatedKeysCount() override { return file_infos.size(); } private: + void fillKeys(ObjectInfos * read_keys_); + const ObjectStoragePtr object_storage; const NamesAndTypesList virtual_columns; const std::function file_progress_callback; - const std::vector keys; + const DataFileInfos file_infos; std::atomic index = 0; bool ignore_non_existent_files; bool skip_object_metadata; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 78652fd803e3..a08b7f8c91b1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -170,6 +170,15 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile file_path = object_info->getPath(); } + auto file_meta_info = object_info->getFileMetaInfo(); + if (file_meta_info.has_value()) + { + RelativePathWithMetadata::CommandInTaskResponse response; + response.setFilePath(file_path); + response.setFileMetaInfo(file_meta_info.value()); + file_path = response.toString(); + } + size_t file_replica_idx = getReplicaForFile(file_path); if (file_replica_idx == number_of_current_replica) { @@ -242,8 +251,8 @@ std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocess /// All unprocessed files owned by alive replicas with recenlty activity /// Need to retry after (oldest_activity - activity_limit) microseconds RelativePathWithMetadata::CommandInTaskResponse response; - response.set_retry_after_us(oldest_activity - activity_limit); - return response.to_string(); + response.setRetryAfterUs(oldest_activity - activity_limit); + return response.toString(); } return std::nullopt; diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 569ca8faa8a4..45fa8e445006 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3071,6 +3071,7 @@ def check_validity_and_get_prunned_files(select_expression): == 1 ) + @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def test_explicit_metadata_file(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -3115,6 +3116,7 @@ def test_explicit_metadata_file(started_cluster, storage_type): with pytest.raises(Exception): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json") + @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) @pytest.mark.parametrize("run_on_cluster", [False, True]) def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster): @@ -3341,6 +3343,7 @@ def 
execute_spark_query(query: str): run_on_cluster=True, ) + @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def test_minmax_pruning_for_arrays_and_maps_subfields_disabled(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -3410,6 +3413,7 @@ def execute_spark_query(query: str): instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL") + @pytest.mark.parametrize("storage_type", ["s3"]) def test_system_tables_partition_sorting_keys(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -3455,6 +3459,7 @@ def test_system_tables_partition_sorting_keys(started_cluster, storage_type): assert res == '"bucket(16, id), day(ts)","id desc, hour(ts) asc"' + @pytest.mark.parametrize("storage_type", ["local", "s3"]) def test_compressed_metadata(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -3492,3 +3497,203 @@ def test_compressed_metadata(started_cluster, storage_type): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="") assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n" + + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_read_constant_columns_optimization(started_cluster, storage_type, run_on_cluster): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_read_constant_columns_optimization_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + date DATE, + date2 DATE, + name VARCHAR(50), + number BIGINT + ) + USING iceberg + PARTITIONED BY (identity(tag), years(date)) + OPTIONS('format-version'='2') + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasya', 5), + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasilisa', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (2, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 5), + (2, DATE '2025-01-21', DATE '2025-01-20', 'vasilisa', 5) + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} ALTER COLUMN number FIRST; + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (5, 3, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa'), + (5, 3, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa') + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} + ADD COLUMNS ( + name2 string + ); + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (5, 4, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 4, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 'iceberg'), + (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa', 'icebreaker'), + (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg') + """ + ) + + # Totally must be 7 files + # Partitioned column 'tag' is constant in each file + # Column 'date' is constant in 6 files, has different values in (2-2025) + # Column 'date2' is constant in 4 files (1-2024, 2-2025, 5-2025, 6-2025) + # Column 'name' is 
constant in 3 files (1-2025, 2-2025, 6-2025) + # Column 'number' is globally constant + # Column 'name2' is present only in 3 files (4-2025, 5-2025, 6-2025), constant in two (4-2025, 6-2025) + # Files 1-2025 and 6-2025 have only constant columns + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster + ) + + # Warm up metadata cache + for replica in started_cluster.instances.values(): + replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0") + + all_data_expected_query_id = str(uuid.uuid4()) + all_data_expected = instance.query( + f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=all_data_expected_query_id, + ) + const_only_expected_query_id = str(uuid.uuid4()) + const_only_expected = instance.query( + f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_only_expected_query_id, + ) + const_partial_expected_query_id = str(uuid.uuid4()) + const_partial_expected = instance.query( + f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_partial_expected_query_id, + ) + const_partial2_expected_query_id = str(uuid.uuid4()) + const_partial2_expected = instance.query( + f"SELECT tag, date2, number, name2 FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_partial2_expected_query_id, + ) + count_expected_query_id = str(uuid.uuid4()) + count_expected = instance.query( + f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=count_expected_query_id, + ) + + all_data_query_id = str(uuid.uuid4()) + all_data_optimized = instance.query( + f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=all_data_query_id, + ) + const_only_query_id = str(uuid.uuid4()) + const_only_optimized = instance.query( + f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_only_query_id, + ) + const_partial_query_id = str(uuid.uuid4()) + const_partial_optimized = instance.query( + f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_partial_query_id, + ) + const_partial2_query_id = str(uuid.uuid4()) + const_partial2_optimized = instance.query( + f"SELECT tag, date2, number, name2 FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_partial2_query_id, + ) + count_query_id = str(uuid.uuid4()) + count_optimized = instance.query( + f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=count_query_id, + ) + + assert all_data_expected == all_data_optimized + assert const_only_expected == const_only_optimized + assert const_partial_expected == const_partial_optimized + assert const_partial2_expected == const_partial2_optimized + assert count_expected == count_optimized + + for replica in started_cluster.instances.values(): + replica.query("SYSTEM FLUSH LOGS") + + def get_events(query_id, event): + res 
= instance.query(
            f""" SELECT sum(tupleElement(arrayJoin(ProfileEvents),2)) as value FROM clusterAllReplicas('cluster_simple', system.query_log) WHERE type='QueryFinish' AND tupleElement(arrayJoin(ProfileEvents),1)='{event}' AND initial_query_id='{query_id}' GROUP BY ALL FORMAT CSV """)
        return int(res)
+
+    event = "S3GetObject" if storage_type == "s3" else "AzureGetObject"
+
+    events_all_data_expected = get_events(all_data_expected_query_id, event)
+    events_const_only_expected = get_events(const_only_expected_query_id, event)
+    events_const_partial_expected = get_events(const_partial_expected_query_id, event)
+    events_const_partial2_expected = get_events(const_partial2_expected_query_id, event)
+    events_count_expected = get_events(count_expected_query_id, event)
+
+    # Without the optimization, ClickHouse reads all 7 files
+    assert events_all_data_expected == 7
+    assert events_const_only_expected == 7
+    assert events_const_partial_expected == 7
+    assert events_const_partial2_expected == 7
+    assert events_count_expected == 7
+
+    events_all_data_optimized = get_events(all_data_query_id, event)  # 1-2025, 6-2025 must not be read
+    events_const_only_optimized = get_events(const_only_query_id, event)  # All must not be read
+    events_const_partial_optimized = get_events(const_partial_query_id, event)  # 1-2025, 6-2025 and 2-2025 must not be read
+    events_const_partial2_optimized = get_events(const_partial2_query_id, event)  # 6-2025 must not be read; 1-2024, 1-2025, 2-2025 are read because of nulls
+    events_count_optimized = get_events(count_query_id, event)  # All must not be read
+
+    assert events_all_data_optimized == 6  # 5
+    assert events_const_only_optimized == 0
+    assert events_const_partial_optimized == 4
+    assert events_const_partial2_optimized == 6
+    assert events_count_optimized == 0
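
For illustration, a minimal Python sketch (not part of the patch) of the task-string round trip implemented above: the initiator side (StorageObjectStorageStableTaskDistributor) wraps the file path and per-column statistics into a JSON command via CommandInTaskResponse::toString(), and the worker side treats the task as a plain relative path whenever it is not valid JSON. Only the key names (file_path, retry_after_us, meta_info, columns, name, rows, nulls, range) come from the diff; the concrete values and helper names below are invented for the example, and the range field would really carry an opaque string produced by Range::serialize().

import json

# Hypothetical payload mirroring the JSON shape produced by
# RelativePathWithMetadata::CommandInTaskResponse::toString() in this patch.
# Values are made up; "range" stands in for an opaque Range::serialize() dump.
task_response = {
    "file_path": "warehouse/table/data/part-00000.parquet",
    "meta_info": {
        "columns": [
            {"name": "tag", "rows": 2, "nulls": 0, "range": "<serialized Range>"},
            {"name": "name2", "rows": 2, "nulls": 2},  # all-NULL column, no range
        ]
    },
}

def parse_task(task_string):
    """Worker-side view: a non-JSON task string is used as a plain relative
    path, roughly matching the CommandInTaskResponse constructor fallback."""
    try:
        obj = json.loads(task_string)
    except (json.JSONDecodeError, ValueError):
        return {"relative_path": task_string, "meta_info": None, "retry_after_us": None}
    return {
        "relative_path": obj.get("file_path"),
        "meta_info": obj.get("meta_info"),
        "retry_after_us": obj.get("retry_after_us"),
    }

print(parse_task(json.dumps(task_response)))      # parsed command with column stats
print(parse_task("plain/relative/path.parquet"))  # plain-path fallback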