From 9c3f346280efeaac7dcc88a197f2a47dbc7fef55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Tue, 25 Feb 2025 15:02:51 +0100 Subject: [PATCH] Allow several job runners to run in parallel (#2281) * Allow several job runners to run in parallel * Fix test * fix: job_runner --------- Co-authored-by: Carlos Quintana --- job_runner.py | 55 ++++++++++++++++++++++++++--------- tests/jobs/test_job_runner.py | 3 +- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/job_runner.py b/job_runner.py index 85b32ad0..9bdee06c 100644 --- a/job_runner.py +++ b/job_runner.py @@ -6,6 +6,7 @@ import time from typing import List, Optional import arrow +from sqlalchemy.orm.exc import ObjectDeletedError from sqlalchemy.sql.expression import or_, and_ from app import config @@ -23,6 +24,7 @@ from app.log import LOG from app.models import User, Job, BatchImport, Mailbox, CustomDomain, JobState from app.monitor_utils import send_version_event from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction +from events.event_sink import HttpEventSink from server import create_light_app @@ -308,16 +310,15 @@ def process_job(job: Job): elif job.name == config.JOB_SEND_EVENT_TO_WEBHOOK: send_job = SendEventToWebhookJob.create_from_job(job) if send_job: - send_job.run() + send_job.run(HttpEventSink()) else: LOG.e("Unknown job name %s", job.name) -def get_jobs_to_run() -> List[Job]: +def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]: # Get jobs that match all conditions: # - Job.state == ready OR (Job.state == taken AND Job.taken_at < now - 30 mins AND Job.attempts < 5) # - Job.run_at is Null OR Job.run_at < now + 10 mins - taken_at_earliest = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS) run_at_earliest = arrow.now().shift(minutes=+10) query = Job.filter( and_( @@ -325,7 +326,7 @@ def get_jobs_to_run() -> List[Job]: Job.state == JobState.ready.value, and_( Job.state == JobState.taken.value, - Job.taken_at < taken_at_earliest, + Job.taken_at < taken_before_time, Job.attempts < config.JOB_MAX_ATTEMPTS, ), ), @@ -335,23 +336,51 @@ def get_jobs_to_run() -> List[Job]: return query.all() +def take_job(job: Job, taken_before_time: arrow.Arrow) -> bool: + sql = """ + UPDATE job + SET + taken_at = :taken_time, + attempts = attempts + 1, + state = :taken_state + WHERE id = :job_id + AND (state = :ready_state OR (state=:taken_state AND taken_at < :taken_before_time)) + """ + args = { + "taken_time": arrow.now().datetime, + "job_id": job.id, + "ready_state": JobState.ready.value, + "taken_state": JobState.taken.value, + "taken_before_time": taken_before_time.datetime, + } + try: + res = Session.execute(sql, args) + Session.commit() + except ObjectDeletedError: + return False + + return res.rowcount > 0 + + if __name__ == "__main__": send_version_event("job_runner") while True: # wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc with create_light_app().app_context(): - for job in get_jobs_to_run(): - LOG.d("Take job %s", job) + taken_before_time = arrow.now().shift( + minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS + ) - # mark the job as taken, whether it will be executed successfully or not - job.taken = True - job.taken_at = arrow.now() - job.state = JobState.taken.value - job.attempts += 1 - Session.commit() + jobs_done = 0 + for job in get_jobs_to_run(taken_before_time): + if not take_job(job, taken_before_time): + continue + LOG.d("Take job %s", job) process_job(job) job.state = JobState.done.value Session.commit() + jobs_done += 1 - time.sleep(10) + if jobs_done == 0: + time.sleep(10) diff --git a/tests/jobs/test_job_runner.py b/tests/jobs/test_job_runner.py index 65422e64..7e80bf30 100644 --- a/tests/jobs/test_job_runner.py +++ b/tests/jobs/test_job_runner.py @@ -66,7 +66,8 @@ def test_get_jobs_to_run(flask_client): ), ) Session.commit() - jobs = get_jobs_to_run() + taken_before_time = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS) + jobs = get_jobs_to_run(taken_before_time) assert len(jobs) == len(expected_jobs_to_run) job_ids = [job.id for job in jobs] for job in expected_jobs_to_run: