3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
@@ -35,7 +35,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54

static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 2;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;

static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
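Version 3 adds the per-column metadata payload introduced later in this PR. For context, a minimal sketch of how these constants gate the wire format; the min-of-both-peers rule is an assumption here, as the real handshake lives elsewhere in the cluster-function code:

#include <algorithm>
#include <cstddef>

// Hypothetical negotiation sketch: the lower of the two peers' versions wins,
// so the new field is written only when both sides understand protocol v3.
bool canSendColumnsMetadata(size_t remote_version)
{
    size_t effective = std::min<size_t>(DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION, remote_version);
    return effective >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA;
}
// canSendColumnsMetadata(2) == false (older peer), canSendColumnsMetadata(3) == true.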
7 changes: 7 additions & 0 deletions src/Core/Range.cpp
@@ -151,6 +151,13 @@ bool Range::isInfinite() const
return left.isNegativeInfinity() && right.isPositiveInfinity();
}

/// [x, x]
bool Range::isPoint() const
{
return fullBounded() && left_included && right_included && equals(left, right)
&& !left.isNegativeInfinity() && !left.isPositiveInfinity();
}

bool Range::intersectsRange(const Range & r) const
{
/// r to the left of me.
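A short usage sketch of the new predicate, using the (left, left_included, right, right_included) constructor from Range.h:

void isPointSketch()
{
    Range point(Field(UInt64(42)), /*left_included=*/true, Field(UInt64(42)), /*right_included=*/true);
    chassert(point.isPoint());       // [42, 42]: fully bounded, both ends included and equal

    Range half_open(Field(UInt64(0)), true, Field(UInt64(42)), false);
    chassert(!half_open.isPoint());  // [0, 42): right endpoint excluded
}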
2 changes: 2 additions & 0 deletions src/Core/Range.h
@@ -94,6 +94,8 @@ struct Range

bool isBlank() const;

bool isPoint() const;

bool intersectsRange(const Range & r) const;

bool containsRange(const Range & r) const;
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -7062,6 +7062,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"(
)", EXPERIMENTAL) \
DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"(
Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
Allow read optimizations (such as pruning by column statistics) based on Iceberg metadata.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
Allow retries in cluster requests when one node goes offline.
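A sketch of the intended consumer of the new flag, assuming a Setting:: id is declared for it the same way as for the other settings in this file; the exact call site is hypothetical:

// Hypothetical call site: statistics-based pruning is applied only when the flag is on.
bool icebergPruningEnabled(const ContextPtr & context)
{
    return context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization];
}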
7 changes: 4 additions & 3 deletions src/Core/SettingsChangesHistory.cpp
@@ -41,11 +41,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.8.9.2000",
{
{"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
{"object_storage_cluster", "", "", "Antalya: New setting"},
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.8",
{
8 changes: 8 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -7,6 +7,7 @@
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/ObjectStorageKeyGenerator.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
@@ -101,6 +102,13 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
return write_settings;
}

RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
: metadata(std::move(metadata_))
{
relative_path = info.file_path;
file_meta_info = info.file_meta_info;
}

std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
{
if (isArchive())
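A usage sketch for the new constructor; data_files and the surrounding loop are hypothetical, while DataFileInfo is the descriptor added in IDataLakeMetadata.h by this PR:

#include <memory>
#include <vector>

// Carry per-column statistics along with each data file's path into the listing.
void appendDataFiles(const std::vector<DataFileInfo> & data_files,
                     std::vector<std::shared_ptr<RelativePathWithMetadata>> & objects)
{
    for (const auto & info : data_files)
        objects.push_back(std::make_shared<RelativePathWithMetadata>(info)); // copies file_path + file_meta_info
}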
23 changes: 18 additions & 5 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -107,6 +107,11 @@ struct ObjectMetadata
ObjectAttributes attributes;
};


struct DataFileInfo;
class DataFileMetaInfo;
using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;

struct DataLakeObjectMetadata;

struct RelativePathWithMetadata
@@ -134,19 +139,24 @@ struct RelativePathWithMetadata
std::optional<ObjectMetadata> metadata;
/// Delta lake related object metadata.
std::optional<DataLakeObjectMetadata> data_lake_metadata;
/// Information about columns
std::optional<DataFileMetaInfoPtr> file_meta_info;
Review comment (Member): why optional shared_ptr? What would a set null value mean in this case, and how is it different from an unset value? (A sketch of the three possible states follows this file's diff.)

/// Retry request after short pause
CommandInTaskResponse command;

RelativePathWithMetadata() = default;

explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: metadata(std::move(metadata_))
, command(task_string)
explicit RelativePathWithMetadata(String command_or_path, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: relative_path(std::move(command_or_path))
, metadata(std::move(metadata_))
, command(relative_path)
{
if (!command.is_parsed())
relative_path = task_string;
if (command.is_parsed())
relative_path = "";
}

explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);

RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;

virtual ~RelativePathWithMetadata() = default;
@@ -158,6 +168,9 @@ struct RelativePathWithMetadata
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual std::string getPathOrPathToArchiveIfArchive() const;

void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_) { file_meta_info = file_meta_info_; }
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }

const CommandInTaskResponse & getCommand() const { return command; }
};

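On the review question above: a minimal sketch of the three states a std::optional<DataFileMetaInfoPtr> can encode; whether the set-but-null middle state is ever intended is exactly what the reviewer is asking:

#include <memory>
#include <optional>

void triStateSketch()
{
    std::optional<DataFileMetaInfoPtr> meta;       // unset: no metadata attached at all
    meta = DataFileMetaInfoPtr{};                  // set but null: a third state, distinct from unset
    meta = std::make_shared<DataFileMetaInfo>();   // set and non-null: actual column statistics
}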
19 changes: 19 additions & 0 deletions src/Interpreters/ClusterFunctionReadTask.cpp
@@ -29,6 +29,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
if (object->data_lake_metadata.has_value())
data_lake_metadata = object->data_lake_metadata.value();

file_meta_info = object->file_meta_info;

const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
}
@@ -45,6 +47,7 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const

auto object = std::make_shared<ObjectInfo>(path);
object->data_lake_metadata = data_lake_metadata;
object->file_meta_info = file_meta_info;
return object;
}

@@ -61,6 +64,14 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protocol_version)
else
ActionsDAG().serialize(out, registry);
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
{
if (file_meta_info.has_value())
file_meta_info.value()->serialize(out);
else
DataFileMetaInfo().serialize(out);
}
}

void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
@@ -87,6 +98,14 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
data_lake_metadata.transform = std::move(transform);
}
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
{
auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));

if (!path.empty() && !info->empty())
file_meta_info = info;
}
}

}
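The design choice here: at protocol version 3 the sender always writes a DataFileMetaInfo (an empty placeholder when none is attached) so the frame layout stays fixed, and deserialize() keeps the stats only for real tasks — a non-empty path (an empty path signals a command response) with non-empty info. A minimal round-trip sketch, assuming the two-argument serialize signature shown above and in-memory buffers:

#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>

void roundTripSketch(const ClusterFunctionReadTaskResponse & task)
{
    WriteBufferFromOwnString out;
    task.serialize(out, DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION); // v3: writes stats or an empty placeholder

    ReadBufferFromString in(out.str());
    ClusterFunctionReadTaskResponse received;
    received.deserialize(in);
    // received.file_meta_info is engaged only for a non-empty path with non-empty stats.
}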
2 changes: 2 additions & 0 deletions src/Interpreters/ClusterFunctionReadTask.h
@@ -20,6 +20,8 @@ struct ClusterFunctionReadTaskResponse
String path;
/// Object metadata path, in case of data lake object.
DataLakeObjectMetadata data_lake_metadata;
/// Per-column metadata for the file (row counts, null counts, min/max ranges).
std::optional<DataFileMetaInfoPtr> file_meta_info;

/// Convert received response into ObjectInfo.
ObjectInfoPtr getObjectInfo() const;
7 changes: 6 additions & 1 deletion src/Processors/Chunk.cpp
@@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column)

void Chunk::addColumn(size_t position, ColumnPtr column)
{
if (position >= columns.size())
if (position == columns.size())
{
addColumn(column);
return;
}
if (position > columns.size())
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
"Position {} out of bound in Chunk::addColumn(), max position = {}",
position, !columns.empty() ? columns.size() - 1 : 0);
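The behavior change in a nutshell: position == columns.size() now appends instead of throwing. A sketch with an illustrative filler column:

#include <Columns/ColumnsNumber.h>

void appendColumnSketch(Chunk & chunk) // suppose chunk currently holds columns 0 and 1
{
    ColumnPtr col = ColumnUInt64::create(chunk.getNumRows(), 0); // filler column of matching height
    chunk.addColumn(2, col);  // position == columns.size(): appends (previously threw)
    // chunk.addColumn(5, col) would still throw POSITION_OUT_OF_BOUND (position > size).
}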
9 changes: 7 additions & 2 deletions src/Processors/Sources/ConstChunkGenerator.h
@@ -1,6 +1,7 @@
#pragma once

#include <Processors/ISource.h>
#include <Processors/Formats/IInputFormat.h>


namespace DB
@@ -13,7 +14,7 @@ class ConstChunkGenerator : public ISource
public:
ConstChunkGenerator(SharedHeader header, size_t total_num_rows, size_t max_block_size_)
: ISource(std::move(header))
, remaining_rows(total_num_rows), max_block_size(max_block_size_)
, generated_rows(0), remaining_rows(total_num_rows), max_block_size(max_block_size_)
{
}

@@ -27,10 +28,14 @@

size_t num_rows = std::min(max_block_size, remaining_rows);
remaining_rows -= num_rows;
return cloneConstWithDefault(Chunk{getPort().getHeader().getColumns(), 0}, num_rows);
auto chunk = cloneConstWithDefault(Chunk{getPort().getHeader().getColumns(), 0}, num_rows);
chunk.getChunkInfos().add(std::make_shared<ChunkInfoRowNumOffset>(generated_rows));
generated_rows += num_rows;
return chunk;
}

private:
size_t generated_rows;
size_t remaining_rows;
size_t max_block_size;
};
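With total_num_rows = 10 and max_block_size = 4 the generator now emits chunks of 4, 4 and 2 rows tagged with row-number offsets 0, 4 and 8, so downstream processors can recover absolute row numbers. A reading sketch; get<T>() follows the usual ChunkInfoCollection pattern, and the offset member's name is an assumption:

void consumeSketch(Chunk & chunk)
{
    if (auto info = chunk.getChunkInfos().get<ChunkInfoRowNumOffset>())
    {
        size_t first_row = info->row_num_offset; // assumption: the offset field's name
        // first_row + i is the absolute index of row i within this chunk.
    }
}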
114 changes: 114 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
@@ -1,9 +1,18 @@
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/Field.h>

namespace DB
{

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
};

namespace
{

@@ -87,4 +96,109 @@ ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat(
return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns, supports_tuple_elements);
}

DataFileMetaInfo::DataFileMetaInfo(
const Iceberg::IcebergSchemaProcessor & schema_processor,
Int32 schema_id,
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_)
{
std::vector<Int32> column_ids;
for (const auto & column : columns_info_)
column_ids.push_back(column.first);

auto name_and_types = schema_processor.tryGetFieldsCharacteristics(schema_id, column_ids);
std::unordered_map<Int32, std::string> name_by_index;
for (const auto & name_and_type : name_and_types)
{
const auto name = name_and_type.getNameInStorage();
auto index = schema_processor.tryGetColumnIDByName(schema_id, name);
if (index.has_value())
name_by_index[index.value()] = name;
}

for (const auto & column : columns_info_)
{
auto i_name = name_by_index.find(column.first);
if (i_name != name_by_index.end())
{
columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle};
}
}
}

constexpr size_t FIELD_MASK_ROWS = 0x1;
constexpr size_t FIELD_MASK_NULLS = 0x2;
constexpr size_t FIELD_MASK_RECT = 0x4;
constexpr size_t FIELD_MASK_ALL = 0x7;

void DataFileMetaInfo::serialize(WriteBuffer & out) const
{
auto size = columns_info.size();
writeIntBinary(size, out);
for (const auto & column : columns_info)
{
writeStringBinary(column.first, out);
size_t field_mask = 0;
if (column.second.rows_count.has_value())
field_mask |= FIELD_MASK_ROWS;
if (column.second.nulls_count.has_value())
field_mask |= FIELD_MASK_NULLS;
if (column.second.hyperrectangle.has_value())
field_mask |= FIELD_MASK_RECT;
writeIntBinary(field_mask, out);

if (column.second.rows_count.has_value())
writeIntBinary(column.second.rows_count.value(), out);
if (column.second.nulls_count.has_value())
writeIntBinary(column.second.nulls_count.value(), out);
if (column.second.hyperrectangle.has_value())
{
writeFieldBinary(column.second.hyperrectangle.value().left, out);
writeFieldBinary(column.second.hyperrectangle.value().right, out);
}
}
}

DataFileMetaInfo DataFileMetaInfo::deserialize(ReadBuffer & in)
{
DataFileMetaInfo result;

size_t size;
readIntBinary(size, in);

for (size_t i = 0; i < size; ++i)
{
std::string name;
readStringBinary(name, in);
size_t field_mask;
readIntBinary(field_mask, in);
if ((field_mask & FIELD_MASK_ALL) != field_mask)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected field mask: {}", field_mask);

ColumnInfo & column = result.columns_info[name];

if (field_mask & FIELD_MASK_ROWS)
{
Int64 value;
readIntBinary(value, in);
column.rows_count = value;
}
if (field_mask & FIELD_MASK_NULLS)
{
Int64 value;
readIntBinary(value, in);
column.nulls_count = value;
}
if (field_mask & FIELD_MASK_RECT)
{
FieldRef left = readFieldBinary(in);
FieldRef right = readFieldBinary(in);
column.hyperrectangle = Range(left, true, right, true);
}
}

return result;
}


}
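For reference, the per-column wire layout that serialize() and deserialize() above agree on; this restates the code as written, it is not a separate spec:

// Stream layout, after a leading writeIntBinary(columns_info.size()):
//   writeStringBinary(name)                       column name
//   writeIntBinary(field_mask)                    0x1 = rows_count, 0x2 = nulls_count, 0x4 = hyperrectangle
//   [writeIntBinary(rows_count)]                  present iff bit 0x1 is set
//   [writeIntBinary(nulls_count)]                 present iff bit 0x2 is set
//   [writeFieldBinary(left), writeFieldBinary(right)]  present iff bit 0x4 is set;
//                                                 read back as the closed range [left, right]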