Trong dự án công trình hiện tại của mình khi tới phần scaling khối hệ thống thì bản vẽ xây dựng hiện tại theo hướng microservice gặp mặt phải một vấn đề: đông đảo service trong hệ thống đều shop trực tiếp với database nên xẩy ra vấn đề càng những service thì càng nhiều kết nối tới database dẫn mang đến tình trạng xẩy ra deadlock, performance cũng khá chậm do những kết nối tới database từ những service phải chờ nhau giải phóng.

Bạn đang xem: Celery

Sau lúc được gợi nhắc về câu hỏi chuyển sang cần sử dụng hàng chờ thay vì để các service thao tác trực tiếp với database, mình bao gồm dành thời gian xem thêm về kiến trúc Queue. Do dự án chạy hầu hết bằng python nên tech lead gợi ý sử dụng Celery, một hệ thống thống trị queue phổ biến.

Kiến trúc sau khoản thời gian chuyển sang sử dụng queue trong hệ thống của chính mình sẽ như sau. Một nội dung bài viết khá cụ thể về một dạng xây dựng queue là message queue mọi người dân có thể đọc thêm ở toidicodedao

*

Về CeleryLà một hệ thống làm chủ hàng đợi xử lý task thời hạn thực. Trong hệ thống Celery bọn họ sẽ sử dụng khái niệm task y như job ở một số framework khác như Sidekiq.Input của celery cần kết nối với một nhiều loại message broker còn output có thể kết nối tới một khối hệ thống backend để tàng trữ kết quả

Mọi người có thể tham khảo một bài viết khác về Celery bên trên infokazanlak.com ngơi nghỉ đây. Hình như Celery cũng có một khối hệ thống document chi tiết và dễ đọc ở trang chủ https://docs.celeryproject.org/en/latest/getting-started/introduction.html.

Các câu hỏi nên áp dụng CeleryChạy background jobsChạy các job lập lịchTính toán phân tánXử lý tuy vậy songCác chức năng chính Celery cung cấpMonitor: thống kê giám sát các job/task được đưa vào queueScheduling: chạy các task lập lịch (giống cronjob)Workflows: tạo thành một luồng xử trí taskTime và Rate Limits: kiểm soát và điều hành số lượng task được triển khai trong một khoảng tầm thời gian, thời gian một task được chạy,...Resource Leak Protection: điều hành và kiểm soát tài nguyên trong quá trình xử lý taskUser Component: có thể chấp nhận được người dùng tự customize các worker.Cơ chế của CeleryCelery vận động dựa trên có mang task queue. Đây là chính sách queue dùng để điều phối các job/work giữa những máy không giống nhau. Những worker sẽ nhận task, chạy task và trả về kết quả.Input của queue:TaskCác process bên trên từng worker sẽ theo dõi queue để thực thi các task bắt đầu được đẩy vào queueCelery hay được dùng một message broker nhằm điều phối task giữa những clients cùng worker. Để chế tạo một task new client đã thêm một message vào queue, broker tiếp nối sẽ chuyển message này tới worker. Celery cung cấp 3 nhiều loại broker:RabbitMQRedisSQSMột hệ thống sử dụng celery gồm thể có khá nhiều workers với brokers, dựa vào vậy việc scale theo chiều ngang sẽ rất dễ dàng.Các module chủ yếu của Celery

Application

Một instance được khởi tạo nên từ thư viện Celery được hotline là application

Nhiều Celery application hoàn toàn có thể cùng mãi sau trong một process

Khởi sản xuất một celery application:

from celery import Celeryapp = Celery()Khi gởi một message cho tới queue, message này sẽ chỉ chứa tên của task đề xuất thực thi.

Các celery worker sẽ bản đồ giữa thương hiệu của task với hàm triển khai task đó, việc mapping do đó được điện thoại tư vấn là task registry


app.taskdef add(x, y):return x + y

Tasks

