
Conversation


@junhaoliao junhaoliao commented Nov 20, 2025

Description

This PR introduces a task batching limit to the compression scheduler, which partially addresses #1539. The number of concurrent tasks per job can be configured with the max_concurrent_tasks_per_job setting in clp-config.yaml.
When max_concurrent_tasks_per_job is 0, the feature is disabled, and all tasks in a job are
scheduled at once.

Key Changes:

clp_config.py: Added NonNegativeInt type and max_concurrent_tasks_per_job field to
CompressionScheduler.
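Based on the walkthrough later in this page, the new type alias and field look roughly like this (a trimmed pydantic sketch; the surrounding fields of CompressionScheduler are omitted here):

```python
from typing import Annotated, ClassVar

from pydantic import BaseModel, Field

# Type alias described in the walkthrough: an integer constrained to >= 0.
NonNegativeInt = Annotated[int, Field(ge=0)]


class CompressionScheduler(BaseModel):
    # 0 means "unlimited", i.e., the batching feature is disabled.
    UNLIMITED_CONCURRENT_TASKS_PER_JOB: ClassVar[int] = 0
    max_concurrent_tasks_per_job: NonNegativeInt = UNLIMITED_CONCURRENT_TASKS_PER_JOB
```

With this, pydantic rejects negative values at validation time instead of letting them reach the scheduler.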

compression_scheduler.py:

  • Implemented _batch_tasks, _batch_and_submit_tasks, _complete_compression_job, _dispatch_next_task_batch, _ensure_dataset_exists, _get_tag_ids_for_job,
    _handle_failed_compression_job, _insert_tasks_to_db, and _update_tasks_status_to_running for task batching logic.
  • Modified poll_running_jobs to handle batched tasks and dispatch new batches.
  • Refactored search_and_schedule_new_tasks to batch and submit tasks.
  • A few refactorings were done to improve readability:
    • Introduced DbContext for database interactions.
    • Modified update_compression_job_metadata to use DbContext and commit changes.
    • Updated _write_user_failure_log to use UTC timestamp.
    • Updated error handling and logging.
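For intuition, the batching step can be sketched roughly like this (an illustrative chunking sketch, not the PR's actual _batch_tasks implementation; the function name and task shapes are assumptions):

```python
from typing import Any


def batch_tasks(
    tasks: list[dict[str, Any]], max_concurrent_tasks_per_job: int
) -> list[list[dict[str, Any]]]:
    """Split a job's tasks into batches of at most `max_concurrent_tasks_per_job`."""
    if max_concurrent_tasks_per_job == 0:
        # Feature disabled: schedule all of the job's tasks at once.
        return [tasks] if tasks else []
    return [
        tasks[i : i + max_concurrent_tasks_per_job]
        for i in range(0, len(tasks), max_concurrent_tasks_per_job)
    ]
```

The poller would then dispatch the next batch only once the current batch's tasks have completed, which is what keeps per-job concurrency bounded.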

partition.py: Added tag_ids to PathsToCompressBuffer's constructor and removed the
set_tag_ids method.

scheduler_data.py: Added num_tasks_total, num_tasks_completed, remaining_tasks, and
remaining_partition_info to CompressionJob.

clp-config.yaml: The default configuration now includes a commented-out
max_concurrent_tasks_per_job setting.
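For reference, the commented-out entry presumably looks something like the following (an illustrative sketch; the exact key nesting inside clp-config.yaml is an assumption):

```yaml
compression_scheduler:
  # Maximum number of tasks from a single compression job that may run
  # concurrently. 0 (the default) disables batching, scheduling all of a
  # job's tasks at once.
  # max_concurrent_tasks_per_job: 0
```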

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and it has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

The following steps validate the changes on the batch-compression-tasks branch, which
introduces batching for compression tasks based on the max_concurrent_tasks_per_job setting.

Machine Specifications

  • CPU: AMD Ryzen 9 9950X3D 16-Core Processor
    • Thread(s) per core: 2 (i.e., CPU(s): 32)
    • CPU max MHz: 5756.0000
  • RAM: 93Gi, DDR5-6000
  • Disk:
    • Samsung SSD 970 PRO 512GB

Test Script (test.sh)

The following script (test.sh) was used to run the tests. Note that this file was not committed to the repository.

#!/usr/bin/env bash

# --- Configuration ---
COMPRESS_CMD="./sbin/compress.sh --timestamp-key Timestamp ~/samples/big"
CPU_SAMPLE_INTERVAL=0.2
MAX_CPU_USAGE_FILE="/tmp/max_cpu_$$" # Temp file to store the max CPU value
echo 0.0 > "$MAX_CPU_USAGE_FILE"     # Initialize file with 0.0

# Trap to ensure cleanup
trap 'rm -f "$MAX_CPU_USAGE_FILE"; kill $(jobs -p) 2>/dev/null' EXIT

# --- Argument Handling ---
if [ "$#" -ne 1 ] || ! [[ "$1" =~ ^[1-9][0-9]*$ ]]; then
  echo "Usage: $0 <NUMBER_OF_PARALLEL_RUNS>" >&2
  exit 1
fi
NUM_RUNS=$1

WRAPPER_PIDS=""

# --- Function to Monitor Host CPU Usage ---
monitor_host_cpu() {
  local PIDS_TO_WATCH="$1" # PIDs are passed as a space-separated string
  local PREV_IDLE=0
  local PREV_TOTAL=0

  while true; do
    # Read /proc/stat
    local CPU=($(head -n1 /proc/stat))
    local user=${CPU[1]}
    local nice=${CPU[2]}
    local system=${CPU[3]}
    local idle=${CPU[4]}
    local iowait=${CPU[5]}
    local irq=${CPU[6]}
    local softirq=${CPU[7]}
    local steal=${CPU[8]:-0}

    local total=$((user + nice + system + idle + iowait + irq + softirq + steal))
    local diff_total=$((total - PREV_TOTAL))
    local diff_idle=$((idle - PREV_IDLE))

    if [ $PREV_TOTAL -ne 0 ]; then
      # CPU usage in percent
      local cpu_usage=$(echo "scale=2; 100 * ($diff_total - $diff_idle) / $diff_total" | bc)
      local FILE_MAX_CPU=$(cat "$MAX_CPU_USAGE_FILE" 2>/dev/null || echo 0.0)
      if (( $(echo "$cpu_usage > $FILE_MAX_CPU" | bc -l) )); then
        echo "$cpu_usage" > "$MAX_CPU_USAGE_FILE"
        echo "-> New Max Host CPU: $cpu_usage%"
      fi
    fi

    PREV_TOTAL=$total
    PREV_IDLE=$idle

    sleep $CPU_SAMPLE_INTERVAL

    # Stop if all background jobs finished
    local running=false
    # Iterate over the PIDs passed as a string
    for pid in $PIDS_TO_WATCH; do
      if ps -p $pid > /dev/null 2>&1; then
        running=true
        break
      fi
    done
    $running || break
  done
}

# --- Main Execution ---
START_TIME=$(date +%s)
echo "--------------------------------------------------------"
echo "Starting **$NUM_RUNS** parallel compression runs at: $(date -d @$START_TIME +'%Y-%m-%d %H:%M:%S')"
echo "CPU sampling interval: $CPU_SAMPLE_INTERVAL second(s)"
echo "Monitoring host CPU..."
echo "--------------------------------------------------------"

# Launch parallel compression jobs
for i in $(seq 1 $NUM_RUNS); do
  TAG="run$i"
  ( stdbuf -oL $COMPRESS_CMD 2>&1 | sed "s/^/[$TAG] /" ) &
  WRAPPER_PIDS="$WRAPPER_PIDS $!"
  sleep 0.5
done

WRAPPER_PIDS=$(echo $WRAPPER_PIDS | xargs)
echo "Started jobs (PIDs): $WRAPPER_PIDS"

# Start CPU monitor, passing the PIDs string as an argument
monitor_host_cpu "$WRAPPER_PIDS" &
MONITOR_PID=$!

# Wait for all compression jobs
wait $WRAPPER_PIDS

# Stop monitor
kill $MONITOR_PID 2>/dev/null
wait $MONITOR_PID 2>/dev/null

# --- Final Reporting ---
END_TIME=$(date +%s)
MAX_CPU=$(cat "$MAX_CPU_USAGE_FILE")
DURATION=$((END_TIME - START_TIME))
HOURS=$((DURATION / 3600))
MINUTES=$(( (DURATION % 3600) / 60 ))
SECS=$(( DURATION % 60 )) # avoid clobbering bash's special auto-incrementing SECONDS variable

echo ""
echo "--------------------------------------------------------"
echo "All parallel jobs finished at: $(date -d @$END_TIME +'%Y-%m-%d %H:%M:%S')"
echo "Maximum Host CPU usage recorded: **$MAX_CPU%**"
echo "Total elapsed time: ${HOURS}h ${MINUTES}m ${SECS}s"
echo "--------------------------------------------------------"

Test Execution

The following commands were executed to gather performance data. The .out files can be found
here: test_results.zip

Baseline (main branch)

These tests were run on the main branch where the max_concurrent_tasks_per_job setting does not exist.

| Command | Output File | Elapsed Time | Max CPU |
| --- | --- | --- | --- |
| `git switch -t main origin/main` | | | |
| `task` (successful build) | | | |
| `cd build/clp-package` | | | |
| `./sbin/start-clp.sh; sleep 2` (CLP services started) | | | |
| `./test.sh 1 \| tee test_results/base-test-1-jobs.out` | test_results/base-test-1-jobs.out | 7s | 50.91% |
| `./test.sh 2 \| tee test_results/base-test-2-jobs.out` | test_results/base-test-2-jobs.out | 8s | 56.47% |
| `./test.sh 4 \| tee test_results/base-test-4-jobs.out` | test_results/base-test-4-jobs.out | 13s | 54.61% |
| `./test.sh 8 \| tee test_results/base-test-8-jobs.out` | test_results/base-test-8-jobs.out | 23s | 59.71% |
| `./sbin/stop-clp.sh; sleep 2` (CLP services stopped) | | | |
| `rm -rf var/data var/log/ var/tmp` | | | |
| `cd ../..` | | | |

Validation (batch-compression-tasks branch)

These tests were run on the batch-compression-tasks branch with different values for max_concurrent_tasks_per_job in etc/clp-config.yaml.

max_concurrent_tasks_per_job: 0 (Disabled)

| Command | Output File | Elapsed Time | Max CPU |
| --- | --- | --- | --- |
| `git switch -t batch-compression-tasks junhao/batch-compression-tasks` | | | |
| `task` (successful build) | | | |
| `cd build/clp-package` | | | |
| `cp components/package-template/src/etc/clp-config.yaml build/clp-package/etc/clp-config.yaml` | | | |
| `# modify etc/clp-config.yaml to set max_concurrent_tasks_per_job: 0` | | | |
| `./sbin/stop-clp.sh; sleep 2 && ./sbin/start-clp.sh; sleep 2` (CLP services restarted) | | | |
| `./test.sh 1 \| tee test_results/max-0-tasks-test-1-jobs.out` | test_results/max-0-tasks-test-1-jobs.out | 6s | 50.70% |
| `./test.sh 2 \| tee test_results/max-0-tasks-test-2-jobs.out` | test_results/max-0-tasks-test-2-jobs.out | 7s | 54.35% |
| `./test.sh 4 \| tee test_results/max-0-tasks-test-4-jobs.out` | test_results/max-0-tasks-test-4-jobs.out | 13s | 56.42% |
| `./test.sh 8 \| tee test_results/max-0-tasks-test-8-jobs.out` | test_results/max-0-tasks-test-8-jobs.out | 23s | 58.01% |
| `./sbin/stop-clp.sh; sleep 2` (CLP services stopped) | | | |
| `rm -rf var/data var/log/ var/tmp` | | | |

max_concurrent_tasks_per_job: 1

| Command | Output File | Elapsed Time | Max CPU |
| --- | --- | --- | --- |
| `cp components/package-template/src/etc/clp-config.yaml build/clp-package/etc/clp-config.yaml` | | | |
| `# modify etc/clp-config.yaml to set max_concurrent_tasks_per_job: 1` | | | |
| `./sbin/stop-clp.sh; sleep 2 && ./sbin/start-clp.sh; sleep 2` (CLP services restarted) | | | |
| `./test.sh 1 \| tee test_results/max-1-tasks-test-1-jobs.out` | test_results/max-1-tasks-test-1-jobs.out | 38s | 8.00% |
| `./test.sh 2 \| tee test_results/max-1-tasks-test-2-jobs.out` | test_results/max-1-tasks-test-2-jobs.out | 40s | 8.91% |
| `./test.sh 4 \| tee test_results/max-1-tasks-test-4-jobs.out` | test_results/max-1-tasks-test-4-jobs.out | 76s | 11.53% |
| `./test.sh 8 \| tee test_results/max-1-tasks-test-8-jobs.out` | test_results/max-1-tasks-test-8-jobs.out | 148s | 10.52% |
| `./sbin/stop-clp.sh; sleep 2` (CLP services stopped) | | | |
| `rm -rf var/data var/log/ var/tmp` | | | |

max_concurrent_tasks_per_job: 2

| Command | Output File | Elapsed Time | Max CPU |
| --- | --- | --- | --- |
| `cp components/package-template/src/etc/clp-config.yaml build/clp-package/etc/clp-config.yaml` | | | |
| `# modify etc/clp-config.yaml to set max_concurrent_tasks_per_job: 2` | | | |
| `./sbin/stop-clp.sh; sleep 2 && ./sbin/start-clp.sh; sleep 2` (CLP services restarted) | | | |
| `./test.sh 1 \| tee test_results/max-2-tasks-test-1-jobs.out` | test_results/max-2-tasks-test-1-jobs.out | 32s | 9.55% |
| `./test.sh 2 \| tee test_results/max-2-tasks-test-2-jobs.out` | test_results/max-2-tasks-test-2-jobs.out | 24s | 16.12% |
| `./test.sh 4 \| tee test_results/max-2-tasks-test-4-jobs.out` | test_results/max-2-tasks-test-4-jobs.out | 42s | 15.51% |
| `./test.sh 8 \| tee test_results/max-2-tasks-test-8-jobs.out` | test_results/max-2-tasks-test-8-jobs.out | 82s | 16.29% |
| `./sbin/stop-clp.sh; sleep 2` (CLP services stopped) | | | |
| `rm -rf var/data var/log/ var/tmp` | | | |

max_concurrent_tasks_per_job: 4

| Command | Output File | Elapsed Time | Max CPU |
| --- | --- | --- | --- |
| `cp components/package-template/src/etc/clp-config.yaml build/clp-package/etc/clp-config.yaml` | | | |
| `# modify etc/clp-config.yaml to set max_concurrent_tasks_per_job: 4` | | | |
| `./sbin/stop-clp.sh; sleep 2 && ./sbin/start-clp.sh; sleep 2` (CLP services restarted) | | | |
| `./test.sh 1 \| tee test_results/max-4-tasks-test-1-jobs.out` | test_results/max-4-tasks-test-1-jobs.out | 19s | 15.60% |
| `./test.sh 2 \| tee test_results/max-4-tasks-test-2-jobs.out` | test_results/max-4-tasks-test-2-jobs.out | 14s | 25.18% |
| `./test.sh 4 \| tee test_results/max-4-tasks-test-4-jobs.out` | test_results/max-4-tasks-test-4-jobs.out | 26s | 26.24% |
| `./test.sh 8 \| tee test_results/max-4-tasks-test-8-jobs.out` | test_results/max-4-tasks-test-8-jobs.out | 50s | 28.80% |
| `./sbin/stop-clp.sh; sleep 2` (CLP services stopped) | | | |
| `rm -rf var/data var/log/ var/tmp` | | | |

max_concurrent_tasks_per_job: 8

| Command | Output File | Elapsed Time | Max CPU |
| --- | --- | --- | --- |
| `cp components/package-template/src/etc/clp-config.yaml build/clp-package/etc/clp-config.yaml` | | | |
| `# modify etc/clp-config.yaml to set max_concurrent_tasks_per_job: 8` | | | |
| `./sbin/stop-clp.sh; sleep 2 && ./sbin/start-clp.sh; sleep 2` (CLP services restarted) | | | |
| `./test.sh 1 \| tee test_results/max-8-tasks-test-1-jobs.out` | test_results/max-8-tasks-test-1-jobs.out | 12s | 26.40% |
| `./test.sh 2 \| tee test_results/max-8-tasks-test-2-jobs.out` | test_results/max-8-tasks-test-2-jobs.out | 10s | 49.72% |
| `./test.sh 4 \| tee test_results/max-8-tasks-test-4-jobs.out` | test_results/max-8-tasks-test-4-jobs.out | 17s | 50.35% |
| `./test.sh 8 \| tee test_results/max-8-tasks-test-8-jobs.out` | test_results/max-8-tasks-test-8-jobs.out | 33s | 53.32% |
| `./sbin/stop-clp.sh; sleep 2` (CLP services stopped) | | | |
| `rm -rf var/data var/log/ var/tmp` | | | |
| `cd ../..` | | | |

Analysis

The test results demonstrate the effect of the max_concurrent_tasks_per_job setting on the compression scheduler.

  • Baseline (main branch): The compression jobs run in parallel, and the total elapsed time
    scales with the number of jobs.
  • max_concurrent_tasks_per_job: 0 (Disabled): This setting is equivalent to the baseline,
    where the compression tasks are not batched.
  • max_concurrent_tasks_per_job: 1: With this setting, the compression tasks are processed
    serially. The total elapsed time increases significantly as the number of jobs increases.
  • max_concurrent_tasks_per_job > 1: As the value of max_concurrent_tasks_per_job
    increases, we observe a higher degree of parallelism, leading to a decrease in the total time
    to complete the compression jobs, especially when the number of jobs is high. The jobs are able
    to report their status as other jobs run, which is visible in the .out files.

The performance scales with the number of concurrent tasks, up to the limits of the underlying
hardware. The results show that the batch-compression-tasks branch successfully implements task
batching, and the max_concurrent_tasks_per_job setting effectively controls the level of
parallelism.
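As a rough cross-check, the 8-parallel-jobs rows above imply the following speedups relative to the fully serialized setting (illustrative arithmetic over the measured numbers only):

```python
# Elapsed times (seconds) for the `./test.sh 8` runs, taken from the tables
# above, keyed by max_concurrent_tasks_per_job (0 = batching disabled).
elapsed_8_jobs = {0: 23, 1: 148, 2: 82, 4: 50, 8: 33}

# Speedup of each setting relative to max_concurrent_tasks_per_job: 1.
speedups = {k: round(elapsed_8_jobs[1] / v, 2) for k, v in elapsed_8_jobs.items()}
print(speedups)
```

The sub-linear speedup at higher settings is consistent with the scheduling overhead and hardware limits noted above.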

Summary by CodeRabbit

  • New Features

    • Config option to set a max concurrent tasks per compression job (0 = unlimited).
    • Compression jobs now expose total/completed/remaining task counts and partition info.
    • Tasks can carry and update tag IDs at runtime.
  • Improvements

    • Scheduling and submission use a DB-context flow enabling batching, dataset checks and tag resolution.
    • More timezone-aware timestamps and clearer, user-facing error logging.



coderabbitai bot commented Nov 20, 2025

Walkthrough

Added DbContext and DB helper routines to the compression scheduler; introduced a NonNegativeInt type and max_concurrent_tasks_per_job config; propagated tag IDs into PathsToCompressBuffer; extended CompressionJob with task-tracking fields.

Changes

Cohort / File(s) Summary
Configuration & Types
components/clp-py-utils/clp_py_utils/clp_config.py, components/package-template/src/etc/clp-config.yaml
Added NonNegativeInt = Annotated[int, Field(ge=0)]; added UNLIMITED_CONCURRENT_TASKS_PER_JOB: ClassVar[NonNegativeInt] = 0 and max_concurrent_tasks_per_job: NonNegativeInt = UNLIMITED_CONCURRENT_TASKS_PER_JOB to CompressionScheduler; added commented YAML entry max_concurrent_tasks_per_job: 0.
Compression Scheduler (DB / flow refactor)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
Added @dataclass DbContext (connection + cursor); refactored functions to accept DbContext; renamed update_compression_task_metadata → update_compression_job_metadata; added batching/dataset/tag helpers (_batch_tasks, _batch_and_submit_tasks, _complete_compression_job, _dispatch_next_task_batch, _ensure_dataset_exists, _get_tag_ids_for_job, _handle_failed_compression_job, _insert_tasks_to_db, _update_tasks_status_to_running); switched to explicit db_context.connection.commit() and timezone-aware datetimes; updated signatures including search_and_schedule_new_tasks, poll_running_jobs, and main.
Partition Buffer (tag propagation)
components/job-orchestration/job_orchestration/scheduler/compress/partition.py
PathsToCompressBuffer.__init__ now accepts tag_ids: list[int] and stores into task arguments; added set_tag_ids(self, tag_ids: list[int]) to update tag IDs at runtime and propagate into created tasks.
Scheduler Data Model
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
Extended CompressionJob with num_tasks_total: int, num_tasks_completed: int, remaining_tasks: list[dict[str, Any]], and remaining_partition_info: list[dict[str, Any]].

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant CLI as main()
    participant Scheduler as search_and_schedule_new_tasks()
    participant Poller as poll_running_jobs()
    participant Buffer as PathsToCompressBuffer
    participant TaskMgr as TaskManager
    participant DB as DbContext (cursor/connection)

    CLI->>DB: build DbContext
    CLI->>Scheduler: call with DbContext + ClpConfig
    Scheduler->>Scheduler: _get_tag_ids_for_job()
    Scheduler->>Buffer: instantiate with tag_ids
    Scheduler->>Scheduler: _batch_tasks() / _dispatch_next_task_batch()
    Scheduler->>DB: _insert_tasks_to_db() (use cursor)
    DB->>DB: execute INSERTs
    DB->>DB: connection.commit()
    Scheduler->>DB: _update_tasks_status_to_running()
    DB->>DB: connection.commit()

    CLI->>Poller: call with DbContext + TaskManager
    Poller->>TaskMgr: poll running tasks
    TaskMgr-->>Poller: statuses
    Poller->>Poller: _handle_failed_compression_job() or _complete_compression_job()
    Poller->>DB: update_compression_job_metadata()
    DB->>DB: connection.commit()

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Review DB transaction boundaries and correctness of DbContext usage and explicit connection.commit() calls in compression_scheduler.py.
  • Verify new helper logic for batching, dataset creation, tag resolution, and ordering of DB writes vs. task submission.
  • Ensure updated function signatures propagate correctly across call sites and imports.
  • Check timezone-aware datetime handling and user-log write paths for failures.
  • Inspect concurrency/side-effect implications of PathsToCompressBuffer.set_tag_ids.


Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 71.43%, below the required threshold of 80.00%. | Run `@coderabbitai generate docstrings` to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The title accurately describes the main change: introducing task batching in the compression scheduler to enable concurrent processing with configurable limits per job. |

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24edafc and 9282174.

📒 Files selected for processing (1)
  • components/clp-py-utils/clp_py_utils/clp_config.py (2 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Learnt from: gibber9809
Repo: y-scope/clp PR: 1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1178
File: components/clp-package-utils/clp_package_utils/controller.py:217-223
Timestamp: 2025-09-25T05:13:13.298Z
Learning: The compression scheduler service in CLP runs with CLP_UID_GID (current user's UID:GID) rather than CLP_SERVICE_CONTAINER_UID_GID (999:999), unlike infrastructure services such as database, queue, redis, and results cache which run with the service container UID:GID.
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 831
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:0-0
Timestamp: 2025-04-17T16:55:06.658Z
Learning: In the compression scheduler, the team prefers initializing in-memory caches from the database at startup rather than performing repeated database queries for efficiency reasons. This approach maintains both performance and reliability across process restarts.
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/clp-py-utils/clp_py_utils/clp_config.py
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Applied to files:

  • components/clp-py-utils/clp_py_utils/clp_config.py
🔇 Additional comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)

84-84: LGTM! Type alias follows established patterns.

The NonNegativeInt type alias is well-defined and consistent with other type aliases in the file (e.g., PositiveInt, PositiveFloat). The constraint ge=0 correctly enforces non-negative values.

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.14.6)
components/clp-py-utils/clp_py_utils/clp_config.py

ruff failed
Cause: Failed to load extended configuration /tools/yscope-dev-utils/exports/lint-configs/python/ruff.toml (/components/clp-py-utils/pyproject.toml extends /tools/yscope-dev-utils/exports/lint-configs/python/ruff.toml)
Cause: Failed to read /tools/yscope-dev-utils/exports/lint-configs/python/ruff.toml
Cause: No such file or directory (os error 2)



jobs = fetch_new_jobs(db_cursor)
db_conn.commit()
jobs = fetch_new_jobs(db_context.cursor)
# TODO: revisit why we need to commit here. To end long transactions?
Member Author


RFC

db_cursor.execute(query, values)
values = [*list(kv.values()), job_id]
db_context.cursor.execute(query, values)
db_context.connection.commit()
Member Author


! the connection.commit() is moved here to avoid repeated calls after cursor executes

:param task_manager:
:param db_context:
"""
global scheduled_jobs
Member Author


no need to use global cuz it's a reference to dict


Contributor

@sitaowang1998 sitaowang1998 left a comment


A lot of helper functions are also committing the transaction. Is this safe?

Comment on lines +55 to +60
@dataclass
class DbContext:
"""Database context holding a connection, and a cursor created from the connection."""

connection: Any
cursor: Any
Contributor


This may be outside of this PR, but I think the best way is to make DbContext an abstract class and let each database connection to extends with its own class.

Member Author


Right, i think in a future PR we should refactor all related db connection + cursor combinations to use this context class.

make DbContext an abstract class

for the connection contexts, what other attributes in addition to the connection and the cursor do you have in mind?
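One possible shape for the suggested abstraction (a hedged sketch only; the class and method names here are made up for illustration and are not part of the PR):

```python
from abc import ABC, abstractmethod
from typing import Any


class AbstractDbContext(ABC):
    """Sketch of a DbContext base class that each database backend extends."""

    @property
    @abstractmethod
    def connection(self) -> Any: ...

    @property
    @abstractmethod
    def cursor(self) -> Any: ...

    def commit(self) -> None:
        # Shared helper so callers don't reach into the raw connection.
        self.connection.commit()


class MySqlDbContext(AbstractDbContext):
    """Hypothetical backend-specific context wrapping a MySQL connection."""

    def __init__(self, connection: Any, cursor: Any) -> None:
        self._connection = connection
        self._cursor = cursor

    @property
    def connection(self) -> Any:
        return self._connection

    @property
    def cursor(self) -> Any:
        return self._cursor
```

A shared commit() (or transaction context manager) on the base class could also address the review concern above about helpers committing transactions independently.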

Contributor


@CodeRabbit Please open an issue to track this problem.

Contributor


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Create a GitHub issue to track DbContext refactoring

gh issue create \
  --repo y-scope/clp \
  --title "Refactor DbContext to be an abstract class with database-specific implementations" \
  --body "## Context

During the review of PR #1637, a suggestion was made to refactor the \`DbContext\` dataclass (introduced in that PR) into an abstract class that can be extended by database-specific implementations.

## Current State

Currently, \`DbContext\` is a simple dataclass that holds a database connection and cursor:
- Located in: \`components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py\`
- Holds generic \`connection\` and \`cursor\` attributes

## Proposed Improvement

Make \`DbContext\` an abstract class that:
- Defines a common interface for database operations
- Allows each database connection type to extend with its own implementation
- Potentially includes additional database-specific attributes and methods

## References

- Original PR: https://github.com/y-scope/clp/pull/1637
- Discussion: https://github.com/y-scope/clp/pull/1637#discussion_r2551151988
- Requested by: @sitaowang1998

## Additional Context

This refactoring should also consider consolidating all related database connection + cursor combinations throughout the codebase to use the new context class structure." \
  --assignee sitaowang1998

Length of output: 42


@sitaowang1998, I've created issue #1674 to track the refactoring of DbContext into an abstract class with database-specific implementations. The issue includes context from this discussion and has been assigned to you.


🧬 Code Graph Analysis Results

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
(no relevant code snippets)

Possibly relevant code snippets:

File: components/job-orchestration/job_orchestration/scheduler/constants.py

At line range 27-32:

class CompressionJobStatus(StatusIntEnum):
    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    KILLED = auto()

At line range 40-45:

class CompressionTaskStatus(StatusIntEnum):
    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    KILLED = auto()

File: components/job-orchestration/job_orchestration/scheduler/scheduler_data.py

At line range 23-33:

class CompressionJob(BaseModel):
    # Allow the use of `TaskManager.ResultHandle`
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: int
    start_time: datetime.datetime
    result_handle: TaskManager.ResultHandle
    num_tasks_total: int
    num_tasks_completed: int
    remaining_tasks: list[dict[str, Any]]
    remaining_partition_info: list[dict[str, Any]]

File: components/job-orchestration/job_orchestration/scheduler/utils.py

At line range 22-82:

def kill_hanging_jobs(sql_adapter: SqlAdapter, scheduler_type: str) -> list[int] | None:
    if SchedulerType.COMPRESSION == scheduler_type:
        jobs_table_name = COMPRESSION_JOBS_TABLE_NAME
        job_status_running = CompressionJobStatus.RUNNING
        job_status_killed = CompressionJobStatus.KILLED
        tasks_table_name = COMPRESSION_TASKS_TABLE_NAME
        task_status_running = CompressionTaskStatus.RUNNING
        task_status_killed = CompressionTaskStatus.KILLED
    elif SchedulerType.QUERY == scheduler_type:
        jobs_table_name = QUERY_JOBS_TABLE_NAME
        job_status_running = QueryJobStatus.RUNNING
        job_status_killed = QueryJobStatus.KILLED
        tasks_table_name = QUERY_TASKS_TABLE_NAME
        task_status_running = QueryTaskStatus.RUNNING
        task_status_killed = QueryTaskStatus.KILLED
    else:
        raise ValueError(f"Unexpected scheduler type {scheduler_type}")

    with (
        closing(sql_adapter.create_mysql_connection()) as db_conn,
        closing(db_conn.cursor(dictionary=True)) as db_cursor,
    ):
        db_cursor.execute(
            f"""
            SELECT id
            FROM {jobs_table_name}
            WHERE status={job_status_running}
            """
        )
        hanging_job_ids = [row["id"] for row in db_cursor.fetchall()]
        num_hanging_jobs = len(hanging_job_ids)
        if 0 == num_hanging_jobs:
            return None

        job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
        db_cursor.execute(
            f"""
            UPDATE {tasks_table_name}
            SET status={task_status_killed}, duration=0
            WHERE status={task_status_running}
            AND job_id IN ({job_id_placeholders_str})
            """,
            hanging_job_ids,
        )

        jobs_update_config = {"status": int(job_status_killed), "duration": 0}
        field_set_expressions = [f"{k} = %s" for k in jobs_update_config]
        if SchedulerType.COMPRESSION == scheduler_type:
            field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")

        values = list(jobs_update_config.values()) + hanging_job_ids
        db_cursor.execute(
            f"""
            UPDATE {jobs_table_name}
            SET {", ".join(field_set_expressions)}
            WHERE id IN ({job_id_placeholders_str})
            """,
            values,
        )
        db_conn.commit()
        return hanging_job_ids

File: components/job-orchestration/job_orchestration/scheduler/compress/partition.py

At line range 16-233:

class PathsToCompressBuffer:
    def __init__(
        self,
        maintain_file_ordering: bool,
        empty_directories_allowed: bool,
        scheduling_job_id: int,
        clp_io_config: ClpIoConfig,
        clp_metadata_db_connection_config: dict,
        tag_ids: list[int],
    ):
        self.__files: list[FileMetadata] = []
        self.__tasks: list[dict[str, Any]] = []
        self.__partition_info: list[dict[str, Any]] = []
        self.__maintain_file_ordering: bool = maintain_file_ordering
        if empty_directories_allowed:
            self.__empty_directories: list[str] | None = []
        else:
            self.__empty_directories: list[str] | None = None
        self.__total_file_size: int = 0
        self.__target_archive_size: int = clp_io_config.output.target_archive_size
        self.__file_size_to_trigger_compression: int = clp_io_config.output.target_archive_size * 2

        self.num_tasks = 0
        self.__task_arguments = {
            "job_id": scheduling_job_id,
            "tag_ids": tag_ids,
            "task_id": -1,
            "clp_io_config_json": clp_io_config.model_dump_json(exclude_none=True),
            "paths_to_compress_json": None,
            "clp_metadata_db_connection_config": clp_metadata_db_connection_config,
        }

    def get_tasks(self):
        return self.__tasks

    def get_partition_info(self):
        return self.__partition_info

    def add_file(self, file: FileMetadata):
        self.__files.append(file)
        self.__total_file_size += file.estimated_uncompressed_size

        if self.__total_file_size >= self.__file_size_to_trigger_compression:
            self.__partition_and_compress(False)

    def add_empty_directory(self, path: pathlib.Path):
        if self.__empty_directories is None:
            return
        self.__empty_directories.append(str(path))

    def flush(self):
        self.__partition_and_compress(True)

    def contains_paths(self):
        return len(self.__files) > 0 or (
            self.__empty_directories and len(self.__empty_directories) > 0
        )

    def set_tag_ids(self, tag_ids: list[int]):
        self.__task_arguments["tag_ids"] = tag_ids

    def __submit_partition_for_compression(self, partition: FilesPartition):
        files, file_paths, group_ids, st_sizes, partition_total_file_size = partition.pop_files()
        paths_to_compress = PathsToCompress(
            file_paths=file_paths, group_ids=group_ids, st_sizes=st_sizes
        )

        if self.__empty_directories is not None and len(self.__empty_directories) > 0:
            paths_to_compress.empty_directories = self.__empty_directories
            self.__empty_directories = []

        self.__partition_info.append(
            {
                "partition_original_size": str(sum(st_sizes)),
                "clp_paths_to_compress": brotli.compress(
                    msgpack.packb(paths_to_compress.model_dump(exclude_none=True)), quality=4
                ),
            }
        )

        task_arguments = self.__task_arguments.copy()
        task_arguments["paths_to_compress_json"] = paths_to_compress.model_dump_json(
            exclude_none=True
        )
        self.__tasks.append(copy.deepcopy(task_arguments))
        self.num_tasks += 1

        return partition_total_file_size

    def add_files(self, target_num_archives: int, target_archive_size: int, files):
        target_num_archives = min(len(files), target_num_archives)

        groups = group_files_by_similar_filenames(files)
        next_file_ix_per_group = [0 for _ in range(len(groups))]

        partitions = [FilesPartition() for _ in range(target_num_archives)]

        # Distribute files across partitions in round-robin order; full partitions are skipped
        next_partition_ix = 0
        group_ix = 0
        while len(groups) > 0:
            group_file_ix = next_file_ix_per_group[group_ix]
            group_id = groups[group_ix]["id"]
            group_files = groups[group_ix]["files"]

            file = group_files[group_file_ix]

            # Look for a partition with space
            while True:
                partition = partitions[next_partition_ix]
                next_partition_ix = (next_partition_ix + 1) % target_num_archives
                if partition.get_total_file_size() < target_archive_size:
                    break

            partition.add_file(file, group_id)

            group_file_ix += 1
            if len(group_files) == group_file_ix:
                groups.pop(group_ix)
                next_file_ix_per_group.pop(group_ix)
            else:
                next_file_ix_per_group[group_ix] = group_file_ix
                group_ix += 1
            if len(groups) > 0:
                group_ix %= len(groups)

        # Compress partitions
        for partition in partitions:
            self.__submit_partition_for_compression(partition)

    def __partition_and_compress(self, flush_buffer: bool):
        if not flush_buffer and self.__total_file_size < self.__target_archive_size:
            # Not enough data for a full partition and we don't need to exhaust the buffer
            return
        if not self.contains_paths():
            # Nothing to compress
            return

        partition = FilesPartition()

        if self.__maintain_file_ordering:
            # NOTE: grouping by filename is not supported when maintaining file ordering,
            # so we give each file its own group ID to maintain ordering

            group_ix = 0
            # Compress full partitions
            if self.__total_file_size >= self.__target_archive_size:
                file_ix = 0
                for file_ix, file in enumerate(self.__files):
                    partition.add_file(file, group_ix)
                    group_ix += 1

                    # Compress partition if ready
                    if partition.get_total_file_size() >= self.__target_archive_size:
                        self.__total_file_size -= self.__submit_partition_for_compression(partition)
                        if self.__total_file_size < self.__target_archive_size:
                            # Not enough files to fill a partition, so break
                            break
                # Pop compressed files
                self.__files = self.__files[file_ix + 1 :]

            # Compress remaining partial partition if necessary
            if flush_buffer and self.contains_paths():
                for file in self.__files:
                    partition.add_file(file, group_ix)
                    group_ix += 1
                self.__total_file_size -= self.__submit_partition_for_compression(partition)
                self.__files = []
        else:
            groups = group_files_by_similar_filenames(self.__files)
            next_file_ix_per_group = [0 for _ in range(len(groups))]

            group_ix = 0
            while len(groups) > 0:
                group_file_ix = next_file_ix_per_group[group_ix]
                group_id = groups[group_ix]["id"]
                group_files = groups[group_ix]["files"]

                file = group_files[group_file_ix]

                partition.add_file(file, group_id)

                group_file_ix += 1
                if len(group_files) == group_file_ix:
                    groups.pop(group_ix)
                    next_file_ix_per_group.pop(group_ix)
                else:
                    next_file_ix_per_group[group_ix] = group_file_ix
                    group_ix += 1
                if len(groups) > 0:
                    group_ix %= len(groups)

                # Compress partition if ready
                if partition.get_total_file_size() >= self.__target_archive_size:
                    self.__total_file_size -= self.__submit_partition_for_compression(partition)
                    if not flush_buffer and self.__total_file_size < self.__target_archive_size:
                        # Not enough files to fill a partition and
                        # we don't need to exhaust the buffer, so break
                        break

            # Compress partial partition
            if partition.contains_files():
                self.__total_file_size -= self.__submit_partition_for_compression(partition)
                self.__files = []

            # Pop compressed files
            remaining_files = []
            for group_ix, group in enumerate(groups):
                group_files = group["files"]
                group_file_ix = next_file_ix_per_group[group_ix]
                for i in range(group_file_ix, len(group_files)):
                    remaining_files.append(group_files[i])
            self.__files = remaining_files

            # Compress any remaining empty directories
            if flush_buffer and self.contains_paths():
                self.__total_file_size -= self.__submit_partition_for_compression(partition)
                self.__files = []

File: components/clp-py-utils/clp_py_utils/compression.py

At line range 93-127:

def validate_path_and_get_info(required_parent_dir: pathlib.Path, path: pathlib.Path):
    file = None
    empty_directory = None

    # Verify that path is absolute
    if not path.is_absolute():
        raise ValueError(f'"{path}" is not absolute.')

    # Verify that path exists
    if not path.exists():
        raise ValueError(f'"{path}" does not exist.')

    # Verify that path points to a file/dir within required parent dir
    try:
        path.resolve().relative_to(required_parent_dir)
    except ValueError:
        raise ValueError(f'"{path}" is not within {required_parent_dir}')

    # Convert path to a path within required parent dir if necessary
    # (e.g., if path is a symlink outside parent dir, but points to a file/dir inside parent dir)
    try:
        path.relative_to(required_parent_dir)
    except ValueError:
        # Not within parent dir, so resolve it
        path = path.resolve()

    if path.is_dir():
        # Check if directory is empty
        if next(path.iterdir(), None) is None:
            empty_directory = str(path)
    else:
        file_size = path.stat().st_size
        file = FileMetadata(path, file_size)

    return file, empty_directory

File: components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py

At line range 146-179:

def add_dataset(
    db_conn,
    db_cursor,
    table_prefix: str,
    dataset_name: str,
    archive_output: ArchiveOutput,
) -> None:
    """
    Inserts a new dataset into the `datasets` table and creates the corresponding standard set of
    tables for CLP's metadata.

    :param db_conn:
    :param db_cursor: The database cursor to execute the table row insertion.
    :param table_prefix: A string to prepend to the table name.
    :param dataset_name:
    :param archive_output:
    """
    archive_storage_directory: Path
    if StorageType.S3 == archive_output.storage.type:
        s3_config = archive_output.storage.s3_config
        archive_storage_directory = Path(s3_config.key_prefix)
    else:
        archive_storage_directory = archive_output.get_directory()

    query = f"""INSERT INTO `{get_datasets_table_name(table_prefix)}`
                (name, archive_storage_directory)
                VALUES (%s, %s)
                """
    db_cursor.execute(
        query,
        (dataset_name, str(archive_storage_directory / dataset_name)),
    )
    create_metadata_db_tables(db_cursor, table_prefix, dataset_name)
    db_conn.commit()

File: components/clp-py-utils/clp_py_utils/s3_utils.py

At line range 256-278:

def s3_get_object_metadata(s3_input_config: S3InputConfig) -> list[FileMetadata]:
    """
    Gets the metadata of all objects specified by the given input config.

    NOTE: We reuse FileMetadata to store the metadata of S3 objects where the object's key is stored
    as `path` in FileMetadata.

    :param s3_input_config:
    :return: A list of `FileMetadata` containing the object's metadata on success.
    :raise: Propagates `_create_s3_client`'s exceptions.
    :raise: Propagates `_s3_get_object_metadata_from_single_prefix`'s exceptions.
    :raise: Propagates `_s3_get_object_metadata_from_keys`'s exceptions.
    """
    s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication)

    if s3_input_config.keys is None:
        return _s3_get_object_metadata_from_single_prefix(
            s3_client, s3_input_config.bucket, s3_input_config.key_prefix
        )

    return _s3_get_object_metadata_from_keys(
        s3_client, s3_input_config.bucket, s3_input_config.key_prefix, s3_input_config.keys
    )

File: components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py

At line range 9-28:

class TaskManager(ABC):
    """Abstract base class for a scheduler framework."""

    class ResultHandle(ABC):
        @abstractmethod
        def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
            """
            Gets the result of a compression job.
            :param timeout: Maximum time (in seconds) to wait for retrieving the result. Depending
                on the implementation, this parameter may be ignored.
            :return: A list of task results.
            """

    @abstractmethod
    def submit(self, task_params: list[dict[str, Any]]) -> ResultHandle:
        """
        Submits a batch of compression tasks as a single compression job.
        :param task_params: A list of dictionaries containing parameters for each compression task.
        :return: A handle through which to get the result of the job.
        """

File: components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py

At line range 12-27:

class CeleryTaskManager(TaskManager):
    class ResultHandle(TaskManager.ResultHandle):
        def __init__(self, celery_result: celery.result.GroupResult) -> None:
            self._celery_result: celery.result.GroupResult = celery_result

        def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
            try:
                results = self._celery_result.get(timeout=timeout)
                return [CompressionTaskResult.model_validate(res) for res in results]
            except celery.exceptions.TimeoutError:
                return None

    def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle:
        task_instances = [compress.s(**params) for params in task_params]
        task_group = celery.group(task_instances)
        return CeleryTaskManager.ResultHandle(task_group.apply_async())

🧠 Learnings used
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 831
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:0-0
Timestamp: 2025-04-17T16:55:06.658Z
Learning: In the compression scheduler, the team prefers initializing in-memory caches from the database at startup rather than performing repeated database queries for efficiency reasons. This approach maintains both performance and reliability across process restarts.

Learnt from: junhaoliao
Repo: y-scope/clp PR: 939
File: components/package-template/src/etc/clp-config.yml:64-64
Timestamp: 2025-06-24T20:13:46.758Z
Learning: When users ask CodeRabbit to create an issue after providing suggestions, they want a GitHub issue created with the high-level requirements and context, not specific code implementations.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Learnt from: gibber9809
Repo: y-scope/clp PR: 1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

@junhaoliao
Member Author

A lot of helper functions are also committing the transaction. Is this safe?

It does seem that we immediately `.commit()` after every `.execute()` in compression_scheduler.py, so we weren't really batching any executes.

I'm not entirely sure I got your comment, though. Let me know if I misunderstood.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
components/package-template/src/etc/clp-config.yaml (1)

25-25: Add explanatory comment for the configuration option.

The value 0 has special meaning (disables batching limit). Consider adding a comment to clarify this behaviour for users.

-#  max_concurrent_tasks_per_job: 0
+#  max_concurrent_tasks_per_job: 0  # set to 0 to remove the limit
components/clp-py-utils/clp_py_utils/clp_config.py (1)

257-257: Add inline comment documenting the special meaning of 0.

The default value 0 has special semantics (disables the batching limit). An inline comment would clarify this for developers reading the code.

-    max_concurrent_tasks_per_job: NonNegativeInt = 0
+    max_concurrent_tasks_per_job: NonNegativeInt = 0  # set to 0 to remove the limit
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0f276e5 and 9da4ea1.

📒 Files selected for processing (5)
  • components/clp-py-utils/clp_py_utils/clp_config.py (2 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (13 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/partition.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/scheduler_data.py (1 hunks)
  • components/package-template/src/etc/clp-config.yaml (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-23T09:54:45.185Z
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Applied to files:

  • components/package-template/src/etc/clp-config.yaml
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/package-template/src/etc/clp-config.yaml
  • components/clp-py-utils/clp_py_utils/clp_config.py
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-09-19T18:28:26.747Z
Learnt from: gibber9809
Repo: y-scope/clp PR: 1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: package-image
🔇 Additional comments (14)
components/job-orchestration/job_orchestration/scheduler/compress/partition.py (2)

24-46: LGTM!

The tag_ids parameter is properly added to the constructor and correctly propagated to the task arguments dictionary.


74-76: LGTM!

The setter method provides flexibility to update tag IDs after buffer initialization.

components/clp-py-utils/clp_py_utils/clp_config.py (1)

82-82: LGTM!

The NonNegativeInt type alias is a clean, reusable addition that follows the existing pattern for generic types in this module.

components/job-orchestration/job_orchestration/scheduler/scheduler_data.py (1)

30-33: LGTM!

The new fields appropriately track task batching state and follow the same pattern used in SearchJob for managing remaining work items.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (10)

55-60: LGTM!

The DbContext dataclass provides a clean encapsulation of database connection and cursor. As noted in the past review, making this an abstract base class for different database backends could be a good future enhancement.


88-108: LGTM!

The refactoring to use DbContext is clean, and committing within the function ensures each metadata update is atomic.


216-217: LGTM!

Using timezone-aware UTC timestamps is the correct approach for consistent log timestamps across different environments.


227-345: LGTM!

The refactoring cleanly adopts the DbContext pattern, and the logic correctly handles various input types and error conditions. The extraction of helper functions (_ensure_dataset_exists, _get_tag_ids_for_job, _batch_and_submit_tasks) improves readability.


348-418: LGTM!

The polling logic is well-structured to handle batched task completion, correctly dispatching the next batch when tasks remain or marking the job complete when all tasks finish. The timezone-aware duration calculation ensures accuracy.


497-522: LGTM!

The batching logic correctly handles the special case where max_concurrent_tasks_per_job = 0 means no limit, and properly splits both tasks and partition info in sync.
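
The zero-means-unlimited split described above can be sketched as follows (a minimal illustration; the function name and signature are assumptions based on the PR description, not the actual `_batch_tasks` implementation):

```python
from typing import Any


def batch_tasks(
    tasks: list[dict[str, Any]],
    partition_info: list[dict[str, Any]],
    max_concurrent_tasks_per_job: int,
) -> tuple[list, list, list, list]:
    """Split tasks and their partition info (kept in sync) into the batch to
    submit now and the remainder to dispatch later; 0 disables the limit."""
    if 0 == max_concurrent_tasks_per_job:
        return tasks, partition_info, [], []
    n = max_concurrent_tasks_per_job
    return tasks[:n], partition_info[:n], tasks[n:], partition_info[n:]
```

Slicing both lists with the same bound is what keeps each task paired with its partition metadata across batches.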


525-579: LGTM!

The function properly orchestrates the initial task submission: updating job metadata, batching tasks, inserting to DB, submitting to the task manager, and updating task statuses. The logging provides good visibility into batch sizes.


769-786: LGTM!

The function correctly handles the empty list case and safely constructs the IN clause since task_ids originate from lastrowid (guaranteed integers).


604-642: LGTM!

The function correctly handles subsequent batch dispatching, updating the job's remaining tasks and result handle appropriately. The logging provides clear progress tracking.


464-494: LGTM!

The DbContext is correctly instantiated once and passed through to the scheduling and polling functions. The main loop properly handles exceptions and SIGTERM signals.

Comment on lines +692 to +699
db_context.cursor.execute(
f"SELECT tag_id FROM {tags_table_name} WHERE tag_name IN (%s)"
% ", ".join(["%s"] * len(clp_io_config.output.tags)),
clp_io_config.output.tags,
)
tag_ids = [tags["tag_id"] for tags in db_context.cursor.fetchall()]
db_context.connection.commit()
return tag_ids

🧹 Nitpick | 🔵 Trivial

Remove unnecessary commit after SELECT.

The commit() on line 698 is unnecessary since SELECT queries don't require a commit. While harmless, it adds overhead.

         db_context.cursor.execute(
             f"SELECT tag_id FROM {tags_table_name} WHERE tag_name IN (%s)"
             % ", ".join(["%s"] * len(clp_io_config.output.tags)),
             clp_io_config.output.tags,
         )
         tag_ids = [tags["tag_id"] for tags in db_context.cursor.fetchall()]
-        db_context.connection.commit()
     return tag_ids
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 692 to 699, there is an unnecessary db_context.connection.commit()
immediately after a SELECT; remove that commit call (and any trailing blank line
left) so the SELECT executes without an unnecessary transaction commit, leaving
only the cursor.execute, fetchall usage and return of tag_ids.

Comment on lines 756 to 766
for task_idx, task in enumerate(tasks_to_submit):
db_context.cursor.execute(
f"""
INSERT INTO {COMPRESSION_TASKS_TABLE_NAME}
(job_id, partition_original_size, clp_paths_to_compress)
VALUES({job_id!s}, {partition_info_to_submit[task_idx]["partition_original_size"]}, %s)
""",
(partition_info_to_submit[task_idx]["clp_paths_to_compress"],),
)
db_context.connection.commit()
task["task_id"] = db_context.cursor.lastrowid

🧹 Nitpick | 🔵 Trivial

Consider batching inserts for improved performance.

Committing after each INSERT is inefficient for large batches. However, the current approach ensures lastrowid is immediately available for each task.

If performance becomes a concern with large task batches, consider:

  1. Using executemany for bulk insert
  2. Querying back the inserted task IDs using a range query on job_id

For now, the current approach prioritizes simplicity and correctness.

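Option 1 above (bulk insert, single commit, query the IDs back) could look roughly like the sketch below. This is a hedged illustration, not the scheduler's code: `sqlite3` stands in for the MySQL connection, and the table schema and task dicts are simplified stand-ins.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    """
    CREATE TABLE compression_tasks (
        id INTEGER PRIMARY KEY,
        job_id INTEGER,
        partition_original_size INTEGER,
        clp_paths_to_compress BLOB
    )
    """
)

job_id = 7
tasks_to_submit = [{"size": 100, "paths": b"a"}, {"size": 200, "paths": b"b"}]

# Insert all rows for this batch in one call and commit once.
cur.executemany(
    "INSERT INTO compression_tasks"
    " (job_id, partition_original_size, clp_paths_to_compress)"
    " VALUES (?, ?, ?)",
    [(job_id, t["size"], t["paths"]) for t in tasks_to_submit],
)
conn.commit()  # single commit for the whole batch

# Map IDs back to tasks with one range query instead of per-row lastrowid.
cur.execute(
    "SELECT id FROM compression_tasks WHERE job_id = ? ORDER BY id",
    (job_id,),
)
for task, (task_id,) in zip(tasks_to_submit, cur.fetchall()):
    task["task_id"] = task_id
print([t["task_id"] for t in tasks_to_submit])  # [1, 2]
```

The query-back step assumes no other writer inserts rows for the same `job_id` concurrently, which holds here since there is a single scheduler instance per deployment.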

@sitaowang1998
Contributor

A lot of helper functions are also committing the transaction. Is this safe?

It does seem that we immediately .commit() after every .execute() in compression_scheduler.py, so we weren't really batching any executes.

I don't think committing after every SQL statement execution is the correct approach.

  1. We lose atomicity. We use some write-after-read sequences; if we commit in between and someone else writes to the same rows we read, we may simply overwrite their changes. Since the compression scheduler is currently the only process updating this table, and we have only one compression scheduler instance per deployment, this does not seem to be a big issue.
  2. The performance is bad. Each commit has some overhead, so calling it too often hurts performance.
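The write-after-read hazard in point 1 is avoided by scoping the whole read-modify-write sequence to one transaction and committing once at the end. A minimal sketch of that pattern, assuming a simple commit-on-success/rollback-on-error wrapper (`sqlite3` as a stand-in for the scheduler's MySQL connection; table and column names are illustrative):

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Commit once on success, roll back on error."""
    try:
        yield conn.cursor()
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, num_tasks_completed INTEGER)")
conn.execute("INSERT INTO jobs VALUES (1, 0)")
conn.commit()

# Read-modify-write as one unit: no intermediate commit between the SELECT
# and the UPDATE, so the write cannot be based on a value made stale by an
# earlier commit.
with transaction(conn) as cur:
    cur.execute("SELECT num_tasks_completed FROM jobs WHERE id = 1")
    (completed,) = cur.fetchone()
    cur.execute(
        "UPDATE jobs SET num_tasks_completed = ? WHERE id = 1",
        (completed + 1,),
    )

print(conn.execute("SELECT num_tasks_completed FROM jobs WHERE id = 1").fetchone()[0])
```

This also addresses point 2, since the commit overhead is paid once per logical unit of work rather than once per statement.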

sitaowang1998 previously approved these changes Nov 26, 2025
Contributor

@sitaowang1998 left a comment

LGTM. Another issue is that we are using both format strings and %s placeholders inside SQL statements. We should apply the same standard to all SQL statements, but that's for another PR.

@junhaoliao
Member Author

The performance is bad. Commit has some overhead. Calling it too often hurts the performance.

discussed offline - we agree, and we will investigate reducing .commit() calls to make better use of transactions and improve performance in the future

@coderabbitai create an issue to track

@coderabbitai
Contributor

coderabbitai bot commented Nov 27, 2025


🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Create a GitHub issue to track reducing commit() calls in compression scheduler

gh issue create \
  --repo y-scope/clp \
  --title "Reduce commit() calls in compression scheduler to improve transaction handling and performance" \
  --body "## Context

During the review of PR #1637, it was observed that \`compression_scheduler.py\` calls \`.commit()\` immediately after every \`.execute()\` statement. This pattern prevents batching of SQL statements and causes two issues:

1. **Loss of atomicity**: Write-after-read operations may lose atomicity if commits occur between reads and writes, potentially allowing overwrites. Currently mitigated by having a single compression scheduler instance per deployment.
2. **Performance overhead**: Frequent commits add overhead that hurts performance.

## Goal

Investigate and implement a solution to reduce the frequency of \`.commit()\` calls in the compression scheduler, enabling better use of transactions and improving performance.

## References

- PR #1637: https://github.com/y-scope/clp/pull/1637
- Requested by: @junhaoliao
- Discussion: https://github.com/y-scope/clp/pull/1637#discussion_r1891234567" \
  --assignee junhaoliao
```



@junhaoliao: I've created issue #1679 to track the work on reducing .commit() calls in the compression scheduler for improved transaction handling and performance. The issue has been assigned to you.



@junhaoliao
Member Author

We should apply the same standard for all SQL statements, but that's for another PR.

I tried updating the ones that are added/modified in this PR.

On the other hand, I avoided fixing lint issues in lines that aren't touched in this PR; we'll fix them in a future PR as you propose.

@junhaoliao
Member Author

@kirkrodrigues please help review the interface-related changes (clp-config.yaml) in this PR. I was thinking about adding some docs to the docs site too, but can't really find a good place. Shall we defer and do that as we resolve #1233?

Member

@kirkrodrigues left a comment

For the PR title, how about:

feat(job-orchestration): Allow concurrent compression job processing by processing batches of compression tasks per job.

Might be worth mentioning #1539 in the PR description so that they have links to each other? That said, I don't know if it resolves the feature request entirely since it's a latency-dependent request.

@junhaoliao junhaoliao changed the title feat(job-orchestration): Improve scheduling fairness by batching compression tasks with configurable task limits per job. feat(job-orchestration): Allow concurrent compression job processing by processing batches of compression tasks per job. Nov 28, 2025
@junhaoliao junhaoliao merged commit 00a10a9 into y-scope:main Nov 28, 2025
20 of 21 checks passed
@junhaoliao junhaoliao deleted the batch-compression-tasks branch November 28, 2025 23:43