forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathStorageURLCluster.cpp
More file actions
107 lines (87 loc) · 4 KB
/
StorageURLCluster.cpp
File metadata and controls
107 lines (87 loc) · 4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include "Interpreters/Context_fwd.h"
#include <Storages/StorageURLCluster.h>
#include <Common/HTTPHeaderFilter.h>
#include <Core/QueryProcessingStage.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Sources/RemoteSource.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageURL.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/TableFunctionURLCluster.h>
#include <memory>
namespace DB
{
namespace Setting
{
/// Declared here, defined in another translation unit.
/// Read in getTaskIteratorExtension() and passed to the URL glob iterator.
extern const SettingsUInt64 glob_expansion_max_elements;
}
namespace ErrorCodes
{
/// Declared here, defined in another translation unit.
/// Thrown by updateQueryToSendIfNeeded() when the table function arguments cannot be extracted from the query.
extern const int LOGICAL_ERROR;
}
/// Builds the storage for `uri_` on cluster `cluster_name_`.
///
/// The URL and the configured HTTP headers are first passed to the context's
/// remote-host and HTTP-header filters. The in-memory metadata is then filled:
///  - columns: `columns_` if given, otherwise obtained from StorageURL's
///    structure-from-data helpers;
///  - format: if `format_` is "auto", `format_name` is replaced by the format
///    returned from StorageURL::getTableStructureAndFormatFromData;
///  - constraints: `constraints_`;
///  - virtual columns: from VirtualColumnUtils::getVirtualsForFileLikeStorage.
StorageURLCluster::StorageURLCluster(
    const ContextPtr & context,
    const String & cluster_name_,
    const String & uri_,
    const String & format_,
    const String & compression_method,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const StorageURL::Configuration & configuration_)
    : IStorageCluster(cluster_name_, table_id_, getLogger("StorageURLCluster (" + table_id_.table_name + ")"))
    , uri(uri_)
    , format_name(format_)
{
    context->getRemoteHostFilter().checkURL(Poco::URI(uri));
    context->getHTTPHeaderFilter().checkHeaders(configuration_.headers);

    /// Evaluated lazily: only the branches that actually read data need the compression method.
    const auto resolve_compression = [&]
    {
        return chooseCompressionMethod(Poco::URI(uri).getPath(), compression_method);
    };

    const bool detect_format = (format_name == "auto");

    StorageInMemoryMetadata storage_metadata;
    if (!columns_.empty())
    {
        /// The structure is known; only the format may still need to be detected.
        if (detect_format)
        {
            format_name = StorageURL::getTableStructureAndFormatFromData(
                uri, resolve_compression(), configuration_.headers, std::nullopt, context).second;
        }
        storage_metadata.setColumns(columns_);
    }
    else if (detect_format)
    {
        /// Neither structure nor format is known: obtain both in one call.
        auto [detected_columns, detected_format] = StorageURL::getTableStructureAndFormatFromData(
            uri, resolve_compression(), configuration_.headers, std::nullopt, context);
        format_name = std::move(detected_format);
        storage_metadata.setColumns(std::move(detected_columns));
    }
    else
    {
        /// The format is given explicitly; only the structure has to be obtained.
        ColumnsDescription detected_columns = StorageURL::getTableStructureFromData(
            format_, uri, resolve_compression(), configuration_.headers, std::nullopt, context);
        storage_metadata.setColumns(detected_columns);
    }

    storage_metadata.setConstraints(constraints_);

    const auto sample_uri = getSampleURI(uri, context);
    setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.columns, context, sample_uri));
    setInMemoryMetadata(storage_metadata);
}
/// Updates the table function arguments inside `query` by passing them, the
/// snapshot's column description and the current `format_name` to
/// TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded.
///
/// Throws LOGICAL_ERROR if the table function arguments cannot be extracted from `query`.
void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context)
{
    auto * table_function_args = extractTableFunctionArgumentsFromSelectQuery(query);
    if (table_function_args == nullptr)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Expected SELECT query from table function urlCluster, got '{}'",
            queryToString(query));
    }

    /// Textual description of all columns of the snapshot.
    const auto structure = storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription();

    TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded(
        table_function_args->children, structure, format_name, context);
}
/// Creates the executor extension whose task iterator returns entries from a
/// StorageURLSource::DisclosedGlobIterator built over `uri`.
///
/// The iterator receives the `glob_expansion_max_elements` setting, the filter
/// `predicate` and this storage's virtual columns. The cluster argument is unused.
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
    const ActionsDAG::Node * predicate,
    const ContextPtr & context,
    ClusterPtr) const
{
    const auto & max_glob_elements = context->getSettingsRef()[Setting::glob_expansion_max_elements];

    auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
        uri, max_glob_elements, predicate, getVirtualsList(), context);

    /// The callback owns the iterator; its size_t argument is ignored.
    auto next_url = [glob_iterator = std::move(glob_iterator)](size_t) mutable -> String
    {
        return glob_iterator->next();
    };

    auto task_callback = std::make_shared<TaskIterator>(std::move(next_url));
    return RemoteQueryExecutor::Extension{.task_iterator = std::move(task_callback)};
}
}