Task trong Celery gồm hai trọng trách chính:định nghĩa phần đông gì đang xảy ra sau khi một task được call (gửi đi message)định nghĩa phần lớn gì sẽ xảy ra khi một worker nhận thấy message đóMỗi task bao gồm một thương hiệu riêng ko trùng lặp, tên này sẽ tiến hành refer vào message nhằm worker hoàn toàn có thể tìm được đúng hàm nhằm thực thi. Còn nếu không định nghĩa tên cho task thì task đó sẽ tiến hành tự đặt tên dựa vào module mà lại task được quan niệm và tên function của task.Các message của task sẽ không trở nên xóa khỏi queue chừng làm sao message đó không được một worker xử lý. Một worker hoàn toàn có thể xử lý nhiều message, giả dụ worker bị crash cơ mà chưa cách xử lý hết các message kia thì bọn chúng vẫn hoàn toàn có thể được nhờ cất hộ lại tới một worker khácCác function của task yêu cầu ở tâm trạng idempotent: function không gây ra tác động gì bao gồm cả khi có bị gọi các lần với cùng 1 tham số => một task đã thực thi sẽ đảm bảo an toàn không bị chạy lại lần nữa.

Tạo task

Để chế tạo ra task họ dùng decorator
app.task(name="create_new_user")def create_user(username, password):User.objects.create(username=username, password=password)Để task có thể retry chúng ta cũng có thể bound task vào thiết yếu instance của nó


task(bind=True)def add(self, x, y):logger.info(self.request.id)Task cũng rất có thể kế thừa

import celeryclass MyTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo): print("0!r failed: 1!r".format(task_id, exc))
task(base=MyTask)def add(x, y):raise KeyError()Để biết thêm tin tức và trạng thái của task bạn có thể sử dụng Task.request


app.task(bind=True)def dump_context(self, x, y):print("Executing task id 0.id, args: 0.args!r kwargs: 0.kwargs!r".format( self.request))Celery cai quản trạng thái của tasks và rất có thể lưu chúng trong các hệ thống gọi là result backend. Vòng đời mặc định của task vào Celery gồm:

PENDING: task ngóng được thực thi.

STARTED: task đang khởi chạy

SUCCESS: task đang chạy thành công

FAILURE: task gặp mặt lỗi sau thời điểm khởi chạy

RETRY: task đang rất được chạy lại

REVOKED: task được thu hồi lại

Ngoài các trạng thái mang định trên bạn có thể tự quan niệm thêm tâm lý và update trạng thái mang lại task bởi method update_state


app.task(bind=True)def upload_files(self, filenames):for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state="PROGRESS", meta="current": i, "total": len(filenames))

Gọi task

Celery cung cấp các API để điện thoại tư vấn task sau khoản thời gian đã định nghĩa bọn chúng ở trên.

3 method chính:

apply_async: gửi task message.delay: giữ hộ task messagecalling: task message sẽ không được gửi đi tới worker cơ mà task sẽ tiến hành thực thi luôn bởi process hiện tại.

Có một task như sau:


app.taskdef add(x, y):return x + yĐể điện thoại tư vấn task này chúng ta sẽ thử cần sử dụng 2 method là apply_async cùng delay

Với delay họ sẽ viết như sau:

# task.delay(arg1, arg2, kwarg1="x", kwarg2="y")add.delay(10, 5)add.delay(a=10, b=5)Dùng apply_async thì cần viết phức hợp hơn một chút# task.apply_async(args=, kwargs="kwarg1": "x", "kwarg2": "y")add.apply_async(queue="low_priority", args=(10, 5))add.apply_async(queue="high_priority", kwargs="a": 10, "b": 5)Về thực chất delay và apply_async là tương đồng nhưng delay đã có sẵn các thiết lập mặc định và bọn họ chỉ có thể truyền vào mọi tham số phải đã định nghĩa trong function của task, còn với apply_async bạn có thể truyền thêm các tham số khác ví như queue bọn họ muốn nhờ cất hộ message vào,.... Best practice là nên thực hiện apply_async để tiện vấn đề config chạy task tùy theo yêu cầu sử dụng.

