fix the lock release when finishing a job (#3039)
severo authored Aug 22, 2024
1 parent 3ab176b commit 9e221e0
Showing 2 changed files with 4 additions and 20 deletions.
9 changes: 4 additions & 5 deletions libs/libcommon/src/libcommon/queue/jobs.py
@@ -30,7 +30,7 @@
)
from libcommon.dtos import FlatJobInfo, JobInfo, Priority, Status, WorkerSize
from libcommon.queue.dataset_blockages import DATASET_STATUS_BLOCKED, DATASET_STATUS_NORMAL, get_blocked_datasets
-from libcommon.queue.lock import lock, release_lock, release_locks
+from libcommon.queue.lock import lock, release_lock
from libcommon.queue.metrics import (
decrease_metric,
decrease_worker_size_metrics,
@@ -147,7 +147,7 @@ class JobDocument(Document):
config (`str`, *optional*): The config on which to apply the job.
split (`str`, *optional*): The split on which to apply the job.
unicity_id (`str`): A string that identifies the job uniquely. Only one job with the same unicity_id can be in
-the started state. The revision is not part of the unicity_id.
+the started state.
namespace (`str`): The dataset namespace (user or organization) if any, else the dataset name (canonical name).
priority (`Priority`, *optional*): The priority of the job. Defaults to Priority.LOW.
status (`Status`, *optional*): The status of the job. Defaults to Status.WAITING.
@@ -260,7 +260,7 @@ class Queue:
the jobs. You can create multiple Queue objects, it has no effect on the database.
It's a FIFO queue, with the following properties:
-- a job is identified by its input arguments: unicity_id (type, dataset, config and split, NOT revision)
+- a job is identified by its input arguments: unicity_id (type, dataset, config and split, revision)
- a job can be in one of the following states: waiting, started
- a job can be in the queue only once (unicity_id) in the "started" state
- a job can be in the queue multiple times in the other states
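
The docstring change above reflects that the revision is now part of a job's identity. The diff does not show how `unicity_id` is actually composed; the following is a purely hypothetical sketch of that invariant (`compose_unicity_id` is an invented name for illustration, not the library's API):

```python
from typing import Optional


# Hypothetical helper, for illustration only: the actual construction of
# unicity_id lives elsewhere in libcommon and is not shown in this diff.
def compose_unicity_id(
    job_type: str, dataset: str, revision: str, config: Optional[str], split: Optional[str]
) -> str:
    # Two jobs with the same type, dataset, revision, config and split are the
    # same job for the queue; only one of them can be "started" at a time.
    return ",".join([job_type, dataset, revision, config or "", split or ""])
```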
@@ -698,8 +698,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
)
job_priority = job.priority
job.delete()
-release_locks(owner=job_id)
-# ^ bug: the lock owner is not set to the job id anymore when calling start_job()!
+release_lock(key=job.unicity_id)
if was_blocked:
pending_jobs = self.get_pending_jobs_df(dataset=job.dataset)
for _, pending_job in pending_jobs.iterrows():
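
To make the fix easier to read outside the diff, here is a minimal sketch of the corrected flow, assuming the `JobDocument` model and `release_lock` helper from this repository; the status checks, metrics updates, and blocked-dataset handling of the real `Queue.finish_job()` are omitted, and `finish_job_sketch` is a name introduced only for illustration:

```python
from typing import Optional

from libcommon.dtos import Priority
from libcommon.queue.jobs import JobDocument
from libcommon.queue.lock import release_lock


def finish_job_sketch(job_id: str) -> Optional[Priority]:
    # Simplified: the real Queue.finish_job() also validates the job status,
    # updates metrics, and re-checks blocked datasets.
    job = JobDocument.objects(pk=job_id).first()
    if job is None:
        return None
    job_priority = job.priority
    job.delete()
    # Release the lock by its key (the job's unicity_id) rather than by owner:
    # start_job() no longer sets the lock owner to the job id, so the previous
    # release_locks(owner=job_id) call matched nothing and the lock was leaked.
    release_lock(key=job.unicity_id)
    return job_priority
```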
15 changes: 0 additions & 15 deletions libs/libcommon/src/libcommon/queue/lock.py
@@ -180,21 +180,6 @@ def git_branch(
return cls(key=key, owner=owner, sleeps=sleeps, ttl=_TTL.LOCK_TTL_SECONDS_TO_WRITE_ON_GIT_BRANCH)


-def release_locks(owner: str) -> None:
-"""
-Release all locks owned by the given owner
-Args:
-owner (`str`): the current owner that holds the locks
-"""
-Lock.objects(owner=owner).update(
-write_concern={"w": "majority", "fsync": True},
-read_concern={"level": "majority"},
-owner=None,
-updated_at=get_datetime(),
-)
-
-
def release_lock(key: str) -> None:
"""
Release the lock for a specific key
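
The body of the surviving `release_lock` helper is truncated in the diff above. Based on its signature, its docstring, and the deleted `release_locks`, it plausibly mirrors the same MongoDB update but filtered on `key`; the following is a sketch under that assumption (reusing the `Lock` document class and `get_datetime` helper already available in `lock.py`), not the verbatim implementation:

```python
def release_lock(key: str) -> None:
    """
    Release the lock for a specific key.

    Args:
        key (`str`): the key of the lock to release
    """
    # Same update pattern as the deleted release_locks(), but filtered on the
    # lock key instead of its owner: clearing the owner frees the lock.
    Lock.objects(key=key).update(
        write_concern={"w": "majority", "fsync": True},
        read_concern={"level": "majority"},
        owner=None,
        updated_at=get_datetime(),
    )
```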
