Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Interpreters/DDLWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ DDLWorker::DDLWorker(
/// Clears the stop flag and starts the main and cleanup threads of the worker.
void DDLWorker::startup()
{
    /// exchange() hands back the previous flag value; startup() expects the worker
    /// to have been in the stopped state, so the previous value must be true.
    /// [[maybe_unused]]: presumably chassert may compile to nothing in some builds.
    [[maybe_unused]] bool prev_stop_flag = stop_flag.exchange(false);
    chassert(prev_stop_flag);
    main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
    cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
}
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/InterpreterOptimizeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Access/Common/AccessRightsElement.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTExpressionList.h>
#include <Storages/MergeTree/MergeTreeData.h>

#include <Interpreters/processColumnTransformers.h>

Expand Down Expand Up @@ -75,6 +76,9 @@ BlockIO InterpreterOptimizeQuery::execute()
}
}

if (auto * snapshot_data = dynamic_cast<MergeTreeData::SnapshotData *>(storage_snapshot->data.get()))
snapshot_data->parts = {};

table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());

return {};
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
write_part_log(ExecutionStatus::fromCurrentException());

if (storage.getSettings()->detach_not_byte_identical_parts)
storage.forgetPartAndMoveToDetached(std::move(part), "merge-not-byte-identical");
storage.forcefullyMovePartToDetachedAndRemoveFromMemory(std::move(part), "merge-not-byte-identical");
else
storage.tryRemovePartImmediately(std::move(part));

Expand Down
18 changes: 17 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3136,7 +3136,23 @@ void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLo
modifyPartState(part, DataPartState::Active);
}

void MergeTreeData::forgetPartAndMoveToDetached(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)

void MergeTreeData::outdateBrokenPartAndCloneToDetached(const DataPartPtr & part_to_detach, const String & prefix)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (prefix.empty())
LOG_INFO(log, "Cloning part {} to {} and making it obsolete.", part_to_detach->data_part_storage->getPartDirectory(), part_to_detach->name);
else
LOG_INFO(log, "Cloning part {} to {}_{} and making it obsolete.", part_to_detach->data_part_storage->getPartDirectory(), prefix, part_to_detach->name);

part_to_detach->makeCloneInDetached(prefix, metadata_snapshot);

DataPartsLock lock = lockParts();
if (part_to_detach->getState() == DataPartState::Active)
removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part_to_detach}, true, &lock);
}

void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
{
if (prefix.empty())
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->data_part_storage->getPartDirectory(), part_to_detach->name);
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,12 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
/// so it will not be deleted in clearOldParts.
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
void forgetPartAndMoveToDetached(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
/// NOTE: This method is safe to use only for parts which nobody else holds (like on server start or for parts which were not committed).
/// For active parts it's unsafe because this method modifies fields of the part (rename) while some other thread may be trying to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);

/// Outdate broken part, set remove time to zero (remove as fast as possible) and make clone in detached directory.
void outdateBrokenPartAndCloneToDetached(const DataPartPtr & part, const String & prefix);

/// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
void tryRemovePartImmediately(DataPartPtr && part);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MutateFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
write_part_log(ExecutionStatus::fromCurrentException());

if (storage.getSettings()->detach_not_byte_identical_parts)
storage.forgetPartAndMoveToDetached(std::move(new_part), "mutate-not-byte-identical");
storage.forcefullyMovePartToDetachedAndRemoveFromMemory(std::move(new_part), "mutate-not-byte-identical");
else
storage.tryRemovePartImmediately(std::move(new_part));

Expand Down
4 changes: 2 additions & 2 deletions src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
LOG_ERROR(log, fmt::runtime(message));

/// Delete part locally.
storage.forgetPartAndMoveToDetached(part, "broken");
storage.outdateBrokenPartAndCloneToDetached(part, "broken");

/// Part is broken, let's try to find it and fetch.
searchForMissingPartAndFetchIfPossible(part_name, exists_in_zookeeper);
Expand All @@ -392,7 +392,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na

String message = "Unexpected part " + part_name + " in filesystem. Removing.";
LOG_ERROR(log, fmt::runtime(message));
storage.forgetPartAndMoveToDetached(part, "unexpected");
storage.outdateBrokenPartAndCloneToDetached(part, "unexpected");
return {part_name, false, message};
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
if (part)
{
LOG_DEBUG(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
storage.forgetPartAndMoveToDetached(part, "noquorum");
storage.forcefullyMovePartToDetachedAndRemoveFromMemory(part, "noquorum");
storage.queue.removeFailedQuorumPart(part->info);
}
}
Expand Down
46 changes: 32 additions & 14 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const DataPartPtr & part : unexpected_parts)
{
LOG_ERROR(log, "Renaming unexpected part {} to ignored_{}", part->name, part->name);
forgetPartAndMoveToDetached(part, "ignored", true);
forcefullyMovePartToDetachedAndRemoveFromMemory(part, "ignored", true);
}
}

