Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6108,8 +6108,12 @@ Limit for hosts used for request in object storage cluster table functions - azu
Possible values:
- Positive integer.
- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
\

/* ####################################################### */ \
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// Altinity Antalya modifications atop of 25.2
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
{"iceberg_timestamp_ms", 0, 0, "New setting."},
{"iceberg_snapshot_id", 0, 0, "New setting."},
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
else
{
LOG_TEST(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the case where we request the whole object?

Copy link
Author

@ianton-ru ianton-ru May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iceberg metadata for example

:) CREATE DATABASE datalake ENGINE = Iceberg('http://rest:8181/v1', 'minio', 'minio123') SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/warehouse', warehouse = 'iceberg'
:) SELECT * FROM datalake.`iceberg.bids`
Query id: d1bb9862-c077-403f-9843-94fd28173760

   ┌───────────────────datetime─┬─symbol─┬────bid─┬────ask─┐
1. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.23 │ 195.45 │
2. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.25 │  198.5 │
3. │ 2019-08-07 08:35:00.000000 │ AAPL   │ 195.23 │ 195.28 │
4. │ 2019-08-07 08:35:00.000000 │ AAPL   │ 195.22 │ 195.28 │
5. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.23 │ 195.45 │
6. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.25 │  198.5 │
   └────────────────────────────┴────────┴────────┴────────┘

:) select ProfileEvents['S3GetObject'] from system.query_log where type='QueryFinish' and query_id='d1bb9862-c077-403f-9843-94fd28173760'

   ┌─arrayElement⋯GetObject')─┐
1. │                        8 │
   └──────────────────────────┘
...

grep "Read S3 object" /var/log/clickhouse-server/clickhouse-server.log

