-
Notifications
You must be signed in to change notification settings - Fork 11
Yet another export replicated partition pr #1124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 50 commits
6097651
43e9459
659b309
d3bb820
444e0ee
35c6cca
b884fd3
c7493cb
f4f9d52
91c7ec2
69cd83f
62cb51f
54c2dfb
7b3a7c9
3f3983c
b89cd5e
bb04fd9
55e7b94
abe14f3
e225798
0ca5e28
63c48ce
ee00ebb
5c61bd6
d609d04
b571f5a
4487431
f23ed2d
149a437
fb2d7f7
ee2abd0
d6e4226
c2b5d84
601be4c
d9796b4
3f64bff
26bd613
803a91a
7a12b3a
de9deb2
cf13ec2
0c0b85f
4024b52
6a8b390
7e90b70
0c67f05
059467c
a8b9ce4
460f2f4
522cfdb
147d21c
bada40f
0486df2
6a5e2a2
8602587
276dbf1
1ec94fa
4135ff7
a736a6c
37d50ec
7c7f251
3314f21
59cd727
6b9870a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6873,6 +6873,22 @@ Use roaring bitmap for iceberg positional deletes. | |
| )", 0) \ | ||
| DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"( | ||
| Overwrite file if it already exists when exporting a merge tree part | ||
| )", 0) \ | ||
| DECLARE(Bool, export_merge_tree_partition_force_export, false, R"( | ||
| Ignore existing partition export and overwrite the zookeeper entry | ||
| )", 0) \ | ||
| DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"( | ||
| Maximum number of retries for exporting a merge tree part in an export partition task | ||
| )", 0) \ | ||
| DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 180, R"( | ||
| Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. | ||
| This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones. | ||
| )", 0) \ | ||
| DECLARE(String, export_merge_tree_part_file_already_exists_policy, "NO_OP", R"( | ||
| Possible values: | ||
| - NO_OP - No-op if the file already exists - Default. | ||
|
||
| - ERROR - Throw an error if the file already exists. | ||
| - OVERWRITE - Overwrite the file | ||
| )", 0) \ | ||
| DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"( | ||
| Timezone for Iceberg timestamptz field. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -545,6 +545,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS | |
| required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); | ||
| break; | ||
| } | ||
| case ASTAlterCommand::EXPORT_PARTITION: | ||
| { | ||
| required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION, command.to_database, command.to_table); | ||
| required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); | ||
|
Comment on lines
548
to
551
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When building the access list for Useful? React with 👍 / 👎. |
||
| break; | ||
| } | ||
| case ASTAlterCommand::FETCH_PARTITION: | ||
| { | ||
| required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -250,6 +250,76 @@ BlockIO InterpreterKillQueryQuery::execute() | |
|
|
||
| break; | ||
| } | ||
| case ASTKillQueryQuery::Type::ExportPartition: | ||
| { | ||
| Block exports_block = getSelectResult( | ||
| "source_database, source_table, transaction_id, destination_database, destination_table, partition_id", | ||
| "system.replicated_partition_exports"); | ||
| if (exports_block.empty()) | ||
| return res_io; | ||
|
|
||
| const ColumnString & src_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_database").column); | ||
| const ColumnString & src_table_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_table").column); | ||
| const ColumnString & dst_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_database").column); | ||
| const ColumnString & dst_table_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_table").column); | ||
| const ColumnString & tx_col = typeid_cast<const ColumnString &>(*exports_block.getByName("transaction_id").column); | ||
|
|
||
| auto header = exports_block.cloneEmpty(); | ||
| header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"}); | ||
|
|
||
| MutableColumns res_columns = header.cloneEmptyColumns(); | ||
| AccessRightsElements required_access_rights; | ||
| auto access = getContext()->getAccess(); | ||
| bool access_denied = false; | ||
|
|
||
| for (size_t i = 0; i < exports_block.rows(); ++i) | ||
| { | ||
| const auto src_database = src_db_col.getDataAt(i).toString(); | ||
| const auto src_table = src_table_col.getDataAt(i).toString(); | ||
| const auto dst_database = dst_db_col.getDataAt(i).toView(); | ||
| const auto dst_table = dst_table_col.getDataAt(i).toView(); | ||
|
|
||
| const auto table_id = StorageID{src_database, src_table}; | ||
| const auto transaction_id = tx_col.getDataAt(i).toString(); | ||
|
|
||
| CancellationCode code = CancellationCode::Unknown; | ||
| if (!query.test) | ||
| { | ||
| auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); | ||
| if (!storage) | ||
| code = CancellationCode::NotFound; | ||
| else | ||
| { | ||
| ASTAlterCommand alter_command{}; | ||
| alter_command.type = ASTAlterCommand::EXPORT_PARTITION; | ||
| alter_command.move_destination_type = DataDestinationType::TABLE; | ||
| alter_command.from_database = src_database; | ||
| alter_command.from_table = src_table; | ||
| alter_command.to_database = dst_database; | ||
| alter_command.to_table = dst_table; | ||
|
|
||
| required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand( | ||
| alter_command, table_id.database_name, table_id.table_name); | ||
| if (!access->isGranted(required_access_rights)) | ||
| { | ||
| access_denied = true; | ||
| continue; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IDK, maybe
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is how it is implemented for all other kill operations. I don't know for sure which one makes more sense, but I would vote for keeping it consistent with the existing behavior |
||
| } | ||
| code = storage->killExportPartition(transaction_id); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is somewhat weird: Do we assume that if user can't kill an export then it is not supposed to know about it in the first place? |
||
| } | ||
| } | ||
|
|
||
| insertResultRow(i, code, exports_block, header, res_columns); | ||
| } | ||
|
|
||
| if (res_columns[0]->empty() && access_denied) | ||
| throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to kill export partition. " | ||
| "To execute this query, it's necessary to have the grant {}", required_access_rights.toString()); | ||
|
|
||
| res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(std::make_shared<const Block>(header.cloneWithColumns(std::move(res_columns)))))); | ||
|
|
||
| break; | ||
| } | ||
| case ASTKillQueryQuery::Type::Mutation: | ||
| { | ||
| Block mutations_block = getSelectResult("database, table, mutation_id, command", "system.mutations"); | ||
|
|
@@ -462,6 +532,9 @@ AccessRightsElements InterpreterKillQueryQuery::getRequiredAccessForDDLOnCluster | |
| | AccessType::ALTER_MATERIALIZE_COLUMN | ||
| | AccessType::ALTER_MATERIALIZE_TTL | ||
| ); | ||
| /// todo arthur think about this | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks ok, rest of the KILL operations require |
||
| else if (query.type == ASTKillQueryQuery::Type::ExportPartition) | ||
| required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION); | ||
| return required_access; | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That should be an enum, like
`TransactionsWaitCSNMode`, `JoinStrictness`, `Dialect`, etc. That helps to avoid stuff like what you have in MergeTreeData.cpp:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. How do I serialize it and deserialize it in JSON? https://github.com/Altinity/ClickHouse/pull/1124/files#diff-4de213b42d7a2dc4e48fd47e3fa625bee3144288a5d9a44c1336d1b63d3b70bbR164.
Do I need to wrap those calls with magic_enum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, maybe I can use
ClickHouse/src/Core/SettingsEnums.h
Line 29 in efad38c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a separate thing, you'd have to serialize/deserialize to/from JSON anyhow. BUT whenever possible, we should use enums for the setting types, since that pushes validation forward in time, meaning that an invalid value will fail sooner rather than later (which is a good thing).