Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6135,8 +6135,12 @@ Possible values:
/** Experimental tsToGrid aggregate function. */ \
DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"(
Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
\

/* ####################################################### */ \
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_database_unity_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'unity'"},
{"allow_experimental_database_glue_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'glue'"},
{"use_page_cache_with_distributed_cache", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
{"use_query_condition_cache", false, false, "New setting."},
{"iceberg_timestamp_ms", 0, 0, "New setting."},
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
else
{
LOG_TEST(
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
bucket, key, version_id.empty() ? "Latest" : version_id);
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
Expand Down
84 changes: 80 additions & 4 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <Storages/IStorageCluster.h>

#include <pcg_random.hpp>
#include <Common/randomSeed.h>

#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Core/QueryProcessingStage.h>
Expand All @@ -12,6 +15,7 @@
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/narrowPipe.h>
Expand All @@ -21,6 +25,9 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>

#include <algorithm>
#include <memory>
Expand All @@ -39,6 +46,7 @@ namespace Setting
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -72,7 +80,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
if (extension)
return;

extension = storage->getTaskIteratorExtension(predicate, context, cluster);
extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
}

/// The code executes on initiator
Expand All @@ -96,15 +104,16 @@ void IStorageCluster::read(

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Block sample_block;
ASTPtr query_to_send = query_info.query;

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
Expand All @@ -117,6 +126,17 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
return;
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -144,6 +164,62 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send)
{
auto host_addresses = cluster->getShardsAddresses();
if (host_addresses.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

static pcg64 rng(randomSeed());
size_t shard_num = rng() % host_addresses.size();
auto shard_addresses = host_addresses[shard_num];
/// After getClusterImpl each shard must have exactly 1 replica
if (shard_addresses.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
auto host_name = shard_addresses[0].toString();

LOG_INFO(log, "Choose remote initiator '{}'", host_name);

bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
std::string remote_function_name = secure ? "remoteSecure" : "remote";

/// Clean object_storage_remote_initiator setting to avoid infinite remote call
auto new_context = Context::createCopy(context);
new_context->setSetting("object_storage_remote_initiator", false);

auto * select_query = query_to_send->as<ASTSelectQuery>();
if (!select_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

auto query_settings = select_query->settings();
if (query_settings)
{
auto & settings_ast = query_settings->as<ASTSetQuery &>();
if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
}
}

ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
if (!table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

table_expression->table_function = remote_query;

auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

return RemoteCallVariables{storage, new_context};
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
14 changes: 13 additions & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class IStorageCluster : public IStorage
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const std::optional<ActionsDAG> & filter_actions_dag,
const ContextPtr & context,
ClusterPtr cluster) const = 0;

Expand All @@ -57,9 +58,20 @@ class IStorageCluster : public IStorage
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}

struct RemoteCallVariables
{
StoragePtr storage;
ContextPtr context;
};

RemoteCallVariables convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send);

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
Expand Down
35 changes: 34 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return std::nullopt;
}

std::optional<String> tryGetSamplePathFromMetadata() const override
{
if (!current_metadata)
return std::nullopt;
auto data_files = current_metadata->getDataFiles();
if (!data_files.empty())
return data_files[0];
return std::nullopt;
}

std::optional<size_t> totalRows() override
{
if (!current_metadata)
Expand Down Expand Up @@ -280,6 +290,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
std::string getEngineName() const override { return getImpl().getEngineName(); }
std::string getNamespaceType() const override { return getImpl().getNamespaceType(); }

Path getFullPath() const override { return getImpl().getFullPath(); }
Path getPath() const override { return getImpl().getPath(); }
void setPath(const Path & path) override { getImpl().setPath(path); }

Expand All @@ -296,9 +307,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override
{ getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); }

bool withPartitionWildcard() const override { return getImpl().withPartitionWildcard(); }
bool withGlobsIgnorePartitionWildcard() const override { return getImpl().withGlobsIgnorePartitionWildcard(); }
bool isPathWithGlobs() const override { return getImpl().isPathWithGlobs(); }
bool isNamespaceWithGlobs() const override { return getImpl().isNamespaceWithGlobs(); }
std::string getPathWithoutGlobs() const override { return getImpl().getPathWithoutGlobs(); }

bool isArchive() const override { return getImpl().isArchive(); }
bool isPathInArchiveWithGlobs() const override { return getImpl().isPathInArchiveWithGlobs(); }
std::string getPathInArchive() const override { return getImpl().getPathInArchive(); }

void check(ContextPtr context) const override { getImpl().check(context); }
Expand Down Expand Up @@ -340,8 +356,19 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
{ return getImpl().tryGetTableStructureFromMetadata(); }

bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); }
ObjectIterator iterate(
const ActionsDAG * filter_dag,
std::function<void(FileProgress)> callback,
size_t list_batch_size) override
{
return getImpl().iterate(filter_dag, callback, list_batch_size);
}

void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
{ return getImpl().update(object_storage, local_context); }
void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context) override
{ return getImpl().updateIfRequired(object_storage, local_context); }

void initialize(
ASTs & engine_args,
Expand All @@ -366,7 +393,6 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); }
void setStructure(const String & structure_) override { getImpl().setStructure(structure_); }

protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override
{ return getImpl().fromNamedCollection(collection, context); }
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override
Expand Down Expand Up @@ -449,6 +475,13 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
createDynamicStorage(type);
}

std::optional<String> tryGetSamplePathFromMetadata() const override
{
return getImpl().tryGetSamplePathFromMetadata();
}

virtual void assertInitialized() const override { return getImpl().assertInitialized(); }

private:
inline StorageObjectStorage::Configuration & getImpl() const
{
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata

DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override { return data_files; }

NamesAndTypesList getTableSchema() const override { return schema; }

DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
return table_snapshot->update();
}

Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
{
throwNotImplemented("getDataFiles()");
}

ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
const ActionsDAG * filter_dag,
FileProgressCallback callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

bool update(const ContextPtr & context) override;

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override;

NamesAndTypesList getReadSchema() const override;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
{
}

Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
Strings HudiMetadata::getDataFiles() const
{
if (data_files.empty())
data_files = getDataFilesImpl();
return data_files;
}

ObjectIterator HudiMetadata::iterate(
const ActionsDAG * filter_dag,
const ActionsDAG * /* filter_dag */,
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFiles(), object_storage, callback);
}

}
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext

HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override { return {}; }

bool operator ==(const IDataLakeMetadata & other) const override
Expand Down Expand Up @@ -49,7 +51,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
mutable Strings data_files;

Strings getDataFilesImpl() const;
Strings getDataFiles(const ActionsDAG * filter_dag) const;
};

}
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// List all data files.
/// For better parallelization, iterate() method should be used.
virtual Strings getDataFiles() const = 0;
/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
return create_fn();
}

Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
if (!relevant_snapshot)
return {};
Expand Down Expand Up @@ -716,7 +716,7 @@ ObjectIterator IcebergMetadata::iterate(
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFilesImpl(filter_dag), object_storage, callback);
}

}
Expand Down
Loading
Loading