Celery cung ứng việc call task theo phương thức chaining, tác dụng của task này có thể được truyền vào task tiếp theo

add.apply_async((2, 2), link=add.s(16)) # 20Nhờ vào cách thức này bạn cũng có thể thiết kế callback đến task như sau
app.taskdef error_handler(uuid):result = AsyncResult(uuid)exc = result.get(propagate=False)print("Task 0 raised exception: 1!r 2!r".format( uuid, exc, result.traceback))add.apply_async((2, 2), link_error=error_handler.s())Sử dụng Celery

Cài đặt

pip install -U Celery

Sử dụng

Lựa chọn một số loại message broker cân xứng với dự án. Như đã nhắc đến ở trên Celery cung cấp 3 một số loại message broker là RabbitMQ, Redis, SQS. Mình sẽ đi sâu vào đối chiếu từng các loại message broker trong phần sau về Celery.

Tạo một celery worker cùng với task add

from celery Import Celeryapp = Celery("name of module", broker="url_of_broker")
app.taskdef add(x, y):return x + yChạy worker

$ celery -A tasks worker --loglevel=infoGọi task

Lưu kết quả

Celery hoàn toàn có thể lưu lại tinh thần của tasks nếu chúng ta cần quan sát và theo dõi tasks sau này. Cùng với các khối hệ thống thực hiện tại task theo phương thức state machine thì việc khối hệ thống cần thay được luồng tâm lý của task là cực kỳ quan trọng.

Xem thêm: Các Câu Hỏi Trắc Nghiệm Lịch Sử 12 Có Đáp Án Và Lời Giải Chi Tiết

Các hệ thống celery dùng để lưu tinh thần task:

SQLAlchemyMemcachedRedis

Để thực hiện cơ chế lưu công dụng trong Celery chúng ta khai báo celery worker bao gồm tham số backend. Ở trên đây mình sử dụng redis cho tất cả việc lưu kết quả task lẫn làm cho message broker

app = Celery("tasks", backend="redis://localhost", broker="redis://localhost:6379/0")

Cấu hình Celery

Cấu hình mang định cơ phiên bản của celery:

## Broker settings.broker_url = "redis://localhost:6379/0"# danh sách of modules lớn import when the Celery worker starts.imports = ("myapp.tasks",)## Using the database to store task state và results.result_backend = "db+sqlite:///results.db"task_annotations = "tasks.add": "rate_limit": "10/s"Best practice: tạo một tệp tin config riêng mang lại celery celeryconfig.py

broker_url = "redis://localhost:6379/0://"result_backend = "rpc://"task_serializer = "json"result_serializer = "json"accept_content = <"json">timezone = "Europe/Oslo"enable_utc = Truetask_routes = "tasks.add": "low-priority", # routing một task cho tới queue hy vọng muốnNgoài cách tạo file config bên trên ra chúng ta cũng rất có thể config trực tiếp bằng application của Celery app.conf

app.conf.update(enable_utc=True, timezone="Europe/London",)Tổng kếtCelery không cần thiết phải config nhiều mà chỉ việc import từ bỏ module thực hiện trực tiếp như sau

from celery Import Celeryapp = Celery("name of module", broker="url_of_broker")Worker và client của Celery hoàn toàn có thể tự retry

Một process của Celery hoàn toàn có thể xử lý hàng triệu task vào một phút cùng với độ trễ chỉ vài ba miligiây

Celery hỗ trợ:

Message brokers:RabbitMQRedisSQSXử lý concurrencymultiprocessingmultithreadsingle threadeventlet, geventLưu trữ tác dụng trên các hệ thống:AmqpRedisMemcachedSQLAlchemyAmazon S3File systemSerializationjsonyaml

Ở phần sau bài viết mình đang đi sâu hơn về worker trong Celery và hai các loại message broker cơ mà Celery hỗ trợ: SQS - Redis, đôi khi dựng một ứng dụng cơ bản sử dụng khối hệ thống này.