Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/Storages/ObjectStorage/Local/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
.ignore_non_existent_file = false};
}

ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();

arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));
if (getFormat() != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
if (getStructure() != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(getStructure()));
if (getCompressionMethod() != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));

return arguments;
}


}
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/Local/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration

void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }

ASTPtr createArgsWithAccessData() const override;

private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
{"IcebergS3", "icebergS3"},
{"IcebergAzure", "icebergAzure"},
{"IcebergHDFS", "icebergHDFS"},
{"IcebergLocal", "icebergLocal"},
{"DeltaLake", "deltaLake"},
{"DeltaLakeS3", "deltaLakeS3"},
{"DeltaLakeAzure", "deltaLakeAzure"},
Expand Down Expand Up @@ -416,6 +417,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
{"icebergS3", "icebergS3Cluster"},
{"icebergAzure", "icebergAzureCluster"},
{"icebergHDFS", "icebergHDFSCluster"},
{"icebergLocal", "icebergLocalCluster"},
{"deltaLake", "deltaLakeCluster"},
{"deltaLakeS3", "deltaLakeS3Cluster"},
{"deltaLakeAzure", "deltaLakeAzureCluster"},
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
};

struct IcebergLocalClusterDefinition
{
static constexpr auto name = "icebergLocalCluster";
static constexpr auto storage_engine_name = "IcebergLocalCluster";
static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
};

struct DeltaLakeClusterDefinition
{
static constexpr auto name = "deltaLakeCluster";
Expand Down
10 changes: 1 addition & 9 deletions src/TableFunctions/TableFunctionObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf

#if USE_AVRO
template class TableFunctionObjectStorage<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
template class TableFunctionObjectStorage<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
#endif

#if USE_AVRO && USE_AWS_S3
Expand Down Expand Up @@ -334,13 +335,4 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
.allow_readonly = false});
}
#endif


void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIceberg(factory);
#endif
}
}
7 changes: 7 additions & 0 deletions src/TableFunctions/TableFunctionObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
.category = FunctionDocumentation::Category::TableFunction},
.allow_readonly = false});

factory.registerFunction<TableFunctionIcebergLocalCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on shared storage in parallel for many nodes in a specified cluster.)",
.examples{{IcebergLocalClusterDefinition::name, "SELECT * FROM icebergLocalCluster(cluster, filename, format, [,compression])", ""}},
.category = FunctionDocumentation::Category::TableFunction},
.allow_readonly = false});

# if USE_AWS_S3
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
Expand Down
1 change: 1 addition & 0 deletions src/TableFunctions/TableFunctionObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDe

#if USE_AVRO
using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
using TableFunctionIcebergLocalCluster = TableFunctionObjectStorageCluster<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
#endif

#if USE_AVRO && USE_AWS_S3
Expand Down
29 changes: 29 additions & 0 deletions src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ struct IcebergHDFSClusterFallbackDefinition
static constexpr auto storage_engine_cluster_name = "IcebergHDFSCluster";
};

struct IcebergLocalClusterFallbackDefinition
{
static constexpr auto name = "icebergLocal";
static constexpr auto storage_engine_name = "Local";
static constexpr auto storage_engine_cluster_name = "IcebergLocalCluster";
};

struct DeltaLakeClusterFallbackDefinition
{
static constexpr auto name = "deltaLake";
Expand Down Expand Up @@ -163,6 +170,7 @@ using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallba

#if USE_AVRO
using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergClusterFallbackDefinition, TableFunctionIcebergCluster>;
using TableFunctionIcebergLocalClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergLocalClusterFallbackDefinition, TableFunctionIcebergLocalCluster>;
#endif

#if USE_AVRO && USE_AWS_S3
Expand Down Expand Up @@ -286,6 +294,27 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
.allow_readonly = false
}
);

factory.registerFunction<TableFunctionIcebergLocalClusterFallback>(
{
.documentation = {
.description=R"(The table function can be used to read the Iceberg table stored on shared disk in parallel for many nodes in a specified cluster or from single node.)",
.examples{
{
"icebergLocal",
"SELECT * FROM icebergLocal(filename)", ""
},
{
"icebergLocal",
"SELECT * FROM icebergLocal(filename) "
"SETTINGS object_storage_cluster='cluster'", ""
},
},
.category = FunctionDocumentation::Category::TableFunction
},
.allow_readonly = false
}
);
#endif

#if USE_AVRO && USE_AWS_S3
Expand Down
1 change: 0 additions & 1 deletion src/TableFunctions/registerTableFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ void registerTableFunctions()
registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory);
registerTableFunctionObjectStorageClusterFallback(factory);
registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);

#if USE_YTSAURUS
Expand Down
1 change: 0 additions & 1 deletion src/TableFunctions/registerTableFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);

void registerTableFunctionTimeSeries(TableFunctionFactory & factory);
Expand Down
Loading