8 changes: 6 additions & 2 deletions libs/libqueue/Makefile
@@ -2,8 +2,12 @@ include ../../tools/Common.mk
 
 .PHONY: test
 test:
-	MONGO_QUEUE_DATABASE="datasets_server_queue_test" poetry run python -m pytest -x tests
+	docker-compose -f tests/docker-compose.yml up -d --remove-orphans
+	MONGO_URL="mongodb://localhost:27020" MONGO_QUEUE_DATABASE="datasets_server_queue_test" poetry run python -m pytest -x tests
+	docker-compose -f tests/docker-compose.yml down
 
 .PHONY: coverage
 coverage:
-	MONGO_QUEUE_DATABASE="datasets_server_queue_test" poetry run python -m pytest -s --cov --cov-report xml:coverage.xml --cov-report=term tests
+	docker-compose -f tests/docker-compose.yml up -d --remove-orphans
+	MONGO_URL="mongodb://localhost:27020" MONGO_QUEUE_DATABASE="datasets_server_queue_test" poetry run python -m pytest -s --cov --cov-report xml:coverage.xml --cov-report=term tests
+	docker-compose -f tests/docker-compose.yml down
35 changes: 20 additions & 15 deletions libs/libqueue/src/libqueue/queue.py
@@ -205,23 +205,28 @@ def get_finished(jobs: QuerySet[AnyJob]) -> QuerySet[AnyJob]:
     return jobs(status__nin=[Status.WAITING, Status.STARTED])
 
 
+def get_excluded_dataset_names(jobs: QuerySet[AnyJob], max_jobs_per_dataset: Optional[int] = None) -> List[str]:
+    if max_jobs_per_dataset is None:
+        return []
+    dataset_names = [job.dataset_name for job in jobs(status=Status.STARTED).only("dataset_name")]
+    return list(
+        {dataset_name for dataset_name in dataset_names if dataset_names.count(dataset_name) >= max_jobs_per_dataset}
+    )
+
+
 def start_job(jobs: QuerySet[AnyJob], max_jobs_per_dataset: Optional[int] = None) -> AnyJob:
-    waiting_jobs = get_waiting(jobs).order_by("+created_at").no_cache()
+    excluded_dataset_names = get_excluded_dataset_names(jobs, max_jobs_per_dataset)
+    next_waiting_job = (
+        jobs(status=Status.WAITING, dataset_name__nin=excluded_dataset_names)
+        .order_by("+created_at")
+        .no_cache()
+        .first()
+    )
     # ^ no_cache should generate a query on every iteration, which should solve concurrency issues between workers
-    for job in waiting_jobs:
-        if job.status is not Status.WAITING:
-            logger.warning(f"waiting job {job.to_id()} does not have the WAITING status. Ignoring it.")
-            continue
-        if job.started_at is not None:
-            logger.warning(f"waiting job {job.to_id()} has a non-empty started_at field. Ignoring it.")
-            continue
-        if job.finished_at is not None:
-            logger.warning(f"waiting job {job.to_id()} has a non-empty finished_at field. Ignoring it.")
-            continue
-        if max_jobs_per_dataset is None or get_num_started_for_dataset(jobs, job.dataset_name) < max_jobs_per_dataset:
-            job.update(started_at=datetime.utcnow(), status=Status.STARTED)
-            return job
-    raise EmptyQueue(f"no job available (within the limit of {max_jobs_per_dataset} started jobs per dataset)")
+    if next_waiting_job is None:
+        raise EmptyQueue(f"no job available (within the limit of {max_jobs_per_dataset} started jobs per dataset)")
+    next_waiting_job.update(started_at=datetime.utcnow(), status=Status.STARTED)
+    return next_waiting_job
 
 
 def get_dataset_job(max_jobs_per_dataset: Optional[int] = None) -> Tuple[str, str]:
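For context, a minimal self-contained sketch of the exclusion rule introduced above (an illustration, not code from this PR): a dataset is excluded once it already holds at least max_jobs_per_dataset started jobs. The sketch uses collections.Counter in place of the repeated list.count scan, and the function name and plain-list input are hypothetical stand-ins for the MongoEngine query.

    from collections import Counter
    from typing import List, Optional

    def excluded_dataset_names_sketch(
        started_dataset_names: List[str], max_jobs_per_dataset: Optional[int] = None
    ) -> List[str]:
        # No limit configured: nothing is excluded.
        if max_jobs_per_dataset is None:
            return []
        # Exclude any dataset that already has max_jobs_per_dataset or more started jobs.
        counts = Counter(started_dataset_names)
        return sorted(name for name, count in counts.items() if count >= max_jobs_per_dataset)

    # With a limit of 2, "a" (two started jobs) is excluded and "b" is not.
    assert excluded_dataset_names_sketch(["a", "b", "a"], 2) == ["a"]
    assert excluded_dataset_names_sketch(["a", "b", "a"]) == []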
10 changes: 10 additions & 0 deletions libs/libqueue/tests/docker-compose.yml
@@ -0,0 +1,10 @@
+version: "3.9"
+services:
+  mongodb-test-libqueue:
+    image: mongo
+    volumes:
+      - mongo-test-libqueue:/data/db:rw
+    ports:
+      - 27020:27017
+volumes:
+  mongo-test-libqueue:
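The compose file maps the container's default MongoDB port 27017 to host port 27020, which is where the MONGO_URL in the Makefile targets points. As a hedged connectivity check (assuming pymongo is installed; the library's own connect_to_queue helper is not shown in this diff):

    import os

    from pymongo import MongoClient

    # Same defaults as the Makefile's test environment.
    mongo_url = os.environ.get("MONGO_URL", "mongodb://localhost:27020")
    database = os.environ.get("MONGO_QUEUE_DATABASE", "datasets_server_queue_test")

    # Ping the containerized MongoDB brought up by `docker-compose up -d`.
    client = MongoClient(mongo_url, serverSelectionTimeoutMS=2000)
    client.admin.command("ping")
    print(f"{database} reachable at {mongo_url}")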
19 changes: 19 additions & 0 deletions libs/libqueue/tests/test_queue.py
@@ -8,6 +8,7 @@
     connect_to_queue,
     finish_dataset_job,
     get_dataset_job,
+    get_split_job,
     is_dataset_in_queue,
     is_split_in_queue,
 )
@@ -50,6 +51,24 @@ def test_add_job() -> None:
     finish_dataset_job(job_id, success=True)
 
 
+def test_max_jobs_per_dataset() -> None:
+    add_split_job("dataset", "config", "split1")
+    add_split_job("dataset", "config", "split2")
+    add_split_job("dataset", "config", "split3")
+    _, dataset_name, config_name, split_name = get_split_job()
+    assert dataset_name == "dataset"
+    assert config_name == "config"
+    assert split_name == "split1"
+    with pytest.raises(EmptyQueue):
+        get_split_job(0)
+    with pytest.raises(EmptyQueue):
+        get_split_job(1)
+    _, dataset_name, config_name, split_name = get_split_job(2)
+    assert split_name == "split2"
+    with pytest.raises(EmptyQueue):
+        get_split_job(2)
+
+
 def test_is_dataset_in_queue() -> None:
     dataset_name = "test_dataset"
     dataset_name_2 = "test_dataset_2"
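Tracing the limit arithmetic in test_max_jobs_per_dataset above: after the first get_split_job() call, "dataset" holds one started job, so limits of 0 and 1 both exclude it and raise EmptyQueue. A limit of 2 admits one more job (split2); the dataset then has two started jobs, hits the limit, and is excluded again, leaving split3 waiting.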
51 changes: 26 additions & 25 deletions services/api/poetry.lock

Some generated files are not rendered by default.
