AI 파이프라인의 동시성 문제, Message Queue로 해결하기 (Redis + Celery)
이번 포스트에서는 진행중인 사이드 프로젝트의 AI 파이프라인을 개발하면서 겪었던 동시성(Concurrency) 문제와 이를 Message Queue 아키텍처로 해결한 과정을 공유하고자 합니다.
AI는 리소스를 많이 먹는다
기존 시스템은 Flask 서버가 요청을 받으면 threading 모듈을 사용해 백그라운드 스레드에서 무거운 AI 작업(Whisper STT, Ollama 번역, 요약 등)을 직접 처리했습니다.

하지만 이 방식에는 치명적인 문제가 있었습니다.
- 리소스 고갈: 음성 처리, 번역 작업은 cpu와 메모리를 엄청나게 잡아먹습니다. (macOS M3 Pro 에서 cup는 그리 많이 사용하지 않지만 약 60% 내외, 메모리 사용량은 90%가 기본이었습니다.)
- 병목 현상: 동시에 3개 이상의 요청이 들어오면 서버의 리소스가 바닥나면서 프로세스가 강제 종료(OOM Kill)되거나 응답 불능 상태에 빠졌습니다.
줄을 세우자 (Message Queue)
이 문제를 해결하기 위해 비동기 메시지 큐(Message Queue) 아키텍처를 도입하기로 결정했습니다.핵심 아이디어는 간단합니다. "요청을 받는 곳"과 "일을 처리하는 곳"을 분리하고, 그 사이에 "대기열(Queue)"을 두는 것입니다.
아키텍처 다이어그램
graph LR
User[Client] -->|POST /process| Server[Flask API]
Server -->|Enqueue| Redis[Redis Queue]
subgraph Worker Nodes
Worker1[Celery Worker]
end
Redis -->|Fetch Task| Worker1
Worker1 -->|Result Upload| DB[Backend DB]
기술 스택 선정
- Broker (Queue): Redis
- 가볍고 빠르며, 메모리 기반이라 처리 속도가 매우 우수합니다.
- Docker 컨테이너로 쉽게 띄울 수 있어 유지보수가 간편합니다.
- Worker: Celery
- Python 생태계에서 가장 표준적인 분산 작업 큐 라이브러리입니다.
- Redis와의 궁합이 좋고, 다양한 설정(재시도, 스케줄링 등)을 지원합니다.
- Monitoring: Flower
- 터미널에서 로그만 보는 것은 한계가 있습니다.
- 웹 대시보드를 통해 실시간으로 작업 상태와 워커의 리소스 사용량을 확인할 수 있습니다.
구현 과정
Docker Compose 구성
인프라 구성은 Docker Compose 하나로 정리했습니다.
version: "3.8"
services:
# 1. broker
redis:
image: redis:alpine
ports:
- "6379:6379"
# 2. API server
app:
build: .
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
- redis
# 3. Worker
worker:
build: .
command: celery -A celery_task.celery_app worker --concurrency=1
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
- redis
# 4. dashboard
dashboard:
image: mher/flower
ports:
- "5555:5555"여기서 핵심은 worker 서비스의 --concurrency=1 옵션입니다.이 옵션을 통해 워커가 한 번에 하나의 영상만 처리하도록 강제했습니다.(테스트해보니 2개까지 요청이 가능하지만 다른작업이 불가능해 1개로 설정했습니다. M3 Macbook Pro 18GB 기준.) 이제 여러개의 요청이 동시에 들어와도, Redis 큐에서 얌전히 기다리고 워커는 하나씩 차근차근 처리합니다.
Flask <-> Celery 연동
server.py에서는 더 이상 스레드를 생성하지 않고, Celery에 작업을 던지기만(delay) 합니다.
# server.py
from celery_task.tasks import process_video_task
@app.route('/process', methods=['POST'])
def process_video():
video_id = request.form.get('video_id')
# 즉시 리턴 (기다리지 않음)
task = process_video_task.delay(video_id)
return jsonify({
"message": "작업이 대기열에 추가되었습니다.",
"task_id": task.id
})결과 및 성과
- 서버 안정성 향상 :더 이상 과부하로 서버가 죽지 않습니다.
- 관측 가능성(Observability) 향상: 기존 터미널에서 확인하는 수고로움이 줄어들고 Flower 대시보드를 통해 "현재 남은 작업이 몇 개인지", "누가 실패했는지"를 한눈에 볼 수 있습니다.


마치며
처음에는 "그냥 스레드 쓰면 되지 않나?"라고 생각했지만, 운영 환경(특히 llm 같은 리소스가 큰 작업)에서의 안정성을 위해서는 비동기 큐 도입이 필수적임을 깨달았습니다.비슷한 고민을 하고 계신 분들께 이 글이 조금이나마 도움이 되기를 바랍니다.
Member discussion