Skip to content

Commit a1c4e5e

Browse files
authored
Merge pull request #1019 from Altinity/feature/optimize_count_in_datalake
Read optimization using Iceberg metadata
2 parents a4242c1 + fd23354 commit a1c4e5e

22 files changed

+760
-64
lines changed

src/Core/Range.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,20 @@
22
#include <Core/Range.h>
33
#include <IO/Operators.h>
44
#include <IO/WriteBufferFromString.h>
5+
#include <IO/ReadBufferFromString.h>
56
#include <Common/FieldVisitorToString.h>
67
#include <Common/FieldAccurateComparison.h>
78

89

910
namespace DB
1011
{
1112

13+
namespace ErrorCodes
14+
{
15+
extern const int INCORRECT_DATA;
16+
};
17+
18+
1219
FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
1320
: Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
1421
{
@@ -151,6 +158,13 @@ bool Range::isInfinite() const
151158
return left.isNegativeInfinity() && right.isPositiveInfinity();
152159
}
153160

161+
/// [x, x]
162+
bool Range::isPoint() const
163+
{
164+
return fullBounded() && left_included && right_included && equals(left, right)
165+
&& !left.isNegativeInfinity() && !left.isPositiveInfinity();
166+
}
167+
154168
bool Range::intersectsRange(const Range & r) const
155169
{
156170
/// r to the left of me.
@@ -332,6 +346,29 @@ String Range::toString() const
332346
return str.str();
333347
}
334348

349+
String Range::serialize() const
350+
{
351+
WriteBufferFromOwnString str;
352+
353+
str << left_included << right_included;
354+
writeFieldBinary(left, str);
355+
writeFieldBinary(right, str);
356+
357+
return str.str();
358+
}
359+
360+
void Range::deserialize(const String & range)
361+
{
362+
if (range.empty())
363+
throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump");
364+
365+
ReadBufferFromOwnString str(range);
366+
367+
str >> left_included >> right_included;
368+
left = readFieldBinary(str);
369+
right = readFieldBinary(str);
370+
}
371+
335372
Hyperrectangle intersect(const Hyperrectangle & a, const Hyperrectangle & b)
336373
{
337374
size_t result_size = std::min(a.size(), b.size());

src/Core/Range.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ struct Range
9494

9595
bool isBlank() const;
9696

97+
bool isPoint() const;
98+
9799
bool intersectsRange(const Range & r) const;
98100

99101
bool containsRange(const Range & r) const;
@@ -114,6 +116,9 @@ struct Range
114116
bool nearByWith(const Range & r) const;
115117

116118
String toString() const;
119+
120+
String serialize() const;
121+
void deserialize(const String & range);
117122
};
118123

119124
Range intersect(const Range & a, const Range & b);

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6908,6 +6908,9 @@ Allow retries in cluster request, when one node goes offline
69086908
)", EXPERIMENTAL) \
69096909
DECLARE(Bool, object_storage_remote_initiator, false, R"(
69106910
Execute request to object storage as remote on one of object_storage_cluster nodes.
6911+
)", EXPERIMENTAL) \
6912+
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
6913+
Allow Iceberg read optimization based on Iceberg metadata.
69116914
)", EXPERIMENTAL) \
69126915
\
69136916
/** Experimental timeSeries* aggregate functions. */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7979
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
8080
{"object_storage_remote_initiator", false, false, "New setting."},
8181
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
82+
{"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
8283
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
8384
});
8485
addSettingsChanges(settings_changes_history, "25.6",

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
/// TODO: move DataFileInfo into separate file
12+
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
13+
1114
#include <Poco/JSON/Object.h>
1215
#include <Poco/JSON/Parser.h>
1316
#include <Poco/JSON/JSONException.h>
@@ -101,6 +104,28 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
101104
return write_settings;
102105
}
103106

107+
RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_)
108+
: metadata(std::move(metadata_))
109+
, command(task_string)
110+
{
111+
if (!command.isParsed())
112+
relative_path = task_string;
113+
else
114+
{
115+
auto file_path = command.getFilePath();
116+
if (file_path.has_value())
117+
relative_path = file_path.value();
118+
file_meta_info = command.getFileMetaInfo();
119+
}
120+
}
121+
122+
RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
123+
: metadata(std::move(metadata_))
124+
{
125+
relative_path = info.file_path;
126+
file_meta_info = info.file_meta_info;
127+
}
128+
104129
void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file)
105130
{
106131
if (!metadata)
@@ -129,20 +154,29 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std
129154

130155
successfully_parsed = true;
131156

157+
if (json->has("file_path"))
158+
file_path = json->getValue<std::string>("file_path");
132159
if (json->has("retry_after_us"))
133160
retry_after_us = json->getValue<size_t>("retry_after_us");
161+
if (json->has("meta_info"))
162+
file_meta_info = std::make_shared<DataFileMetaInfo>(json->getObject("meta_info"));
134163
}
135164
catch (const Poco::JSON::JSONException &)
136165
{ /// Not a JSON
137166
return;
138167
}
139168
}
140169