Expand Down Expand Up @@ -2452,6 +2452,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
std::vector<QueueEntryInfo> source_queue;
ActiveDataPartSet get_part_set{format_version};
ActiveDataPartSet drop_range_set{format_version};
std::unordered_set<String> exact_part_names;

{
std::vector<zkutil::ZooKeeper::FutureGet> queue_get_futures;
Expand Down Expand Up @@ -2489,14 +2490,22 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
info.parsed_entry->znode_name = source_queue_names[i];

if (info.parsed_entry->type == LogEntry::DROP_RANGE)
{
drop_range_set.add(info.parsed_entry->new_part_name);

if (info.parsed_entry->type == LogEntry::GET_PART)
}
else if (info.parsed_entry->type == LogEntry::GET_PART)
{
String maybe_covering_drop_range = drop_range_set.getContainingPart(info.parsed_entry->new_part_name);
if (maybe_covering_drop_range.empty())
get_part_set.add(info.parsed_entry->new_part_name);
}
else
{
/// We should keep local parts if they are present in the queue of the source replica.
/// There's a chance that we are the only replica that has these parts.
Strings entry_virtual_parts = info.parsed_entry->getVirtualPartNames(format_version);
std::move(entry_virtual_parts.begin(), entry_virtual_parts.end(), std::inserter(exact_part_names, exact_part_names.end()));
}
}
}

Expand All @@ -2516,11 +2525,17 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo

for (const auto & part : local_parts_in_zk)
{
if (get_part_set.getContainingPart(part).empty())
{
parts_to_remove_from_zk.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
}
/// We look for exact match (and not for any covering part)
/// because our part might be dropped and the covering part might be merged through a gap.
/// (avoid resurrection of data that was removed a long time ago)
if (get_part_set.getContainingPart(part) == part)
continue;

if (exact_part_names.contains(part))
continue;

parts_to_remove_from_zk.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
}

{
Expand All @@ -2542,11 +2557,14 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo

for (const auto & part : local_active_parts)
{
if (get_part_set.getContainingPart(part->name).empty())
{
parts_to_remove_from_working_set.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name);
}
if (get_part_set.getContainingPart(part->name) == part->name)
continue;

if (exact_part_names.contains(part->name))
continue;

parts_to_remove_from_working_set.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name);
}

if (getSettings()->detach_old_local_parts_when_cloning_replica)
Expand Down Expand Up @@ -5128,7 +5146,7 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
if (part->getState() == DataPartState::Active)
active_parts_names.push_back(part->name);

forgetPartAndMoveToDetached(part);
forcefullyMovePartToDetachedAndRemoveFromMemory(part);
}

LOG_INFO(log, "Moved all parts to detached/");
Expand Down
4 changes: 2 additions & 2 deletions tests/clickhouse-test
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ class TestCase:
and (proc.stderr is None)
and (proc.stdout is None or "Exception" not in proc.stdout)
)
need_drop_database = not maybe_passed
need_drop_database = maybe_passed

