Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6135,8 +6135,12 @@ Possible values:
/** Experimental tsToGrid aggregate function. */ \
DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"(
Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
\

/* ####################################################### */ \
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_database_unity_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'unity'"},
{"allow_experimental_database_glue_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'glue'"},
{"use_page_cache_with_distributed_cache", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
{"use_query_condition_cache", false, false, "New setting."},
{"iceberg_timestamp_ms", 0, 0, "New setting."},
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
else
{
LOG_TEST(
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
bucket, key, version_id.empty() ? "Latest" : version_id);
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
Expand Down
82 changes: 79 additions & 3 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <Storages/IStorageCluster.h>

#include <pcg_random.hpp>
#include <Common/randomSeed.h>

#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Core/QueryProcessingStage.h>
Expand All @@ -12,6 +15,7 @@
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/narrowPipe.h>
Expand All @@ -21,6 +25,9 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>

#include <algorithm>
#include <memory>
Expand All @@ -39,6 +46,7 @@ namespace Setting
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -96,15 +104,16 @@ void IStorageCluster::read(

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Block sample_block;
ASTPtr query_to_send = query_info.query;

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
Expand All @@ -117,6 +126,17 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
return;
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -144,6 +164,62 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

/// Rewrites `query_to_send` so that its table function is wrapped into `remote(host, ...)` /
/// `remoteSecure(host, ...)`, where `host` is one randomly chosen node of `cluster`, and builds
/// the storage produced by that remote table function.
///
/// @param cluster                    Cluster to pick the initiator from; every shard is expected to hold exactly one replica.
/// @param context                    Query context; it is copied, the original is not modified.
/// @param cluster_name_from_settings Cluster name, used only in error messages.
/// @param query_to_send              SELECT query; NOTE: its AST is modified in place (SETTINGS clause and table expression).
/// @return The storage created by the remote table function together with the context copy in which
///         `object_storage_remote_initiator` is switched off.
/// @throws Exception(LOGICAL_ERROR) if the cluster is empty, the chosen shard does not have exactly one replica,
///         the query is not a SELECT, or no table function can be found in it.
IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
    ClusterPtr cluster,
    ContextPtr context,
    const std::string & cluster_name_from_settings,
    ASTPtr query_to_send)
{
    const auto & host_addresses = cluster->getShardsAddresses();
    if (host_addresses.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

    /// One generator per thread: a single function-local static generator would be advanced
    /// concurrently by parallel queries without any synchronization.
    thread_local pcg64 rng(randomSeed());
    const size_t shard_num = rng() % host_addresses.size();
    const auto & shard_addresses = host_addresses[shard_num];
    /// After getClusterImpl each shard must have exactly 1 replica
    if (shard_addresses.size() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);

    const auto & initiator_address = shard_addresses[0];
    auto host_name = initiator_address.toString();

    LOG_INFO(log, "Choose remote initiator '{}'", host_name);

    const bool secure = initiator_address.secure == Protocol::Secure::Enable;
    std::string remote_function_name = secure ? "remoteSecure" : "remote";

    /// Clean object_storage_remote_initiator setting to avoid infinite remote call
    auto new_context = Context::createCopy(context);
    new_context->setSetting("object_storage_remote_initiator", false);

    auto * select_query = query_to_send->as<ASTSelectQuery>();
    if (!select_query)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

    /// The setting may also be spelled in the SETTINGS clause of the query itself: remove it from there
    /// as well, and drop the whole clause if nothing else is left in it.
    auto query_settings = select_query->settings();
    if (query_settings)
    {
        auto & settings_ast = query_settings->as<ASTSetQuery &>();
        if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
        {
            select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
        }
    }

    ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
    if (!table_expression)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

    /// Only a table function can be wrapped; without this check a null AST node would be passed
    /// as an argument of the remote function.
    if (!table_expression->table_function)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Can't find table function in table expression to wrap into '{}'",
            remote_function_name);

    auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

    table_expression->table_function = remote_query;

    auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

    auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

    return RemoteCallVariables{storage, new_context};
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
13 changes: 12 additions & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,20 @@ class IStorageCluster : public IStorage
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}

/// Result of convertToRemote(): the storage to read from and the context to read it with.
struct RemoteCallVariables
{
/// Storage created by executing the `remote` / `remoteSecure` table function.
StoragePtr storage;
/// Copy of the query context with `object_storage_remote_initiator` set to false.
ContextPtr context;
};

/// Picks a random node of `cluster` and wraps the table function of `query_to_send`
/// into `remote(...)` / `remoteSecure(...)` addressed to that node.
/// `cluster_name_from_settings` is used in error messages only.
/// NOTE: the AST behind `query_to_send` is modified in place.
RemoteCallVariables convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send);

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
Expand Down
16 changes: 16 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return std::nullopt;
}

