DB

[Redis] redis 스터디 - WebSocket 을 이용해 채팅기능을 구현하기

죵욜이 2024. 12. 22. 04:16

redis 스터디의 3번째 목표는 채팅기능 구현이었다.

 

채팅의 핵심은 실시간 양방향 통신인데

 

다른분들은 RESTAPI 도 사용하고, redis 의 Pub/Sub 을 활용하여 다양한 방식으로 구현하셨고

 

소켓대신  gRPC 를 이용하신분도 있었다.

 

하지만 나같은초보는 하나라도 제대로못해보고 이것저것 하지말고 하나라도 제대로 완성해보기로 마음을먹었고

 

소켓을 이용해보기로 하였다.


REST 방식은 포스트맨으로 내가 제대로한것인가 확인을 하였지만

 

채팅같은경우는 내두는으로 확인을 하고싶어 브라우저에서 탭을 여러개 띄워두고 새로고침하면서 확인을 하기로하였다.

 

이와 관련해서 프론트작업을 해야하지만 나는 몰?루? 기에 역시나 지피티의 도움을 받았다.

 

먼저 코드부터 살펴보자

파일이름은 chat.py 이다

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import redis
import asyncio

# FastAPI 모듈에서 FastAPI클래스 가져와서 객체 app 생성
app = FastAPI()

# Redis 클라이언트 초기화
redis_client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
CHANNEL_NAME = "chat"  # Redis 채널 이름 (chat 채널에 메시지를 발행하고 구독)

# 연결된 WebSocket 클라이언트 관리
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []  # 연결된 WebSocket 클라이언트를 저장하는 리스트

    # WebSocket 연결을 수락하고 클라이언트를 활성 연결 목록에 추가
    async def connect(self, websocket: WebSocket):
        await websocket.accept()  # WebSocket 연결 수락
        self.active_connections.append(websocket)  # 연결된 클라이언트를 리스트에 추가

    # WebSocket 연결을 끊고 클라이언트를 연결 목록에서 제거
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)  # 연결된 클라이언트 제거

    # 연결된 모든 클라이언트에게 메시지를 브로드캐스트
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)  # 모든 클라이언트에게 메시지 전송

manager = ConnectionManager()  # ConnectionManager 클래스의 인스턴스 생성

# 채팅 내역을 Redis 리스트에 저장
def save_message(message: str):
    redis_client.lpush('chat_history', message)  # Redis 리스트에 새로운 메시지를 추가 (최신 메시지가 앞에 위치)

# 채팅 내역을 Redis 리스트에서 불러오기
def get_chat_history():
    # Redis 리스트에서 최신 50개의 메시지를 가져옵니다 (최신 메시지 순으로)
    return redis_client.lrange('chat_history', 0, 49)  # Redis 리스트에서 최대 50개 메시지 가져오기

# Redis Pub/Sub Listener (채널 구독)
async def redis_listener():  # async 는 비동기함수를 정의할 때 사용, -> 기다리는 작업할때 CPU 가 다른작업 처리할수있게 해줌 (await 와 같이써야함)
    pubsub = redis_client.pubsub()  # Redis Pub/Sub 객체 생성
    pubsub.subscribe(CHANNEL_NAME)  # 지정한 채널에 구독 시작
    # Redis로부터 메시지가 발행될 때마다 처리
    for message in pubsub.listen():
        if message['type'] == 'message':  # 'message' 타입의 메시지가 발행되면
            await manager.broadcast(message['data'])  
# 모든 WebSocket 연결에 메시지 전송(28번째줄) -> await - 비동기 함수 내부에서 비동기 작업을 호출할때 사용(해당 작업이 완료될 때까지 기다렸다가 이후의 코드 실행)

# 앱이 시작될 때 Redis Pub/Sub 리스너를 백그라운드에서 실행
@app.on_event("startup") # FastAPI 에서 제공하는 이벤트 핸들러 데코레이터중 하나, 애플리케이션이 시작될 때 실행해야 할 초기화 작업을 정의
async def startup_event():
    asyncio.create_task(redis_listener())  # redis_listener 함수 비동기적으로 실행
    try:
        redis_client.ping()  # Redis 서버에 연결 테스트
        print("Redis 연결 성공!")
    except redis.ConnectionError:
        print("Redis 연결 실패!")  # 성공,실패 이거는 레디스 연결이 잘된건지 확인할라고 집어넣은거임 로직상 필요는 X

