Skip to content

Commit 38a927a

Browse files
authored
chore: ensuring all imperfect blocks are compacted (#18860)
* chore: compact block limit check * add test * fix * fix
1 parent ebb242f commit 38a927a

File tree

5 files changed

+104
-23
lines changed

5 files changed

+104
-23
lines changed

src/query/settings/src/settings_default.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -950,8 +950,8 @@ impl DefaultSettings {
950950
range: Some(SettingRange::Numeric(0..=u64::MAX)),
951951
}),
952952
("compact_max_block_selection", DefaultSettingValue {
953-
value: UserSettingValue::UInt64(10000),
954-
desc: "Limits the maximum number of blocks that can be selected during a compact operation.",
953+
value: UserSettingValue::UInt64(1000),
954+
desc: "Limits the maximum number of imperfect blocks that can be selected during a compact operation.",
955955
mode: SettingMode::Both,
956956
scope: SettingScope::Both,
957957
range: Some(SettingRange::Numeric(2..=u64::MAX)),

src/query/storages/fuse/src/operations/common/generators/append_generator.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,14 +207,10 @@ impl SnapshotGenerator for AppendGenerator {
207207
// If imperfect_count is larger, SLIGHTLY increase the number of blocks
208208
// eligible for auto-compaction, this adjustment is intended to help reduce
209209
// fragmentation over time.
210-
//
211-
// To prevent the off-by-one mistake, we need to add 1 to it;
212-
// this way, the potentially previously left non-compacted segment will
213-
// also be included.
214210
let compact_num_block_hint = std::cmp::min(
215211
imperfect_count,
216212
(auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
217-
) + 1;
213+
);
218214
info!("set compact_num_block_hint to {compact_num_block_hint }");
219215
self.ctx
220216
.set_compaction_num_block_hint(table_info.name.as_str(), compact_num_block_hint);

src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ impl BlockCompactMutator {
118118

119119
let mut segment_idx = 0;
120120
let mut is_end = false;
121+
let mut stop_after_next = false;
121122
let mut parts = Vec::new();
122123
let chunk_size = max_threads * 4;
123124
for chunk in segment_locations.chunks(chunk_size) {
@@ -154,10 +155,25 @@ impl BlockCompactMutator {
154155
checker.generate_part(segments, &mut parts);
155156
}
156157

157-
if checker.is_limit_reached(num_segment_limit, num_block_limit) {
158+
if stop_after_next {
158159
is_end = true;
159160
break;
160161
}
162+
163+
match checker.is_limit_reached(num_segment_limit, num_block_limit) {
164+
CompactLimitState::Continue => {}
165+
CompactLimitState::ReachedBlockLimit => {
166+
// When the block limit is reached, we allow one more iteration
167+
// to include the next segment in compaction.
168+
// This "+1" behavior ensures that previously un-compacted segments
169+
// near the boundary are not skipped due to strict block counting.
170+
stop_after_next = true;
171+
}
172+
CompactLimitState::ReachedSegmentLimit => {
173+
is_end = true;
174+
break;
175+
}
176+
}
161177
}
162178

163179
// Status.
@@ -303,14 +319,24 @@ impl BlockCompactMutator {
303319
}
304320
}
305321

322+
// CompactLimitState indicates the current compaction progress state.
323+
pub enum CompactLimitState {
324+
/// Continue collecting more segments and blocks.
325+
Continue,
326+
/// Hit the block threshold — take one more segment before stopping.
327+
ReachedBlockLimit,
328+
/// Hit the segment threshold — stop immediately.
329+
ReachedSegmentLimit,
330+
}
331+
306332
pub struct SegmentCompactChecker {
307333
thresholds: BlockThresholds,
308334
segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
309335
total_block_count: u64,
310336
cluster_key_id: Option<u32>,
311337

312338
compacted_segment_cnt: usize,
313-
compacted_block_cnt: u64,
339+
compacted_imperfect_block_cnt: u64,
314340
}
315341

316342
impl SegmentCompactChecker {
@@ -320,8 +346,8 @@ impl SegmentCompactChecker {
320346
total_block_count: 0,
321347
thresholds,
322348
cluster_key_id,
323-
compacted_block_cnt: 0,
324349
compacted_segment_cnt: 0,
350+
compacted_imperfect_block_cnt: 0,
325351
}
326352
}
327353

@@ -360,9 +386,9 @@ impl SegmentCompactChecker {
360386
}
361387

362388
self.compacted_segment_cnt += segments.len();
363-
self.compacted_block_cnt += segments
389+
self.compacted_imperfect_block_cnt += segments
364390
.iter()
365-
.map(|(_, info)| info.summary.block_count)
391+
.map(|(_, info)| info.summary.block_count - info.summary.perfect_block_count)
366392
.sum::<u64>();
367393
true
368394
}
@@ -415,15 +441,31 @@ impl SegmentCompactChecker {
415441
self.generate_part(final_segments, parts);
416442
}
417443

418-
pub fn is_limit_reached(&self, num_segment_limit: usize, num_block_limit: usize) -> bool {
419-
let residual_segment_cnt = self.segments.len();
420-
let residual_block_cnt: u64 = self
421-
.segments
422-
.iter()
423-
.map(|(_, info)| info.summary.block_count)
424-
.sum();
425-
self.compacted_segment_cnt + residual_segment_cnt >= num_segment_limit
426-
|| self.compacted_block_cnt + residual_block_cnt >= num_block_limit as u64
444+
/// Check if compaction limit is reached.
445+
pub fn is_limit_reached(
446+
&self,
447+
num_segment_limit: usize,
448+
num_block_limit: usize,
449+
) -> CompactLimitState {
450+
// Stop immediately if the number of compacted segments reaches limit
451+
if self.compacted_segment_cnt + self.segments.len() >= num_segment_limit {
452+
return CompactLimitState::ReachedSegmentLimit;
453+
}
454+
455+
// Count the total number of imperfect blocks (those that still need compaction).
456+
let compacted_imperfect_block_cnt =
457+
self.segments
458+
.iter()
459+
.fold(self.compacted_imperfect_block_cnt, |mut acc, (_, info)| {
460+
acc += info.summary.block_count - info.summary.perfect_block_count;
461+
acc
462+
});
463+
// If the imperfect block count exceeds the limit, signal "take one more".
464+
if compacted_imperfect_block_cnt >= num_block_limit as u64 {
465+
CompactLimitState::ReachedBlockLimit
466+
} else {
467+
CompactLimitState::Continue
468+
}
427469
}
428470
}
429471

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use opendal::Operator;
5252

5353
use crate::io::MetaReaders;
5454
use crate::operations::common::BlockMetaIndex as BlockIndex;
55+
use crate::operations::mutation::mutator::block_compact_mutator::CompactLimitState;
5556
use crate::operations::mutation::SegmentCompactChecker;
5657
use crate::operations::BlockCompactMutator;
5758
use crate::operations::CompactLazyPartInfo;
@@ -413,17 +414,27 @@ impl ReclusterMutator {
413414
let mut parts = Vec::new();
414415
let mut checker =
415416
SegmentCompactChecker::new(self.block_thresholds, Some(self.cluster_key_id));
416-
417+
let mut stop_after_next = false;
417418
for (loc, compact_segment) in compact_segments.into_iter() {
418419
recluster_blocks_count += compact_segment.summary.block_count;
419420
let segments_vec = checker.add(loc.segment_idx, compact_segment);
420421
for segments in segments_vec {
421422
checker.generate_part(segments, &mut parts);
422423
}
423424

424-
if checker.is_limit_reached(num_segment_limit, num_block_limit) {
425+
if stop_after_next {
425426
break;
426427
}
428+
429+
match checker.is_limit_reached(num_segment_limit, num_block_limit) {
430+
CompactLimitState::Continue => {}
431+
CompactLimitState::ReachedBlockLimit => {
432+
stop_after_next = true;
433+
}
434+
CompactLimitState::ReachedSegmentLimit => {
435+
break;
436+
}
437+
}
427438
}
428439
// finalize the compaction.
429440
checker.finalize(&mut parts);

tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,35 @@ select info:average_depth from clustering_information('i15760', 't1')
108108

109109
statement ok
110110
drop table t1 all;
111+
112+
#ISSUE 18859
113+
statement ok
114+
create table t2(a int) row_per_block=5;
115+
116+
statement ok
117+
insert into t2 select number from numbers(2);
118+
119+
statement ok
120+
insert into t2 select number from numbers(12);
121+
122+
statement ok
123+
insert into t2 select number from numbers(2);
124+
125+
query T
126+
select block_count, row_count from fuse_segment('i15760', 't2');
127+
----
128+
1 2
129+
2 12
130+
1 2
131+
132+
statement ok
133+
insert into t2 select number from numbers(2);
134+
135+
# after auto compaction
136+
query T
137+
select block_count, row_count from fuse_segment('i15760', 't2');
138+
----
139+
3 18
140+
141+
statement ok
142+
drop table t2 all;

0 commit comments

Comments
 (0)