/// Returns the first entry of the data file list reported by the current metadata,
/// or std::nullopt when there is no metadata or the list is empty.
std::optional<String> tryGetSamplePathFromMetadata() const override
{
    if (current_metadata)
    {
        auto files = current_metadata->getDataFiles();
        if (!files.empty())
            return files.front();
    }

    return std::nullopt;
}

std::optional<size_t> totalRows() override
{
if (!current_metadata)
Expand Down Expand Up @@ -465,8 +475,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
createDynamicStorage(type);
}

/// Forwards the request to the configuration returned by getImpl().
std::optional<String> tryGetSamplePathFromMetadata() const override
{
    const auto & impl = getImpl();
    return impl.tryGetSamplePathFromMetadata();
}

/// Forwards the initialization check to the configuration returned by getImpl().
void assertInitialized() const override
{
    getImpl().assertInitialized();
}


private:
inline StorageObjectStorage::Configuration & getImpl() const
{
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata

DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

/// Returns a copy of the stored list of data files.
Strings getDataFiles() const override { return data_files; }

NamesAndTypesList getTableSchema() const override { return schema; }

DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
return table_snapshot->update();
}

/// Listing all data files is not supported by this metadata implementation:
/// the method unconditionally calls throwNotImplemented() and returns nothing itself.
Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
{
throwNotImplemented("getDataFiles()");
}

ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
const ActionsDAG * filter_dag,
FileProgressCallback callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

bool update(const ContextPtr & context) override;

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override;

NamesAndTypesList getReadSchema() const override;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
{
}

Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
/// Returns the list of data files, filling the `data_files` member on demand.
/// An empty list is not treated as "already loaded": while `data_files` is empty,
/// every call runs getDataFilesImpl() again.
Strings HudiMetadata::getDataFiles() const
{
    if (!data_files.empty())
        return data_files;

    data_files = getDataFilesImpl();
    return data_files;
}

ObjectIterator HudiMetadata::iterate(
const ActionsDAG * filter_dag,
const ActionsDAG * /* filter_dag */,
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFiles(), object_storage, callback);
}

}
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext

HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override { return {}; }

bool operator ==(const IDataLakeMetadata & other) const override
Expand Down Expand Up @@ -49,7 +51,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
mutable Strings data_files;

Strings getDataFilesImpl() const;
Strings getDataFiles(const ActionsDAG * filter_dag) const;
};

}
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// List all data files.
/// For better parallelization, iterate() method should be used.
virtual Strings getDataFiles() const = 0;
/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
return create_fn();
}

Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
if (!relevant_snapshot)
return {};
Expand Down Expand Up @@ -716,7 +716,7 @@ ObjectIterator IcebergMetadata::iterate(
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFilesImpl(filter_dag), object_storage, callback);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
const Poco::JSON::Object::Ptr & metadata_object,
IcebergMetadataFilesCachePtr cache_ptr);

/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed
/// without changing metadata file). Drops on every snapshot update.
/// This overload passes a null filter DAG to getDataFilesImpl().
Strings getDataFiles() const override { return getDataFilesImpl(nullptr); }

/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override
{
Expand Down Expand Up @@ -118,7 +123,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext

void updateState(const ContextPtr & local_context, bool metadata_file_changed);

Strings getDataFiles(const ActionsDAG * filter_dag) const;
Strings getDataFilesImpl(const ActionsDAG * filter_dag) const;

void updateSnapshot();

Expand Down
12 changes: 8 additions & 4 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ StorageObjectStorage::StorageObjectStorage(
bool distributed_processing_,
ASTPtr partition_by_,
bool is_table_function_,
bool lazy_init)
bool lazy_init,
std::optional<std::string> sample_path_)
: IStorage(table_id_)
, configuration(configuration_)
, object_storage(object_storage_)
Expand Down Expand Up @@ -131,7 +132,7 @@ StorageObjectStorage::StorageObjectStorage(
if (!do_lazy_init)
do_init();

std::string sample_path;
std::string sample_path = sample_path_.value_or("");
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context);
configuration->check(context);
Expand Down Expand Up @@ -353,6 +354,11 @@ std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTab
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration");
}

/// Default implementation for the basic configuration: there is no metadata to take a sample path from,
/// so this always throws NOT_IMPLEMENTED.
std::optional<String> StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration");
}

void StorageObjectStorage::read(
QueryPlan & query_plan,
const Names & column_names,
Expand Down Expand Up @@ -523,9 +529,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(

auto table_structure = configuration->tryGetTableStructureFromMetadata();
if (table_structure)
{
return table_structure.value();
}
}

ObjectInfos read_keys;
Expand Down
Loading
Loading