141-
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
170+
std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const
142171
{
143172
Poco::JSON::Object json;
173+
174+
if (file_path.has_value())
175+
json.set("file_path", file_path.value());
144176
if (retry_after_us.has_value())
145177
json.set("retry_after_us", retry_after_us.value());
178+
if (file_meta_info.has_value())
179+
json.set("meta_info", file_meta_info.value()->toJson());
146180

147181
std::ostringstream oss;
148182
oss.exceptions(std::ios::failbit);

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <Poco/Timestamp.h>
1111
#include <Poco/Util/AbstractConfiguration.h>
12+
#include <Poco/JSON/Object.h>
1213
#include <Core/Defines.h>
1314
#include <IO/ReadSettings.h>
1415
#include <IO/WriteSettings.h>
@@ -101,6 +102,10 @@ struct ObjectMetadata
101102
ObjectAttributes attributes;
102103
};
103104

105+
struct DataFileInfo;
106+
class DataFileMetaInfo;
107+
using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
108+
104109
struct RelativePathWithMetadata
105110
{
106111
class CommandInTaskResponse
@@ -109,31 +114,35 @@ struct RelativePathWithMetadata
109114
CommandInTaskResponse() = default;
110115
explicit CommandInTaskResponse(const std::string & task);
111116

112-
bool is_parsed() const { return successfully_parsed; }
113-
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
117+
bool isParsed() const { return successfully_parsed; }
118+
void setFilePath(const std::string & file_path_ ) { file_path = file_path_; }
119+
void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
120+
void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; }
121+
122+
std::string toString() const;
123+
124+
std::optional<std::string> getFilePath() const { return file_path; }
114125

115-
std::string to_string() const;
126+
std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }
116127

117-
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
128+
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
118129

119130
private:
120131
bool successfully_parsed = false;
132+
std::optional<std::string> file_path;
121133
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
134+
std::optional<DataFileMetaInfoPtr> file_meta_info;
122135
};
123136

124137
String relative_path;
125138
std::optional<ObjectMetadata> metadata;
126139
CommandInTaskResponse command;
140+
std::optional<DataFileMetaInfoPtr> file_meta_info;
127141

128142
RelativePathWithMetadata() = default;
129143

130-
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
131-
: metadata(std::move(metadata_))
132-
, command(task_string)
133-
{
134-
if (!command.is_parsed())
135-
relative_path = task_string;
136-
}
144+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt);
145+
explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);
137146

138147
virtual ~RelativePathWithMetadata() = default;
139148

@@ -145,6 +154,10 @@ struct RelativePathWithMetadata
145154
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
146155
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
147156

157+
void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; }
158+
void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_ ) { file_meta_info = file_meta_info_; }
159+
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
160+
148161
void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file);
149162
const CommandInTaskResponse & getCommand() const { return command; }
150163
};

src/Processors/Chunk.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column)
108108

109109
void Chunk::addColumn(size_t position, ColumnPtr column)
110110
{
111-
if (position >= columns.size())
111+
if (position == columns.size())
112+
{
113+
addColumn(column);
114+
return;
115+
}
116+
if (position > columns.size())
112117
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
113118
"Position {} out of bound in Chunk::addColumn(), max position = {}",
114119
position, !columns.empty() ? columns.size() - 1 : 0);

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
104104
return std::nullopt;
105105
auto data_files = current_metadata->getDataFiles();
106106
if (!data_files.empty())
107-
return data_files[0];
107+
return data_files[0].file_path;
108108
return std::nullopt;
109109
}
110110

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ struct DeltaLakeMetadataImpl
158158
struct DeltaLakeMetadata
159159
{
160160
NamesAndTypesList schema;
161-
Strings data_files;
161+
DataFileInfos data_files;
162162
DeltaLakePartitionColumns partition_columns;
163163
};
164164

@@ -195,7 +195,7 @@ struct DeltaLakeMetadataImpl
195195
processMetadataFile(key, current_schema, current_partition_columns, result_files);
196196
}
197197

198-
return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns};
198+
return DeltaLakeMetadata{current_schema, DataFileInfos(result_files.begin(), result_files.end()), current_partition_columns};
199199
}
200200

201201
/**

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
3535

3636
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
3737

38-
Strings getDataFiles() const override { return data_files; }
38+
DataFileInfos getDataFiles() const override { return data_files; }
3939

4040
NamesAndTypesList getTableSchema() const override { return schema; }
4141

@@ -67,12 +67,12 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
6767
ContextPtr context) const override;
6868

6969
private:
70-
mutable Strings data_files;
70+
mutable DataFileInfos data_files;
7171
NamesAndTypesList schema;
7272
DeltaLakePartitionColumns partition_columns;
7373
ObjectStoragePtr object_storage;
7474

75-
Strings getDataFiles(const ActionsDAG *) const { return data_files; }
75+
DataFileInfos getDataFiles(const ActionsDAG *) const { return data_files; }
7676
};
7777

7878
}

0 commit comments

Comments
 (0)