2025.05.07 22:38:10.414791 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/00003-ad725ef4-c28e-4ed4-aa4b-2e2aae0716d4.metadata.json, Version: Latest
2025.05.07 22:38:10.416600 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/snap-182060351258856937-0-ff436521-29e9-4437-be5b-eb60f209baa9.avro, Version: Latest
2025.05.07 22:38:10.418360 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/ff436521-29e9-4437-be5b-eb60f209baa9-m0.avro, Version: Latest
2025.05.07 22:38:10.420138 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/6f3e6993-47c9-4556-b70c-c6c48d2ced6f-m0.avro, Version: Latest
2025.05.07 22:38:10.421658 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/f0de1c43-e367-4e3d-8c9d-4076d8fb0cbd-m0.avro, Version: Latest
2025.05.07 22:38:10.426911 [ 767 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/data/datetime_day=2019-08-09/00000-0-ff436521-29e9-4437-be5b-eb60f209baa9.parquet, Version: Latest, Range: 0-1643
2025.05.07 22:38:10.427003 [ 762 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/data/datetime_day=2019-08-09/00000-0-6f3e6993-47c9-4556-b70c-c6c48d2ced6f.parquet, Version: Latest, Range: 0-1643
2025.05.07 22:38:10.427050 [ 770 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/data/datetime_day=2019-08-07/00000-0-f0de1c43-e367-4e3d-8c9d-4076d8fb0cbd.parquet, Version: Latest, Range: 0-1635

I added this for consistency: every request is counted in ProfileEvents['S3GetObject'], but only some of the requests appear in the logs.

log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
bucket, key, version_id.empty() ? "Latest" : version_id);
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
Expand Down
82 changes: 79 additions & 3 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <Storages/IStorageCluster.h>

#include <pcg_random.hpp>
#include <Common/randomSeed.h>

#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Core/QueryProcessingStage.h>
Expand All @@ -13,6 +16,7 @@
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/queryToString.h>
#include <Planner/Utils.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/narrowPipe.h>
Expand All @@ -22,6 +26,9 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>

#include <memory>
#include <string>
Expand All @@ -36,6 +43,7 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -93,15 +101,16 @@ void IStorageCluster::read(

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Block sample_block;
ASTPtr query_to_send = query_info.query;

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
Expand All @@ -114,6 +123,17 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
return;
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -141,6 +161,62 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

/// Redirects the query to a single randomly chosen host of `cluster`.
///
/// The table function of `query_to_send` is wrapped into `remote('<host>', <table function>)`
/// (or `remoteSecure` when the chosen address requires a secure connection), and the storage
/// produced by that table function is returned together with the context to read it with.
///
/// Side effects: `query_to_send` is modified in place. Its table function is replaced and the
/// `object_storage_remote_initiator` setting is removed from its SETTINGS clause (the clause is
/// dropped entirely if it becomes empty).
///
/// Throws LOGICAL_ERROR when the cluster has no shards, when the chosen shard does not consist of
/// exactly one address, when `query_to_send` is not a SELECT query, or when it has no table expression.
IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
    ClusterPtr cluster,
    ContextPtr context,
    const std::string & cluster_name_from_settings,
    ASTPtr query_to_send)
{
    /// Bind by reference: only a single address is needed, so there is no reason to copy the lists.
    const auto & host_addresses = cluster->getShardsAddresses();
    if (host_addresses.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

    /// One generator per thread: a function-local `static` generator would be shared by all threads
    /// calling this method and mutated without any synchronization.
    thread_local pcg64 rng(randomSeed());
    const size_t shard_num = rng() % host_addresses.size();
    const auto & shard_addresses = host_addresses[shard_num];
    /// After getClusterImpl each shard must have exactly 1 replica
    if (shard_addresses.size() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
    auto host_name = shard_addresses[0].toString();

    LOG_INFO(log, "Choose remote initiator '{}'", host_name);

    bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
    std::string remote_function_name = secure ? "remoteSecure" : "remote";

    /// Clean object_storage_remote_initiator setting to avoid infinite remote call
    auto new_context = Context::createCopy(context);
    new_context->setSetting("object_storage_remote_initiator", false);

    auto * select_query = query_to_send->as<ASTSelectQuery>();
    if (!select_query)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

    /// The setting may also be spelled out in the query text itself; strip it there as well,
    /// and drop the SETTINGS clause if nothing else is left in it.
    auto query_settings = select_query->settings();
    if (query_settings)
    {
        auto & settings_ast = query_settings->as<ASTSetQuery &>();
        if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
        {
            select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
        }
    }

    ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
    if (!table_expression)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

    /// remote('<host>', <original table function>)
    auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

    table_expression->table_function = remote_query;

    auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

    auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

    return RemoteCallVariables{storage, new_context};
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
13 changes: 12 additions & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,20 @@ class IStorageCluster : public IStorage
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}

/// Result of convertToRemote(): the storage to read from and the context to read it with.
struct RemoteCallVariables
{
/// Storage obtained by executing the `remote` / `remoteSecure` table function.
StoragePtr storage;
/// Copy of the query context with `object_storage_remote_initiator` set to false.
ContextPtr context;
};

/// Wraps the table function of `query_to_send` into `remote(...)` / `remoteSecure(...)` pointing at
/// a randomly chosen host of `cluster`, and returns the resulting storage together with the context
/// that must be used to read from it. `query_to_send` is modified in place.
/// `cluster_name_from_settings` is used only in error messages.
/// Throws LOGICAL_ERROR if the cluster is empty, the chosen shard does not have exactly one address,
/// or the query is not a SELECT with a table expression.
RemoteCallVariables convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send);

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return std::nullopt;
}

/// Returns the path of the first data file listed by the current metadata,
/// or std::nullopt when no metadata is loaded or it lists no data files.
/// NOTE(review): getDataFiles() throws for metadata implementations that do not support listing
/// (e.g. DeltaLakeMetadataDeltaKernel), so this method can throw despite the "try" prefix - confirm callers expect that.
std::optional<String> tryGetSamplePathFromMetadata() const override
{
    if (!current_metadata)
        return std::nullopt;

    const auto files = current_metadata->getDataFiles();
    if (files.empty())
        return std::nullopt;

    return files.front();
}

std::optional<size_t> totalRows() override
{
if (!current_metadata)
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata

DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

/// Returns a copy of the stored list of data files.
Strings getDataFiles() const override { return data_files; }

NamesAndTypesList getTableSchema() const override { return schema; }

DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
return table_snapshot->update();
}

/// Listing all data files is not supported by this implementation:
/// the call is delegated to throwNotImplemented() and no list is ever returned.
Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
{
throwNotImplemented("getDataFiles()");
}

ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
const ActionsDAG * filter_dag,
FileProgressCallback callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

bool update(const ContextPtr & context) override;

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override;

NamesAndTypesList getReadSchema() const override;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
{
}

Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
/// Returns the list of data files, computing it with getDataFilesImpl() on demand.
/// A non-empty result is kept in `data_files` and reused by later calls;
/// an empty result is not distinguishable from "not computed yet", so it is recomputed on every call.
Strings HudiMetadata::getDataFiles() const
{
    if (!data_files.empty())
        return data_files;

    data_files = getDataFilesImpl();
    return data_files;
}

ObjectIterator HudiMetadata::iterate(
const ActionsDAG * filter_dag,
const ActionsDAG * /* filter_dag */,
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFiles(), object_storage, callback);
}

}
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext

HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override { return {}; }

bool operator ==(const IDataLakeMetadata & other) const override
Expand Down Expand Up @@ -49,7 +51,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
mutable Strings data_files;

Strings getDataFilesImpl() const;
Strings getDataFiles(const ActionsDAG * filter_dag) const;
};

}
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// List all data files.
/// For better parallelization, iterate() method should be used.
virtual Strings getDataFiles() const = 0;
/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
return create_fn();
}

Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
if (!relevant_snapshot)
return {};
Expand Down Expand Up @@ -716,7 +716,7 @@ ObjectIterator IcebergMetadata::iterate(
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFilesImpl(filter_dag), object_storage, callback);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
const Poco::JSON::Object::Ptr & metadata_object,
IcebergMetadataFilesCachePtr cache_ptr);

/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed
/// without changing metadata file). Drops on every snapshot update.
Strings getDataFiles() const override { return getDataFilesImpl(nullptr); }

/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override
{
Expand Down Expand Up @@ -118,7 +123,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext

void updateState(const ContextPtr & local_context, bool metadata_file_changed);

Strings getDataFiles(const ActionsDAG * filter_dag) const;
Strings getDataFilesImpl(const ActionsDAG * filter_dag) const;

void updateSnapshot();

Expand Down
11 changes: 9 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ StorageObjectStorage::StorageObjectStorage(
bool distributed_processing_,
ASTPtr partition_by_,
bool is_table_function_,
bool lazy_init)
bool lazy_init,
std::optional<std::string> sample_path_)
: IStorage(table_id_)
, configuration(configuration_)
, object_storage(object_storage_)
Expand Down Expand Up @@ -130,7 +131,7 @@ StorageObjectStorage::StorageObjectStorage(
if (!do_lazy_init)
do_init();

std::string sample_path;
std::string sample_path = sample_path_.value_or("");
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context);
configuration->check(context);
Expand Down Expand Up @@ -352,6 +353,11 @@ std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTab
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration");
}

/// Default implementation for the basic configuration: always throws NOT_IMPLEMENTED.
/// Configurations that can provide a sample path from their metadata override this method.
std::optional<String> StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration");
}

void StorageObjectStorage::read(
QueryPlan & query_plan,
const Names & column_names,
Expand Down Expand Up @@ -523,6 +529,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
auto table_structure = configuration->tryGetTableStructureFromMetadata();
if (table_structure)
{
sample_path = configuration->tryGetSamplePathFromMetadata().value_or("");
return table_structure.value();
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class StorageObjectStorage : public IStorage
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr,
bool is_table_function_ = false,
bool lazy_init = false);
bool lazy_init = false,
std::optional<std::string> sample_path_ = std::nullopt);

String getName() const override;

Expand Down Expand Up @@ -246,6 +247,7 @@ class StorageObjectStorage::Configuration
ContextPtr local_context);

virtual std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const;
virtual std::optional<String> tryGetSamplePathFromMetadata() const;

virtual bool supportsFileIterator() const { return false; }
virtual ObjectIterator iterate(
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
format_settings_,
mode_,
/* distributed_processing */false,
partition_by_);
partition_by_,
/* is_table_function */false,
/* lazy_init */false,
sample_path);

auto virtuals_ = getVirtualsPtr();
if (virtuals_)
Expand Down
Loading
Loading