From f5bb1c16d7d60d3b601fa4277786038337d65d9d Mon Sep 17 00:00:00 2001 From: Sergei Trifonov Date: Mon, 30 Oct 2023 09:04:38 +0100 Subject: [PATCH] Merge pull request #56030 from azat/disks/least_used-fix Fix incorrect free space accounting for least_used JBOD policy --- .../mergetree-family/mergetree.md | 17 ++--- src/Disks/StoragePolicy.cpp | 3 +- src/Disks/VolumeJBOD.cpp | 26 ++++++-- src/Disks/VolumeJBOD.h | 13 +++- .../config.d/storage_configuration.xml | 1 + .../test_jbod_load_balancing/test.py | 63 +++++++++++++++++++ 6 files changed, 107 insertions(+), 16 deletions(-) diff --git a/docs/en/engines/table-engines/mergetree-family/mergetree.md b/docs/en/engines/table-engines/mergetree-family/mergetree.md index 95efe42c7573..549c0f7f8e8f 100644 --- a/docs/en/engines/table-engines/mergetree-family/mergetree.md +++ b/docs/en/engines/table-engines/mergetree-family/mergetree.md @@ -797,14 +797,15 @@ Storage policies configuration markup: Tags: -- `policy_name_N` — Policy name. Policy names must be unique. -- `volume_name_N` — Volume name. Volume names must be unique. -- `disk` — a disk within a volume. -- `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volume’s disks. If the a size of a merged part estimated to be bigger than `max_data_part_size_bytes` then this part will be written to a next volume. Basically this feature allows to keep new/small parts on a hot (SSD) volume and move them to a cold (HDD) volume when they reach large size. Do not use this setting if your policy has only one volume. -- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). ClickHouse sorts existing parts by size from largest to smallest (in descending order) and selects parts with the total size that is sufficient to meet the `move_factor` condition. If the total size of all parts is insufficient, all parts will be moved. 
-- `prefer_not_to_merge` — Disables merging of data parts on this volume. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.
-- `perform_ttl_move_on_insert` — Disables TTL move on data part INSERT. By default if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3).
-- `load_balancing` - Policy for disk balancing, `round_robin` or `least_used`.
+- `policy_name_N` — Policy name. Policy names must be unique.
+- `volume_name_N` — Volume name. Volume names must be unique.
+- `disk` — a disk within a volume.
+- `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volume’s disks. If the size of a merged part is estimated to be bigger than `max_data_part_size_bytes` then this part will be written to the next volume. Basically this feature allows to keep new/small parts on a hot (SSD) volume and move them to a cold (HDD) volume when they reach large size. Do not use this setting if your policy has only one volume.
+- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). ClickHouse sorts existing parts by size from largest to smallest (in descending order) and selects parts with the total size that is sufficient to meet the `move_factor` condition. If the total size of all parts is insufficient, all parts will be moved.
+- `prefer_not_to_merge` — Disables merging of data parts on this volume. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.
+- `perform_ttl_move_on_insert` — Disables TTL move on data part INSERT. 
By default (if enabled) if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3). If disabled then already expired data part is written into a default volume and then right after moved to TTL volume. +- `load_balancing` - Policy for disk balancing, `round_robin` or `least_used`. +- `least_used_ttl_ms` - Configure timeout (in milliseconds) for the updating available space on all disks (`0` - update always, `-1` - never update, default is `60000`). Note, if the disk can be used by ClickHouse only and is not subject to a online filesystem resize/shrink you can use `-1`, in all other cases it is not recommended, since eventually it will lead to incorrect space distribution. Cofiguration examples: diff --git a/src/Disks/StoragePolicy.cpp b/src/Disks/StoragePolicy.cpp index ec0f201b8014..9a3f3dd94220 100644 --- a/src/Disks/StoragePolicy.cpp +++ b/src/Disks/StoragePolicy.cpp @@ -70,7 +70,8 @@ StoragePolicy::StoragePolicy( /* max_data_part_size_= */ 0, /* are_merges_avoided_= */ false, /* perform_ttl_move_on_insert_= */ true, - VolumeLoadBalancing::ROUND_ROBIN); + VolumeLoadBalancing::ROUND_ROBIN, + /* least_used_ttl_ms_= */ 60'000); volumes.emplace_back(std::move(default_volume)); } diff --git a/src/Disks/VolumeJBOD.cpp b/src/Disks/VolumeJBOD.cpp index 64bd2619665f..f5bff460b4d6 100644 --- a/src/Disks/VolumeJBOD.cpp +++ b/src/Disks/VolumeJBOD.cpp @@ -68,6 +68,7 @@ VolumeJBOD::VolumeJBOD( perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true); are_merges_avoided = config.getBool(config_prefix + ".prefer_not_to_merge", false); + least_used_ttl_ms = config.getUInt64(config_prefix + ".least_used_ttl_ms", 60'000); } VolumeJBOD::VolumeJBOD(const VolumeJBOD & volume_jbod, @@ -93,6 +94,11 @@ DiskPtr VolumeJBOD::getDisk(size_t /* index */) const case 
VolumeLoadBalancing::LEAST_USED: { std::lock_guard lock(mutex); + if (!least_used_ttl_ms || least_used_update_watch.elapsedMilliseconds() >= least_used_ttl_ms) + { + disks_by_size = LeastUsedDisksQueue(disks.begin(), disks.end()); + least_used_update_watch.restart(); + } return disks_by_size.top().disk; } } @@ -127,11 +133,23 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes) { std::lock_guard lock(mutex); - DiskWithSize disk = disks_by_size.top(); - disks_by_size.pop(); + ReservationPtr reservation; + if (!least_used_ttl_ms || least_used_update_watch.elapsedMilliseconds() >= least_used_ttl_ms) + { + disks_by_size = LeastUsedDisksQueue(disks.begin(), disks.end()); + least_used_update_watch.restart(); - ReservationPtr reservation = disk.reserve(bytes); - disks_by_size.push(disk); + DiskWithSize disk = disks_by_size.top(); + reservation = disk.reserve(bytes); + } + else + { + DiskWithSize disk = disks_by_size.top(); + disks_by_size.pop(); + + reservation = disk.reserve(bytes); + disks_by_size.push(disk); + } return reservation; } diff --git a/src/Disks/VolumeJBOD.h b/src/Disks/VolumeJBOD.h index 81da64c488d1..4afc1e909be8 100644 --- a/src/Disks/VolumeJBOD.h +++ b/src/Disks/VolumeJBOD.h @@ -4,6 +4,9 @@ #include #include +#include +#include +#include namespace DB @@ -22,9 +25,10 @@ using VolumesJBOD = std::vector; class VolumeJBOD : public IVolume { public: - VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_, bool perform_ttl_move_on_insert_, VolumeLoadBalancing load_balancing_) + VolumeJBOD(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_, bool perform_ttl_move_on_insert_, VolumeLoadBalancing load_balancing_, UInt64 least_used_ttl_ms_) : IVolume(name_, disks_, max_data_part_size_, perform_ttl_move_on_insert_, load_balancing_) , are_merges_avoided(are_merges_avoided_) + , least_used_ttl_ms(least_used_ttl_ms_) { } @@ -69,7 +73,7 @@ class VolumeJBOD : public IVolume DiskPtr disk; uint64_t 
free_size = 0; - DiskWithSize(DiskPtr disk_) + explicit DiskWithSize(DiskPtr disk_) : disk(disk_) , free_size(disk->getUnreservedSpace()) {} @@ -96,7 +100,10 @@ class VolumeJBOD : public IVolume /// Index of last used disk, for load_balancing=round_robin mutable std::atomic last_used = 0; /// Priority queue of disks sorted by size, for load_balancing=least_used - mutable std::priority_queue disks_by_size; + using LeastUsedDisksQueue = std::priority_queue; + mutable LeastUsedDisksQueue disks_by_size TSA_GUARDED_BY(mutex); + mutable Stopwatch least_used_update_watch TSA_GUARDED_BY(mutex); + UInt64 least_used_ttl_ms = 0; /// True if parts on this volume participate in merges according to START/STOP MERGES ON VOLUME. std::atomic> are_merges_avoided_user_override{std::nullopt}; diff --git a/tests/integration/test_jbod_load_balancing/configs/config.d/storage_configuration.xml b/tests/integration/test_jbod_load_balancing/configs/config.d/storage_configuration.xml index 529eb1bc0b51..5a47aab06f24 100644 --- a/tests/integration/test_jbod_load_balancing/configs/config.d/storage_configuration.xml +++ b/tests/integration/test_jbod_load_balancing/configs/config.d/storage_configuration.xml @@ -31,6 +31,7 @@ jbod3 least_used + 0 diff --git a/tests/integration/test_jbod_load_balancing/test.py b/tests/integration/test_jbod_load_balancing/test.py index 9c62d1bbdfca..204f9740cfde 100644 --- a/tests/integration/test_jbod_load_balancing/test.py +++ b/tests/integration/test_jbod_load_balancing/test.py @@ -134,3 +134,66 @@ def test_jbod_load_balancing_least_used_next_disk(start_cluster): ] finally: node.query("DROP TABLE IF EXISTS data_least_used_next_disk SYNC") + + +def test_jbod_load_balancing_least_used_detect_background_changes(start_cluster): + def get_parts_on_disks(): + parts = node.query( + """ + SELECT count(), disk_name + FROM system.parts + WHERE table = 'data_least_used_detect_background_changes' + GROUP BY disk_name + ORDER BY disk_name + """ + ) + parts = [l.split("\t") 
for l in parts.strip().split("\n")] + return parts + + try: + node.query( + """ + CREATE TABLE data_least_used_detect_background_changes (p UInt8) + ENGINE = MergeTree + ORDER BY tuple() + SETTINGS storage_policy = 'jbod_least_used'; + + SYSTEM STOP MERGES data_least_used_detect_background_changes; + """ + ) + + node.exec_in_container(["fallocate", "-l200M", "/jbod3/.test"]) + node.query( + """ + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + """ + ) + parts = get_parts_on_disks() + assert parts == [ + ["4", "jbod2"], + ] + + node.exec_in_container(["rm", "/jbod3/.test"]) + node.query( + """ + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + INSERT INTO data_least_used_detect_background_changes SELECT * FROM numbers(10); + """ + ) + parts = get_parts_on_disks() + assert parts == [ + # previous INSERT + ["4", "jbod2"], + # this INSERT + ["4", "jbod3"], + ] + finally: + node.exec_in_container(["rm", "-f", "/jbod3/.test"]) + node.query( + "DROP TABLE IF EXISTS data_least_used_detect_background_changes SYNC" + )