debug_log = ""
if os.path.exists(self.testcase_args.debug_log_file):
Expand Down Expand Up @@ -1971,7 +1971,7 @@ if __name__ == "__main__":
parser.add_argument(
"--no-drop-if-fail",
action="store_true",
help="Do not drop database for test if test has failed",
help="Do not drop database for test if test has failed (does not work if reference file mismatch)",
)
parser.add_argument(
"--hide-db-name",
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/test_broken_part_during_merge/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def test_merge_and_part_corruption(started_cluster):
node1.query(
"""
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1;
""".format(
replica=node1.name
)
Expand Down
26 changes: 16 additions & 10 deletions tests/integration/test_lost_part/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def remove_part_from_disk(node, table, part_name):
def test_lost_part_same_replica(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple() PARTITION BY date".format(
"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple() PARTITION BY date "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
Expand Down Expand Up @@ -73,7 +74,7 @@ def test_lost_part_same_replica(start_cluster):
node1.query("ATTACH TABLE mt0")

node1.query("SYSTEM START MERGES mt0")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
print("result: ", res)
print("error: ", res)

Expand Down Expand Up @@ -104,7 +105,8 @@ def test_lost_part_same_replica(start_cluster):
def test_lost_part_other_replica(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{}') ORDER BY tuple()".format(
"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
Expand Down Expand Up @@ -136,7 +138,7 @@ def test_lost_part_other_replica(start_cluster):
node1.query("CHECK TABLE mt1")

node2.query("SYSTEM START REPLICATION QUEUES")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
print("result: ", res)
print("error: ", res)

Expand Down Expand Up @@ -168,7 +170,8 @@ def test_lost_part_other_replica(start_cluster):
def test_lost_part_mutation(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{}') ORDER BY tuple()".format(
"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
Expand Down Expand Up @@ -196,7 +199,7 @@ def test_lost_part_mutation(start_cluster):
node1.query("CHECK TABLE mt2")

node1.query("SYSTEM START MERGES mt2")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
print("result: ", res)
print("error: ", res)

Expand Down Expand Up @@ -225,7 +228,9 @@ def test_lost_last_part(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') "
"ORDER BY tuple() PARTITION BY p".format(node.name)
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)

node1.query("SYSTEM STOP MERGES mt3")
Expand All @@ -246,9 +251,6 @@ def test_lost_last_part(start_cluster):
node1.query("CHECK TABLE mt3")

node1.query("SYSTEM START MERGES mt3")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt3")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
Expand All @@ -259,6 +261,10 @@ def test_lost_last_part(start_cluster):
"DROP/DETACH PARTITION"
):
break
if node1.contains_in_log(
"Created empty part 8b8f0fede53df97513a9fb4cb19dc1e4_0_0_0 "
):
break
time.sleep(1)
else:
assert False, "Don't have required messages in node1 log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Sum before DETACH PARTITION:
Sum after DETACH PARTITION:
0
system.detached_parts after DETACH PARTITION:
default not_partitioned all all_1_2_1 default 1 2 1
default not_partitioned all all_1_2_1 1 2 1
*** Partitioned by week ***
Parts before OPTIMIZE:
1999-12-27 19991227_1_1_0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
-- Tags: no-s3-storage
SELECT '*** Not partitioned ***';

DROP TABLE IF EXISTS not_partitioned;
Expand All @@ -19,7 +18,7 @@ ALTER TABLE not_partitioned DETACH PARTITION ID 'all';
SELECT 'Sum after DETACH PARTITION:';
SELECT sum(x) FROM not_partitioned;
SELECT 'system.detached_parts after DETACH PARTITION:';
SELECT * FROM system.detached_parts WHERE database = currentDatabase() AND table = 'not_partitioned';
SELECT system.detached_parts.* EXCEPT disk FROM system.detached_parts WHERE database = currentDatabase() AND table = 'not_partitioned';

DROP TABLE not_partitioned;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env bash
# Tags: long, no-s3-storage
# no-s3 because read FileOpen metric

set -e

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- Tags: no-parallel, no-s3-storage
-- With s3 policy TTL TO DISK 'default' doesn't work (because we have no default, only 's3')

drop table if exists ttl;
set mutations_sync = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env bash
# Tags: long, no-replicated-database, no-s3-storage
# Tags: long, no-replicated-database
# Tag no-replicated-database: Fails due to additional replicas or shards
# Tag no-s3-storage: Merge assigned to replica 2, but replication queues are stopped for it


set -e
Expand Down
2 changes: 1 addition & 1 deletion tests/queries/0_stateless/01533_multiple_nested.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Tags: no-s3-storage
-- Temporary supressed
-- no-s3 because read FileOpen metric
DROP TABLE IF EXISTS nested;

SET flatten_nested = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Tags: no-parallel, no-fasttest, no-s3-storage
-- Tags: no-parallel, no-fasttest

DROP TABLE IF EXISTS t_s3_compressed_blocks;

Expand Down
11 changes: 11 additions & 0 deletions tests/queries/0_stateless/02448_clone_replica_lost_part.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
1 [2,3,4,5]
2 [1,2,3,4,5]
3 [1,2,3,4,5]
4 [3,4,5]
5 [1,2,3,4,5]
6 [1,2,3,4,5]
7 [1,2,3,4,5,20,30,40,50]
8 [1,2,3,4,5,10,20,30,40,50]
9 [1,2,3,4,5,10,20,30,40,50]
11 [1,2,3,4,5,10,20,30,40,50,100,300,400,500,600]
12 [1,2,3,4,5,10,20,30,40,50,100,300,400,500,600]
Loading