Celery 란?
공식 문서에선 Asynchronous Task Queue 라고 정의함
즉 백그라운드에서 실행할 작업을 큐에 넣고, 비동기적으로 실행하는 분산 시스템이다.
주요특징
- 비동기 작업 처리: 시간이 오래 걸리는 작업을 백그라운드에서 실행
- 분산 처리: 여러 워커(worker)를 실행해서 작업 부하를 분산
- 확장성: 여러 개의 작업을 동시에 처리 가능
- 재시도 기능: 실패한 작업을 자동으로 다시 실행 가능
웹서버에선 오래 걸리는 작업을 동기적으로 실행하면 사용자가 오래 기달려야 한다.
그래서 Celery 를 사용해 오래걸리는 작업을 백그라운드에서 실행시켜 빠른 응답속도와 높은 성능을 유지하는 것이다
더보기
백그라운드란?
- 사용자가 직접 상호작용하지 않는 상태에서 실행되는 작업을 의미
- 반대로 사용자가 직접 상호작용하는 작업은 → 포그라운드(Foreground)
백그라운드 작업을 실행하는 방식은
- 멀티스레딩 Multi-threading
- 멀티프로세싱 Multi-processing
- 테스크 큐 Task Queue
가 있는데 Celery 는 테스크 큐 방식을 사용
그래서 보통
- 이메일 전송
- 대용량 데이터 처리
- 웹 크롤링
- 머신러닝 모델 학습
등 대용량 작업을 동시에 처리하거나 요청에(HTTP) 무거운 연산이 포함되어서
즉각적인 응답을 제공하기 어려울때 Celery 를 활용함
기본개념
Celery 주요 구성 요소
- 메시지 브로커(Message Broker)
- 작업을 저장하고 전달하는 메시지 큐 역할
- 보통 Redis 또는 RabbitMQ 를 사용
- Celery 는 브로커를 통해 작업을 워커(worker)에게 전달함
- 워커(Worker)
- Celery 에서 실행 대기 중인 작업을 받아서 실행하는 프로세스
- 쉽게말하면 브로커에서 작업을 가져와 실행하는 프로세스
- 여러 개의 워커를 띄워서 분산 처리 가능
- 태스크(Task)
- Celery 가 실행할 개별 작업
- Python 함수 또는 메서드로 정의됨 → @task 데코레이터 사용해서 등록
- 비동기로 실행 가능 (.delay(), .apply_async())
- 비트(Beat) [선택 사항]
- 주기적으로 실행할 작업을 스케줄링하는 역할 (ex: 매일 자정 데이터 백업)
- celery beat를 실행하면 cron 처럼 특정 시간마다 자동으로 실행 가능
사용법
Celery 설치 및 앱 등록
pip install celery
만약 Redis 를 브로커로 사용하면
pip install redis
Celery 기본 설정
- 공식문서에서 Celery 설정을 별도 파일 (celery.py) 에 두는것을 권장함
폴더구조
ainfo-backend/
│── config/
│ │── __init__.py
│ │── celery.py # Celery 설정 파일
│ └── settings.py
│── app/
│ └── tasks.py # Celery 태스크 정의
celery.py
- 공식문서에서 권장하는 기본 설정
import os
from celery import Celery
# Django 설정 파일을 Celery에서 사용할 수 있도록 환경 변수 설정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')
# Django settings.py에서 'CELERY_'로 시작하는 설정을 Celery가 가져오도록 설정
app.config_from_object('django.conf:settings', namespace='CELERY')
# Django 앱에서 task 자동 탐색
app.autodiscover_tasks()
settings.py
- 공식문서에서 Redis 를 브로커로 설정하는 방법
# Redis를 메시지 브로커로 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# 메시지 직렬화 방식 설정
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
Celery 테스크 작성 및 실행
테스크 작성 → app/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def send_email(email):
print(f"📧 {email} 로 이메일을 보냈습니다!")
return f"Email sent to {email}"
Celery 워커 실행
- Redis 실행 → 우리는 지금 도커컴포즈에서 하니까 필요 X
redis-server
- Celery 워커 실행
celery -A config worker --loglevel=info
or
celery -A config worker --loglevel=info --pool=solo
# 백그라운드 실행
celery -A config worker --loglevel=info &
# 종료
celery -A config control shutdown
- -A config: Celery 설정이 있는 프로젝트 이름
- worker: Celery 워커 실행
- -loglevel=info: 로그 출력
Celery 테스크 실행
비동기 실행 → .delay()
from app.tasks import add, send_email
# 덧셈 태스크 실행
result = add.delay(10, 20)
print(result.id) # 태스크 ID 확인
# 이메일 전송 태스크 실행
send_email.delay("test@example.com")
주기적인 사용을 원하면? → Celery Beat 사용
주기적인 작업을 할때 스케줄링 기능을 하는데
경튜님이 말한 crontab 이나 저번조에서 자동 크롤링을 위해 내가한 apscheduler 같은
스케쥴링 기능을 Celery Beat 가 제공함
공식문서에선 django-celery-beat 패키지를 사용해 스케쥴링 하는걸 권장함
설치
pip install django-celery-beat
설정 → settings.py
INSTALLED_APPS = [
'django_celery_beat',
]
마이그레이션
python manage.py migrate django_celery_beat
주기적 실행을 위한 Celery Beat 실행
celery -A config beat --loglevel=info
Celery 모니터링 → Flower 사용
- 공식문서에서 Flower 를 사용해 Celery 작업을 실시간으로 모니터링하는걸 권장
설치
pip install flower
Flower 실행
celery -A proj flower
- 브라우저에서 http://localhost:5555/ 에 접속하면 테스크 상태 확인 가능
공식문서에서 강조하는것
- 작업은 가능한 한 짧고 가볍게 유지
- 너무 무거운 작업은 메시지브로커나 워커에 부하를 줄 수 있다
- 실패한 작업은 자동 재시도 하도록 설정
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=3)
def my_task(self):
pass
- 데이터 직렬화는 JSON 사용
CELERY_TASK_SERIALIZER = 'json'
- 비동기 실행을 적극 활용 → .delay()
- 주기적 작업이 필요하다면 → Celery Beat 사용
'Project > AInfo' 카테고리의 다른 글
[WebSocket] WebSocket 이란? (Django Channels) (0) | 2025.03.14 |
---|---|
[SMTP] 메일기능 활용 (0) | 2025.03.03 |
[SMTP] SMTP 란? (0) | 2025.03.02 |