37 changes: 37 additions & 0 deletions src/Core/Range.cpp
@@ -2,13 +2,20 @@
#include <Core/Range.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/FieldVisitorToString.h>
#include <Common/FieldAccurateComparison.h>


namespace DB
{

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}


FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
: Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
{
@@ -151,6 +158,13 @@ bool Range::isInfinite() const
return left.isNegativeInfinity() && right.isPositiveInfinity();
}

/// [x, x]
bool Range::isPoint() const
{
return fullBounded() && left_included && right_included && equals(left, right)
&& !left.isNegativeInfinity() && !left.isPositiveInfinity();
}

bool Range::intersectsRange(const Range & r) const
{
/// r to the left of me.
@@ -332,6 +346,29 @@ String Range::toString() const
return str.str();
}

String Range::serialize() const
{
WriteBufferFromOwnString str;

str << left_included << right_included;
writeFieldBinary(left, str);
writeFieldBinary(right, str);

return str.str();
}

void Range::deserialize(const String & range)
{
if (range.empty())
throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump");

ReadBufferFromOwnString str(range);

str >> left_included >> right_included;
left = readFieldBinary(str);
right = readFieldBinary(str);
}
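A minimal round-trip sketch of the new serialization (illustrative only, not part of the patch; it assumes the four-argument Range constructor and Range::createWholeUniverse() from Range.h):

/// Hypothetical usage, for illustration: a Range should survive serialize()/deserialize().
Range original(Field(UInt64(10)), /*left_included*/ true, Field(UInt64(20)), /*right_included*/ false);
String dump = original.serialize();

Range restored = Range::createWholeUniverse();
restored.deserialize(dump);
chassert(restored.toString() == original.toString());
chassert(!restored.isPoint());   /// [10, 20) is a proper interval, not a single point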

Hyperrectangle intersect(const Hyperrectangle & a, const Hyperrectangle & b)
{
size_t result_size = std::min(a.size(), b.size());
5 changes: 5 additions & 0 deletions src/Core/Range.h
@@ -94,6 +94,8 @@ struct Range

bool isBlank() const;

bool isPoint() const;

bool intersectsRange(const Range & r) const;

bool containsRange(const Range & r) const;
@@ -114,6 +116,9 @@ struct Range
bool nearByWith(const Range & r) const;

String toString() const;

String serialize() const;
void deserialize(const String & range);
};

Range intersect(const Range & a, const Range & b);
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -6908,6 +6908,9 @@ Allow retries in cluster request, when one node goes offline
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
Allow Iceberg read optimization based on Iceberg metadata.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -79,6 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
36 changes: 35 additions & 1 deletion src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -8,6 +8,9 @@
#include <Common/Exception.h>
#include <Common/ObjectStorageKeyGenerator.h>

/// TODO: move DataFileInfo into separate file
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>
@@ -101,6 +104,28 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
return write_settings;
}

RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_)
: metadata(std::move(metadata_))
, command(task_string)
{
if (!command.isParsed())
relative_path = task_string;
else
{
auto file_path = command.getFilePath();
if (file_path.has_value())
relative_path = file_path.value();
file_meta_info = command.getFileMetaInfo();
}
}

RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
: metadata(std::move(metadata_))
{
relative_path = info.file_path;
file_meta_info = info.file_meta_info;
}

void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file)
{
if (!metadata)
@@ -129,20 +154,29 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std

successfully_parsed = true;

if (json->has("file_path"))
file_path = json->getValue<std::string>("file_path");
if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
if (json->has("meta_info"))
file_meta_info = std::make_shared<DataFileMetaInfo>(json->getObject("meta_info"));
}
catch (const Poco::JSON::JSONException &)
{ /// Not a JSON
return;
}
}

std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const
{
Poco::JSON::Object json;

if (file_path.has_value())
json.set("file_path", file_path.value());
if (retry_after_us.has_value())
json.set("retry_after_us", retry_after_us.value());
if (file_meta_info.has_value())
json.set("meta_info", file_meta_info.value()->toJson());

std::ostringstream oss;
oss.exceptions(std::ios::failbit);
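A hedged end-to-end sketch of the extended task protocol (the accessor names come from this PR; the object key and values are made up):

/// Illustrative only: an initiator packs a file path, a retry hint and optional
/// data-lake meta info into a JSON task; a worker rebuilds the object from it.
RelativePathWithMetadata::CommandInTaskResponse response;
response.setFilePath("data/part-0001.parquet");   /// hypothetical object key
response.setRetryAfterUs(500'000);                /// ask the worker to retry in 500 ms
const std::string task = response.toString();     /// {"file_path":"...","retry_after_us":500000}

RelativePathWithMetadata object(task);            /// recognizes and parses the JSON command
chassert(object.relative_path == "data/part-0001.parquet");
chassert(object.getCommand().getRetryAfterUs().has_value());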
35 changes: 24 additions & 11 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -9,6 +9,7 @@

#include <Poco/Timestamp.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/JSON/Object.h>
#include <Core/Defines.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
@@ -101,6 +102,10 @@ struct ObjectMetadata
ObjectAttributes attributes;
};

struct DataFileInfo;
class DataFileMetaInfo;
using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;

struct RelativePathWithMetadata
{
class CommandInTaskResponse
@@ -109,31 +114,35 @@ struct RelativePathWithMetadata
CommandInTaskResponse() = default;
explicit CommandInTaskResponse(const std::string & task);

bool is_parsed() const { return successfully_parsed; }
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
bool isParsed() const { return successfully_parsed; }
void setFilePath(const std::string & file_path_) { file_path = file_path_; }
void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_) { file_meta_info = file_meta_info_; }

std::string toString() const;

std::optional<std::string> getFilePath() const { return file_path; }

std::string to_string() const;
std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }

std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }

private:
bool successfully_parsed = false;
std::optional<std::string> file_path;
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
std::optional<DataFileMetaInfoPtr> file_meta_info;
};

String relative_path;
std::optional<ObjectMetadata> metadata;
CommandInTaskResponse command;
std::optional<DataFileMetaInfoPtr> file_meta_info;

RelativePathWithMetadata() = default;

explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: metadata(std::move(metadata_))
, command(task_string)
{
if (!command.is_parsed())
relative_path = task_string;
}
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt);
explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);

virtual ~RelativePathWithMetadata() = default;

@@ -145,6 +154,10 @@ struct RelativePathWithMetadata
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }

void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_) { file_meta_info = file_meta_info_; }
void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_) { file_meta_info = file_meta_info_; }
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }

void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file);
const CommandInTaskResponse & getCommand() const { return command; }
};
7 changes: 6 additions & 1 deletion src/Processors/Chunk.cpp
@@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column)

void Chunk::addColumn(size_t position, ColumnPtr column)
{
if (position >= columns.size())
if (position == columns.size())
{
addColumn(column);
return;
}
if (position > columns.size())
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
"Position {} out of bound in Chunk::addColumn(), max position = {}",
position, !columns.empty() ? columns.size() - 1 : 0);
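A small sketch of the behaviour change (illustrative; the columns here are empty, so the row-count check is trivially satisfied):

/// With this change, inserting at position == columns.size() appends instead of
/// throwing, so a chunk can be grown column by column.
Chunk chunk;
chunk.addColumn(0, ColumnUInt64::create());   /// empty chunk: position 0 == size 0, appends
chunk.addColumn(1, ColumnString::create());   /// position 1 == size 1, appends
/// chunk.addColumn(3, ...) would still throw POSITION_OUT_OF_BOUND (3 > 2).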
@@ -104,7 +104,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return std::nullopt;
auto data_files = current_metadata->getDataFiles();
if (!data_files.empty())
return data_files[0];
return data_files[0].file_path;
return std::nullopt;
}
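The DataFileInfos elements are only touched through .file_path and .file_meta_info in this PR; a rough sketch of the shape this implies (the actual definition lives in IDataLakeMetadata.h and may carry more members):

/// Inferred from usage in this PR, not the authoritative definition:
/// HudiMetadata constructs it from a bare key, DeltaLake converts a set of paths,
/// and readers pick up file_path plus optional per-file meta info.
struct DataFileInfo
{
    String file_path;
    DataFileMetaInfoPtr file_meta_info;   /// e.g. Iceberg column statistics used for pruning

    DataFileInfo(String file_path_, DataFileMetaInfoPtr file_meta_info_ = nullptr)
        : file_path(std::move(file_path_)), file_meta_info(std::move(file_meta_info_))
    {
    }
};
using DataFileInfos = std::vector<DataFileInfo>;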

4 changes: 2 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
@@ -158,7 +158,7 @@ struct DeltaLakeMetadataImpl
struct DeltaLakeMetadata
{
NamesAndTypesList schema;
Strings data_files;
DataFileInfos data_files;
DeltaLakePartitionColumns partition_columns;
};

@@ -195,7 +195,7 @@ struct DeltaLakeMetadataImpl
processMetadataFile(key, current_schema, current_partition_columns, result_files);
}

return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns};
return DeltaLakeMetadata{current_schema, DataFileInfos(result_files.begin(), result_files.end()), current_partition_columns};
}

/**
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
@@ -35,7 +35,7 @@ class DeltaLakeMetadata final : public IDataLakeMetadata

DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override { return data_files; }
DataFileInfos getDataFiles() const override { return data_files; }

NamesAndTypesList getTableSchema() const override { return schema; }

@@ -67,12 +67,12 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
ContextPtr context) const override;

private:
mutable Strings data_files;
mutable DataFileInfos data_files;
NamesAndTypesList schema;
DeltaLakePartitionColumns partition_columns;
ObjectStoragePtr object_storage;

Strings getDataFiles(const ActionsDAG *) const { return data_files; }
DataFileInfos getDataFiles(const ActionsDAG *) const { return data_files; }
};

}
@@ -35,7 +35,7 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
return table_snapshot->update();
}

Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
DataFileInfos DeltaLakeMetadataDeltaKernel::getDataFiles() const
{
throwNotImplemented("getDataFiles()");
}
@@ -33,7 +33,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

bool update(const ContextPtr & context) override;

Strings getDataFiles() const override;
DataFileInfos getDataFiles() const override;

NamesAndTypesList getTableSchema() const override;

8 changes: 4 additions & 4 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
@@ -40,7 +40,7 @@ namespace ErrorCodes
* hoodie.parquet.max.file.size option. Once a single Parquet file is too large, Hudi creates a second file group.
* Each file group is identified by File Id.
*/
Strings HudiMetadata::getDataFilesImpl() const
DataFileInfos HudiMetadata::getDataFilesImpl() const
{
auto configuration_ptr = configuration.lock();
auto log = getLogger("HudiMetadata");
@@ -76,12 +76,12 @@ Strings HudiMetadata::getDataFilesImpl() const
}
}

Strings result;
DataFileInfos result;
for (auto & [partition, partition_data] : files)
{
LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size());
for (auto & [file_id, file_data] : partition_data)
result.push_back(std::move(file_data.key));
result.push_back(DataFileInfo(std::move(file_data.key)));
}
return result;
}
@@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
{
}

Strings HudiMetadata::getDataFiles() const
DataFileInfos HudiMetadata::getDataFiles() const
{
if (data_files.empty())
data_files = getDataFilesImpl();
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
@@ -19,7 +19,7 @@ class HudiMetadata final : public IDataLakeMetadata

HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override;
DataFileInfos getDataFiles() const override;

NamesAndTypesList getTableSchema() const override { return {}; }

@@ -49,9 +49,9 @@ class HudiMetadata final : public IDataLakeMetadata
private:
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
mutable Strings data_files;
mutable DataFileInfos data_files;

Strings getDataFilesImpl() const;
DataFileInfos getDataFilesImpl() const;
};

}