Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7848 Add RQ API #17938

Open
wants to merge 14 commits into
base: feature
Choose a base branch
from
Open

7848 Add RQ API #17938

wants to merge 14 commits into from

Conversation

arthanson
Copy link
Collaborator

@arthanson arthanson commented Nov 5, 2024

Fixes: #7848

Here is a testing command to add some tasks for testing the API response, create a temporary create_task management command within core.

Used APIView on the API's and not ViewSet as they don't fit with a ViewSet / only one operation per API can't Get, List, Delete, etc. and the URL structure doesn't directly conform with the pk used by ViewSet, however the side-effect of this is it doesn't appear in the browsable API.

Note: some of the status on these tasks might not be correct as they haven't been correctly moved to the queues by RQ, we are just directly placing them there.

from django_rq.management.commands.rqworker import Command as _Command

from datetime import datetime, timedelta
from django_rq import get_queue
from rq.registry import DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, StartedJobRegistry


class Command(_Command):
    # Dummy worker functions
    @staticmethod
    def dummy_job_default():
        return "Job finished"

    @staticmethod
    def dummy_job_high():
        return "Job finished"

    @staticmethod
    def dummy_job_failing():
        raise Exception("Job failed")

    def handle(self, *args, **options):
        queue = get_queue('default')

        job = queue.enqueue(self.dummy_job_default)
        registry = FinishedJobRegistry(queue.name, queue.connection)
        registry.add(job, -1)

        job = queue.enqueue(self.dummy_job_default)
        registry = FailedJobRegistry(queue.name, queue.connection)
        registry.add(job, -1)

        job = queue.enqueue(self.dummy_job_default)
        registry = DeferredJobRegistry(queue.name, queue.connection)
        registry.add(job, -1)

        job = queue.enqueue(self.dummy_job_default)
        registry = StartedJobRegistry(queue.name, queue.connection)
        registry.add(job, -1)

        job = queue.enqueue(self.dummy_job_default)
        when = datetime.now() + timedelta(hours=8)
        registry = ScheduledJobRegistry(queue.name, queue.connection)
        registry.schedule(job, when)

@arthanson arthanson changed the title DRAFT: 7848 Add RQ API 7848 Add RQ API Nov 7, 2024
@arthanson arthanson marked this pull request as ready for review November 7, 2024 01:29
return "Background Task"

@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be POST operations maybe?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; any operation which alters data must use a POST, PATCH, PUT, or DELETE request.

@jeremystretch jeremystretch added this to the v4.2 milestone Nov 7, 2024
return Response(serializer.data)


class DeferredTaskListView(BaseTaskListView):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could combine all these task lists into a single view and pass the status (deferred, failed, etc.) as a URL path variable.

return "Background Task"

@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; any operation which alters data must use a POST, PATCH, PUT, or DELETE request.

return task


class TaskDetailView(BaseTaskView):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the individual task views can probably be combined into a single TaskView class with a custom action for each operation. This would eliminate a lot of boilerplate and simplify the URL paths.

@@ -10,5 +13,21 @@
router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet)

app_name = 'core-api'
urlpatterns = router.urls
urlpatterns = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of these paths appear in the browseable API, though I'm not sure why.

@@ -71,3 +82,251 @@ class ObjectChangeViewSet(ReadOnlyModelViewSet):
queryset = ObjectChange.objects.valid_models()
serializer_class = serializers.ObjectChangeSerializer
filterset_class = filtersets.ObjectChangeFilterSet


class QueueListView(APIView):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These list views should replicate the existing API convention, which enables pagination of the results:

{
    "count": 24,
    "next": null,
    "previous": null,
    "results": [...]
}

for job in jobs:
job.scheduled_at = registry.get_scheduled_time(job)

jobs = get_rq_jobs_from_status(queue, status)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
jobs = get_rq_jobs_from_status(queue, status)
return get_rq_jobs_from_status(queue, status)

)


def get_rq_jobs_from_status(queue, status):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add docstrings for these functions.

try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to move the 404 handling out to the UI & API views, to ensure that these functions can be used cleanly internally e.g. within tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants