輕量級消息隊列 Django-Q 輕度體驗

前言

最近做的這個項目(基於 Django),需要做個功能,實現定時採集車輛定位。

這讓我想起來幾年前那個 OneCat 項目,當時我用的是 Celery 這個很重的組件

Celery 實在是太重了,後來我做公衆號採集平臺的時候,又接觸了 Django-RQ 和 Django-Q 這倆,前者是對 RQ 的封裝,讓 RQ 和 Django 更好的結合在一起;後者是一個全新的「多進程任務隊列」組件,相比起 celery 很輕量,當時使用的時候就給我留下不錯的印象。

於是這個項目我決定繼續使用 Django-Q 來實現一些異步操作和定時任務。

關於 Django-Q

官方介紹:

A multiprocessing task queue for Django

快速開始

安裝

pip install django-q

添加到 INSTALLED_APPS

INSTALLED_APPS = (
    # other apps
    'django_q',
)

數據庫遷移

由於 Django-Q 會把執行結果放到數據庫裏,所以要執行一下數據庫遷移的操作

python manage.py migrate

這個操作會生成 django_q_ormqdjango_q_scheduledjango_q_task 三個表

配置

因爲本身項目用的緩存就是 Redis,所以我直接用 Redis 作爲消息隊列的後端(broker)

Django-Q 支持很多種後端,除了 Redis 還有 Disque、IronMQ、Amazon SQS、MongoDB 或者是 Django 的 ORM~

settings.py 中添加以下配置:

Q_CLUSTER = {
    'name''project_name',
    'workers': 4,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'cpu_affinity': 1,
    'save_limit': 250,
    'queue_limit': 500,
    'label''Django Q',
    'redis'{
        'host': 127.0.0.1',
        'port': 6379,
        'db': 0,
    }
}

啓動服務

python manage.py qcluster

搞定,現在消息隊列服務已經跑起來了

我們可以添加異步任務或者定時任務

異步任務

最簡單的方式是使用它提供的 async_task 方法,添加一個新的異步任務到隊列中

來寫個例子,輸入一個數,求階乘之後開平方

import math

def demo_task(number: int):
    return math.sqrt(math.factorial(number))

啓動任務

然後來添加一個異步任務

from django_q.tasks import async_task, Task

def task_finish(task: Task):
    print(f'任務 {task.name}(ID:{task.id})完成!')

task_id = async_task(
    demo_task, 10,
    task_name='任務名稱',
    hook=task_finish,
)

可以看到,直接調用 async_task 方法就行

這個方法的定義是

async_task(func: Any, *args: Any, **kwargs: Any)

傳入要異步執行的方法之後,可以把該方法的參數跟在後面傳進去,也可以用 kwargs 的方式傳入

這兩種方式都可以的:

我個人比較喜歡第一種,因爲 Django-Q 本身有幾個命名參數,比如 task_namehooktimeout之類的,用第一種方式傳參不容易和 Django-Q 默認的命名參數衝突。

獲取執行結果

有兩種方式獲取任務的執行結果:

第一種方式無需贅述,在安裝 Django-Q 組件後執行了數據庫遷移,就會生成 Failed tasksScheduled tasksSuccessful tasks 三個 admin 模塊,顧名思義,在 Failed tasksSuccessful tasks 中可以看到任務的執行結果,也就是我們寫在 demo_task 裏的返回值。

第二種方式,代碼如下:

from django_q.tasks import result

task_result = result(task_id)

task_id 傳入就可以查詢任務執行的結果,如果任務還沒執行完,那結果就是 None

這個 result 方法還有個 wait 參數,可以設置等待時間,單位是毫秒

執行完成回調

上面代碼中,我們還設置了 hook 參數

作用就是任務執行完成之後,執行 task_finish 這個函數

task_finish 裏可以通過 task 參數獲取任務信息

就是這樣~

async_task 的其他參數

創建異步任務的這個方法還有很多參數,官網文檔寫得還算可以,很多參數都是 Q_CLUSTER 配置裏面有的,在 async_task 裏設置這些參數就會覆蓋默認的配置。

我直接搬運一波,權當翻譯文檔了~

除了上面介紹到的 task_namehook 還有這些參數:

q_options 參數

根據前面啓動任務的部分,我們啓動異步任務的時候,可以通過命名參數向任務方法傳遞參數,比如:

async_task(demo_task, number=10)

async_task 這個方法本身又有很多參數,如果這個參數名稱和我們要執行的任務 demo_task 參數重名的話,這些參數就被 async_task 拿走了,我們的任務 demo_task 就拿不到這些參數了。

怎麼辦?

q_options 參數就是爲了解決這個問題

可以把要傳給 async_task 的參數都包裝在一個 dict 裏面,然後通過 q_options 參數傳入

假如我們的 demo_task 是這樣的:

def demo_task(number: int, timeout: int):
  ...

除了 number 這個參數,還要接收一個跟 async_task 自有參數重名的 timeout 參數,使用 q_options 的解決方案如下

opts = {
    'hook''hooks.print_result',
    'group''math',
    'timeout'30
}

async_task(demo_task, number=10, timeout=100, q_options=opts)

這樣既能…… 又能……,完美啊~

當然我還是建議用 *args 的方式傳參,這樣就沒有參數重名的問題了。

定時任務

有兩種方式添加定時任務

在代碼中添加

比較簡單,直接上代碼

from django_q.tasks import schedule

schedule(
  'demo_task',
  schedule_type=Schedule.MINUTES,
  minutes=1,
  task_name='任務名稱',
)

有一點注意的是,因爲添加後的定時任務是要保存在數據庫中的

所以需要把要執行的方法(包含完整包名),以字符串的形式傳入

假如在我們的 Django 項目中,要執行的是在 apps/test/tasks.py 文件中的 demo_task 方法

那麼需要把 apps.test.tasks.demo_task 這個完整的名稱傳入

在 admin 中添加也是一樣

時間間隔設置

Django-Q 的定時任務有很多類型:

注意,即使是 Cron 表達式,定時任務執行的最短間隔也是 1 分鐘

這點我一開始不知道,用 Cron 表達式寫了個 15 秒的任務,但執行時間根本不對,然後我翻了一下 github 上的 issues,看到作者的解答才知道~

那個 Issues 的地址:https://github.com/Koed00/django-q/issues/179

作者的回覆:

The current design has a heartbeat of 30 seconds, which means the schedule table can't have schedules below that. Most of this is explained in the architecture docs. Because of the way the internal loop is set up, a resolution under a dozen seconds or so, quickly becomes unreliable.

I always imagined tasks that need accuracy measured in seconds, would use a delayed tasks strategy where a few seconds delay is either added through the broker or inside the task itself.

The problem with all this, is that a task is currently always added to the back of the queue. So even with a 1 second resolution on the schedule, the task still has to wait it's execution time. Which can of course vary wildly depending on the broker type, worker capacity and current workload.

這點感覺有些雞肋,如果要高頻執行的任務,那隻能選擇 Celery 了

在 admin 後臺添加

這個更簡單,傻瓜式操作

所以這部分略過了~

docker 部署

現在後端服務基本是用 docker 部署的

爲了能在 docker 中使用 Django-Q

我們需要在原有 Django 容器的基礎上,再起一個同樣的容器,然後入口改成 qcluster 的啓動命令

這裏有個 issues 也有討論這個問題:https://github.com/Koed00/django-q/issues/513

來個 docker-compose.yml 的例子

version: "3.9"
services:  
  redis:
    image: redis:alpine
    ports:
      - 6379:6379
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - django_q
  django_q:
    build: .
    command: python manage.py qcluster
    volumes:
      - .:/code
    depends_on:
      - redis

一個簡單的例子

其他的類似環境變量這些,根據實際情況來

注意:

其他

命令行工具

Django-Q 還提供了一些命令行工具

除了使用命令監控,還可以在代碼裏做監控,不過我暫時沒用到,所以還沒研究,有需要的同學可以直接看文檔

admin 自定義

安裝完 Django-Q 後,會在 admin 出現三個菜單,跟普通的 Django app 一樣,這些也是通過 admin 註冊進去的,因此我們可以重新註冊這些 ModelAdmin 來自定義 admin 上的操作界面

來一段官方關於失敗任務界面的代碼:

from django_q import models as q_models
from django_q import admin as q_admin

admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
    list_display = (
        'name',
        'func',
        'result',
        'started',
        # add attempt_count to list_display
        'attempt_count'
    )

跟普通的 ModelAdmin 是一樣的

我們可以自行添加搜索框、過濾字段之類的。記得要先執行 admin.site.unregister([q_models.Failure]) 取消之前 Django-Q 自己註冊的 ModelAdmin 對象。

信號

Django 內置信號系統,我之前有寫過一篇簡單的文章介紹:3 分鐘看懂 Python 後端必須知道的 Django 的信號機制

Django-Q 提供了兩類信號:

例子代碼如下:

from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute

@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
    print("Task {} will be enqueued".format(task["name"]))

@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
    print("Task {} will be executed by calling {}".format(
          task["name"], func))

有需要的話可以註冊消息接收器,做一些處理。(不過我暫時是沒用上)

小結

搞定~

Django-Q 使用下來的體驗還是不錯的,足夠輕量,部署足夠方便,足以應付大部分場景了~

參考資料

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ciHJnOOO0GXkRihWTiX-fw