# '/' 경로에 대한 HTTP GET 요청을 처리하는 함수
@app.get("/")
async def get():
    # HTML 페이지를 반환 (간단한 채팅 UI) -> GPT 가 짜줫어요
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Redis Chat</title>
        <script>
            // WebSocket 연결 생성
            let ws = new WebSocket("ws://127.0.0.1:8000/ws");
            // 메시지가 수신되면 이 함수가 실행되어 메시지를 화면에 추가
            ws.onmessage = function(event) {
                let messages = document.getElementById('messages');  // 채팅 메시지를 출력할 div
                let message = document.createElement('div');  // 새로운 div 요소 생성
                message.textContent = event.data;  // 수신된 메시지를 div의 텍스트로 설정
                messages.appendChild(message);  // div 요소를 화면에 추가
                messages.scrollTop = messages.scrollHeight;  // 스크롤을 가장 아래로 설정
            };
            // 메시지를 WebSocket 서버로 전송하는 함수
            function sendMessage() {
                let input = document.getElementById("message");  // 입력 필드
                ws.send(input.value);  // WebSocket을 통해 메시지 전송
                input.value = '';  // 입력 필드 비우기
            }
        </script>
    </head>
    <body>
        <h1>Redis Chat</h1>
        <!-- 메시지 출력 영역 (스크롤이 가능한 영역) -->
        <div id="messages" style="border: 1px solid black; height: 300px; overflow-y: scroll; padding: 10px;"></div>
        <!-- 사용자 입력 필드와 버튼 -->
        <input type="text" id="message" placeholder="Enter message"/>
        <button onclick="sendMessage()">Send</button>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)  # HTML 페이지 반환

# WebSocket 엔드포인트 '/ws' (클라이언트와 실시간 통신을 위한 WebSocket 처리)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)  # WebSocket 연결을 수락하고 연결 리스트에 추가 (19번째 줄)

    # 클라이언트가 WebSocket에 연결될 때 이전 채팅 내역을 전송
    chat_history = get_chat_history()  # Redis에서 최근 채팅 내역 가져오기 (39번째 줄)
    for message in chat_history:
        await websocket.send_text(message)  # 이전 채팅 내역을 클라이언트에게 전송

    try:
        while True:
            # 클라이언트로부터 메시지를 받음
            data = await websocket.receive_text()  # 클라이언트에서 전송한 텍스트 메시지 수신
            save_message(data)  # 새로운 메시지를 Redis에 저장 (35번째 줄)
            redis_client.publish(CHANNEL_NAME, data)  # Redis 채널에 메시지 발행
    except WebSocketDisconnect:
        # WebSocket 연결이 끊어지면 연결 관리에서 해당 클라이언트를 제거
        manager.disconnect(websocket)

레디스의 Pub/Sub 을 활용하였고 눈으로 확인하기쉽게 html 페이지도 추가하였다.

 

주석으로 설명을 달아두긴했지만 FastAPI 가뭔지 동기/비동기가 뭔지 개념이 잡히지 않은상태여서 더욱 어려웠다.


관련내용을 살짝 정리하자면

import asyncio

임포트를 하고

 

함수 선언할때 async 를 앞에 붙히게되는데

async def redis_listener(): 
    pass

이때 async 는 비동기함수를 정의할 때 사용된다

기다리는 작업할때 CPU 가 다른작업을 처리할수 있게 해주는 역할이다.

 

그리고 await 와 같이 사용한다

async def redis_listener():  
    pubsub = redis_client.pubsub()  
    pubsub.subscribe(CHANNEL_NAME)  
    
    for message in pubsub.listen():
        if message['type'] == 'message':  
            await manager.broadcast(message['data'])

이런식으로 사용하였는데

await 는 비동기 함수 내부에서 비동기 작업을 호출할 때 사용한다 

이는 해당 작업이 완료될 때까지 기다렸다가 이후의 코드를 실행하게 한다.


아무튼 코드를 작성한후 테스트를 해보자면 

전에 만든 Todo리스트때와 같다

 

먼저 redis 서버를 실행한후, uvicorn 으로 app 을 실행시킨다

$ redis-server

서버가 잘 실행되었고

$ uvicorn chat:app --reload

redis가 잘 연결된지 확인하기위해 ping 을보내면 print("Redis 연결 성공!") 까지 잘 확인하였다.

 

이후 html 페이지를 만들어놨으니 브라우저에서 탭을 2개 띄워본다

 

그후 메시지를 보내면

 

잘 들어오는걸 볼 수 있었다.

redis DB 도 확인하기위해

$ redis-cli

로 연결한후 

 

나는 채팅방을 "chat" 하나만 만들고 Pub/Sub 을 사용하였으니

chat을 구독해서 채팅내역을 받아보기로 하였다.

127.0.0.1:6379> SUBSCRIBE chat

그후

pandas is shit 이라는 메시지를 보내면

이렇게 잘 들어온걸 볼 수 있었다.


후 진짜 인공지능 공부와함께 레디스와 소켓을 이용해보자니 너무나도 빡센것 같았다.

 

이번에도 스터디원분들의 도움이 없었다면 나는 중간에 포기했을지도 모른다,,,

 

하지만 하다보니 재미있고 안되던걸 찾아보다가 성공했을때의 쾌감이 진짜 미친도파민이다

 

이것으로 레디스 스터디는 끝나게되었지만

 

분명히 추후에 프로젝트를 하게되면 관계형DB 의 보조역할로 무조건 사용할것같았다

 

그때가서 헤메지않게 지금 맛보기를 해본게 너무 만족스럽다

 

정말 지옥같은 일정이지만 그만큼 경험치를 많이 먹은것 같다