Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7071,6 +7071,9 @@ Allow Iceberg read optimization based on Iceberg metadata.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
Allow retries in cluster request, when one node goes offline
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/DataLakeConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace DataLake
{

static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
static constexpr std::string_view FILE_PATH_PREFIX = "file:/";

/// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"
Expand Down
6 changes: 6 additions & 0 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
}

for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);

Expand Down Expand Up @@ -813,6 +818,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
args.uuid);
};
factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
}

}
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
else
{
LOG_TEST(
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
bucket, key, version_id.empty() ? "Latest" : version_id);
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
Expand Down
3 changes: 2 additions & 1 deletion src/Parsers/ASTSetQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
return true;
}

if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
|| DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
{
if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Parsers/FunctionSecretArgumentsFinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ class FunctionSecretArgumentsFinder
/// S3('url', 'access_key_id', 'secret_access_key')
findS3DatabaseSecretArguments();
}
else if (engine_name == "DataLakeCatalog")
else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
{
findDataLakeCatalogSecretArguments();
}
Expand Down
81 changes: 78 additions & 3 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <Storages/IStorageCluster.h>

#include <pcg_random.hpp>
#include <Common/randomSeed.h>

#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Core/QueryProcessingStage.h>
Expand All @@ -13,6 +16,7 @@
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/narrowPipe.h>
Expand All @@ -28,6 +32,8 @@
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>

#include <algorithm>
#include <memory>
Expand All @@ -47,6 +53,7 @@ namespace Setting
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -283,8 +290,9 @@ void IStorageCluster::read(

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Expand All @@ -293,7 +301,7 @@ void IStorageCluster::read(

updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage));
}
Expand All @@ -306,6 +314,17 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
return;
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -333,6 +352,62 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send)
{
auto host_addresses = cluster->getShardsAddresses();
if (host_addresses.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

static pcg64 rng(randomSeed());
size_t shard_num = rng() % host_addresses.size();
auto shard_addresses = host_addresses[shard_num];
/// After getClusterImpl each shard must have exactly 1 replica
if (shard_addresses.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
auto host_name = shard_addresses[0].toString();

LOG_INFO(log, "Choose remote initiator '{}'", host_name);

bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
std::string remote_function_name = secure ? "remoteSecure" : "remote";

/// Clean object_storage_remote_initiator setting to avoid infinite remote call
auto new_context = Context::createCopy(context);
new_context->setSetting("object_storage_remote_initiator", false);

auto * select_query = query_to_send->as<ASTSelectQuery>();
if (!select_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

auto query_settings = select_query->settings();
if (query_settings)
{
auto & settings_ast = query_settings->as<ASTSetQuery &>();
if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
}
}

ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
if (!table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

table_expression->table_function = remote_query;

auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

return RemoteCallVariables{storage, new_context};
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
13 changes: 12 additions & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,21 @@ class IStorageCluster : public IStorage
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);

struct RemoteCallVariables
{
StoragePtr storage;
ContextPtr context;
};

RemoteCallVariables convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send);

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
Expand Down
66 changes: 52 additions & 14 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSetQuery.h>
#include <Disks/DiskType.h>

#include <memory>
Expand All @@ -42,20 +43,21 @@ namespace ErrorCodes

namespace DataLakeStorageSetting
{
extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
extern DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
extern DataLakeStorageSettingsString object_storage_endpoint;
extern DataLakeStorageSettingsString storage_aws_access_key_id;
extern DataLakeStorageSettingsString storage_aws_secret_access_key;
extern DataLakeStorageSettingsString storage_region;
extern DataLakeStorageSettingsString storage_catalog_url;
extern DataLakeStorageSettingsString storage_warehouse;
extern DataLakeStorageSettingsString storage_catalog_credential;

extern DataLakeStorageSettingsString storage_auth_scope;
extern DataLakeStorageSettingsString storage_auth_header;
extern DataLakeStorageSettingsString storage_oauth_server_uri;
extern DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
extern const DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
extern const DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
extern const DataLakeStorageSettingsString object_storage_endpoint;
extern const DataLakeStorageSettingsString storage_aws_access_key_id;
extern const DataLakeStorageSettingsString storage_aws_secret_access_key;
extern const DataLakeStorageSettingsString storage_region;
extern const DataLakeStorageSettingsString storage_catalog_url;
extern const DataLakeStorageSettingsString storage_warehouse;
extern const DataLakeStorageSettingsString storage_catalog_credential;

extern const DataLakeStorageSettingsString storage_auth_scope;
extern const DataLakeStorageSettingsString storage_auth_header;
extern const DataLakeStorageSettingsString storage_oauth_server_uri;
extern const DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
}

template <typename T>
Expand Down Expand Up @@ -324,6 +326,42 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
current_metadata->addDeleteTransformers(object_info, builder, format_settings, local_context);
}

ASTPtr createArgsWithAccessData() const override
{
auto res = BaseStorageConfiguration::createArgsWithAccessData();

auto iceberg_metadata_file_path = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path];

if (iceberg_metadata_file_path.changed)
{
auto * arguments = res->template as<ASTExpressionList>();
if (!arguments)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");

bool has_settings = false;

for (auto & arg : arguments->children)
{
if (auto * settings_ast = arg->template as<ASTSetQuery>())
{
has_settings = true;
settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
break;
}
}

if (!has_settings)
{
std::shared_ptr<ASTSetQuery> settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
arguments->children.push_back(settings_ast);
}
}

return res;
}

private:
DataLakeMetadataPtr current_metadata;
LoggerPtr log = getLogger("DataLakeConfiguration");
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,20 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, StorageObjectStorag
{
}

Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
Strings HudiMetadata::getDataFiles() const
{
if (data_files.empty())
data_files = getDataFilesImpl();
return data_files;
}

ObjectIterator HudiMetadata::iterate(
const ActionsDAG * filter_dag,
const ActionsDAG * /* filter_dag */,
FileProgressCallback callback,
size_t /* list_batch_size */,
ContextPtr /* context */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFiles(), object_storage, callback);
}

}
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
mutable Strings data_files;

Strings getDataFilesImpl() const;
Strings getDataFiles(const ActionsDAG * filter_dag) const;
Strings getDataFiles() const;
};

}
5 changes: 3 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ StorageObjectStorage::StorageObjectStorage(
bool distributed_processing_,
ASTPtr partition_by_,
bool is_table_function,
bool lazy_init)
bool lazy_init,
std::optional<std::string> sample_path_)
: IStorage(table_id_)
, configuration(configuration_)
, object_storage(object_storage_)
Expand Down Expand Up @@ -156,7 +157,7 @@ StorageObjectStorage::StorageObjectStorage(
/// (e.g. read always follows constructor immediately).
update_configuration_on_read_write = !is_table_function || !updated_configuration;

std::string sample_path;
std::string sample_path = sample_path_.value_or("");

ColumnsDescription columns{columns_in_table_or_function_definition};
if (need_resolve_columns_or_format)
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class StorageObjectStorage : public IStorage
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr,
bool is_table_function_ = false,
bool lazy_init = false);
bool lazy_init = false,
std::optional<std::string> sample_path_ = std::nullopt);

String getName() const override;

Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
/* distributed_processing */false,
partition_by,
/* is_table_function */false,
/* lazy_init */lazy_init);
/* lazy_init */lazy_init,
sample_path);

auto virtuals_ = getVirtualsPtr();
if (virtuals_)
Expand Down
Loading
Loading