-
Notifications
You must be signed in to change notification settings - Fork 11
Yet another export replicated partition pr #1124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6097651
43e9459
659b309
d3bb820
444e0ee
35c6cca
b884fd3
c7493cb
f4f9d52
91c7ec2
69cd83f
62cb51f
54c2dfb
7b3a7c9
3f3983c
b89cd5e
bb04fd9
55e7b94
abe14f3
e225798
0ca5e28
63c48ce
ee00ebb
5c61bd6
d609d04
b571f5a
4487431
f23ed2d
149a437
fb2d7f7
ee2abd0
d6e4226
c2b5d84
601be4c
d9796b4
3f64bff
26bd613
803a91a
7a12b3a
de9deb2
cf13ec2
0c0b85f
4024b52
6a8b390
7e90b70
0c67f05
059467c
a8b9ce4
460f2f4
522cfdb
147d21c
bada40f
0486df2
6a5e2a2
8602587
276dbf1
1ec94fa
4135ff7
a736a6c
37d50ec
7c7f251
3314f21
59cd727
6b9870a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,4 +7,6 @@ namespace DB | |
|
|
||
| uint64_t generateSnowflakeID(); | ||
|
|
||
| std::string generateSnowflakeIDString(); | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| #include <DataTypes/DataTypeString.h> | ||
| #include <Columns/ColumnsNumber.h> | ||
| #include <DataTypes/DataTypesNumber.h> | ||
| #include <Core/ServerSettings.h> | ||
| #include <Processors/Sources/SourceFromSingleChunk.h> | ||
| #include <Processors/ISource.h> | ||
| #include <Processors/Executors/PullingPipelineExecutor.h> | ||
|
|
@@ -37,11 +38,17 @@ namespace Setting | |
| extern const SettingsUInt64 max_parser_depth; | ||
| } | ||
|
|
||
| namespace ServerSetting | ||
| { | ||
| extern const ServerSettingsBool enable_experimental_export_merge_tree_partition_feature; | ||
| } | ||
|
|
||
| namespace ErrorCodes | ||
| { | ||
| extern const int LOGICAL_ERROR; | ||
| extern const int ACCESS_DENIED; | ||
| extern const int NOT_IMPLEMENTED; | ||
| extern const int SUPPORT_IS_DISABLED; | ||
| } | ||
|
|
||
|
|
||
|
|
@@ -250,6 +257,82 @@ BlockIO InterpreterKillQueryQuery::execute() | |
|
|
||
| break; | ||
| } | ||
| case ASTKillQueryQuery::Type::ExportPartition: | ||
| { | ||
| if (!getContext()->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature]) | ||
| { | ||
| throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, | ||
| "Exporting merge tree partition is experimental. Set the server setting `enable_experimental_export_merge_tree_partition_feature` to enable it"); | ||
| } | ||
|
|
||
| Block exports_block = getSelectResult( | ||
| "source_database, source_table, transaction_id, destination_database, destination_table, partition_id", | ||
| "system.replicated_partition_exports"); | ||
| if (exports_block.empty()) | ||
| return res_io; | ||
|
|
||
| const ColumnString & src_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_database").column); | ||
| const ColumnString & src_table_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_table").column); | ||
| const ColumnString & dst_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_database").column); | ||
| const ColumnString & dst_table_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_table").column); | ||
| const ColumnString & tx_col = typeid_cast<const ColumnString &>(*exports_block.getByName("transaction_id").column); | ||
|
|
||
| auto header = exports_block.cloneEmpty(); | ||
| header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"}); | ||
|
|
||
| MutableColumns res_columns = header.cloneEmptyColumns(); | ||
| AccessRightsElements required_access_rights; | ||
| auto access = getContext()->getAccess(); | ||
| bool access_denied = false; | ||
|
|
||
| for (size_t i = 0; i < exports_block.rows(); ++i) | ||
| { | ||
| const auto src_database = src_db_col.getDataAt(i).toString(); | ||
| const auto src_table = src_table_col.getDataAt(i).toString(); | ||
| const auto dst_database = dst_db_col.getDataAt(i).toView(); | ||
| const auto dst_table = dst_table_col.getDataAt(i).toView(); | ||
|
|
||
| const auto table_id = StorageID{src_database, src_table}; | ||
| const auto transaction_id = tx_col.getDataAt(i).toString(); | ||
|
|
||
| CancellationCode code = CancellationCode::Unknown; | ||
| if (!query.test) | ||
| { | ||
| auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); | ||
| if (!storage) | ||
| code = CancellationCode::NotFound; | ||
| else | ||
| { | ||
| ASTAlterCommand alter_command{}; | ||
| alter_command.type = ASTAlterCommand::EXPORT_PARTITION; | ||
| alter_command.move_destination_type = DataDestinationType::TABLE; | ||
| alter_command.from_database = src_database; | ||
| alter_command.from_table = src_table; | ||
| alter_command.to_database = dst_database; | ||
| alter_command.to_table = dst_table; | ||
|
|
||
| required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand( | ||
| alter_command, table_id.database_name, table_id.table_name); | ||
| if (!access->isGranted(required_access_rights)) | ||
| { | ||
| access_denied = true; | ||
| continue; | ||
| } | ||
| code = storage->killExportPartition(transaction_id); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is somewhat weird: Do we assume that if user can't kill an export then it is not supposed to know about it in the first place? |
||
| } | ||
| } | ||
|
|
||
| insertResultRow(i, code, exports_block, header, res_columns); | ||
| } | ||
|
|
||
| if (res_columns[0]->empty() && access_denied) | ||
| throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to kill export partition. " | ||
| "To execute this query, it's necessary to have the grant {}", required_access_rights.toString()); | ||
|
|
||
| res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(std::make_shared<const Block>(header.cloneWithColumns(std::move(res_columns)))))); | ||
|
|
||
| break; | ||
| } | ||
| case ASTKillQueryQuery::Type::Mutation: | ||
| { | ||
| Block mutations_block = getSelectResult("database, table, mutation_id, command", "system.mutations"); | ||
|
|
@@ -462,6 +545,9 @@ AccessRightsElements InterpreterKillQueryQuery::getRequiredAccessForDDLOnCluster | |
| | AccessType::ALTER_MATERIALIZE_COLUMN | ||
| | AccessType::ALTER_MATERIALIZE_TTL | ||
| ); | ||
| /// todo arthur think about this | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks ok, rest of the KILL operations require |
||
| else if (query.type == ASTKillQueryQuery::Type::ExportPartition) | ||
| required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION); | ||
| return required_access; | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IDK, maybe
code = CancelCannotBeSent, so the it appears in theres_columns?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how it is implemented for all other kill operations. I don't know for sure which one makes more sense, but I would vote for keeping it consistent with the existing behavior