본문으로 바로가기

FastAPI에서 이벤트 다루는 방법

category Coding/Python 2021. 12. 15. 20:41
반응형

개요

상황에 따라서 이벤트를 발생시킬 필요성을 느낄 때가 있다. 하나의 메소드가 너무 많은 책임을 가지고 있어서 책임을 분리하고 싶을 경우, 너무 많은 객체와 연관되어 있어서 의존성을 분리하고 싶을 경우 등 이벤트로 처리해야하는 상황이 심심치 않게 발생한다. 스프링의 경우 ApplicationEventPublisher라는 인터페이스가 존재하고 해당 인터페이스를 사용하면 개발자는 별다른 설정을 해줄 필요 없이 이벤트를 발행할 수 있다. (참고: https://hides.tistory.com/1084)

요즘 스프링을 꾸준히 공부하고 있는데, 스프링에는 위처럼 이벤트를 다루는 인터페이스뿐만 아니라 엔터프라이즈 레벨의 아키텍처를 설계할 때 도움이 될만한 기능이 기본적으로, 또는 라이브러리 형태로 제공해주는 경우가 상당히 많은 것 같다. 따라서 파이썬에서도 비슷한 기능을 구현해보면 어떨까 생각이 들었고 현재 서버 개발로 사용하고 있는 FastAPI 프레임워크 위에 러프하게나마 구현해 본 경험에 대해 포스팅한다.

구조

지금부터 설명할 내용을 간단하게 그림으로 표현해봤다. 하나하나 설명하면 다음과 같다.

1. Request가 들어오면 Middleware를 거친다. 해당 Middleware에서 EventHandler를 Context-local하게 세팅한다.

2. Router(FastAPI에서는 view/controller를 router라고 부른다) 에서 EventHandler를 통해 이벤트를 store(저장)한다.

3. 최종적으로 @EventListener 데코레이터가 모든 이벤트를 publish(실행)한다.

Event

먼저 이벤트부터 만들어본다. 이벤트는 간단하게 run() 이라는 메소드 하나만을 가지고 있고 일관성있는 규격을 제공하기 위해 BaseEvent라는 인터페이스부터 만들어준다.

from abc import ABC, abstractmethod
from typing import Type, Union

from pydantic import BaseModel


class BaseEvent(ABC):
    @abstractmethod
    async def run(self, parameter: Union[Type[BaseModel], None]) -> None:
        pass

다음으로 해당 인터페이스를 상속받은 실 구현 객체를 만든다.

class SlackEvent(BaseEvent):
    async def run(self, parameter: SlackEventParameter) -> None:
        print(f"SLACK EVENT / {parameter.channel} / {parameter.message}")

(본 예제에서는 간단하게 같이 입력받은 파라미터를 print()하는 기능만을 넣어줬다) 다음으로 파라미터를 구현해주자. 이벤트는 아무런 파라미터가 필요없을수도 있지만 때에 따라 파라미터가 필요한 경우가 있기 때문이다.

from pydantic import BaseModel


class SlackEventParameter(BaseModel):
    channel: str
    message: str

EventHandler

이벤트 핸들러는 우리가 입력한 이벤트를 객체 내부에 저장하는 store() 메소드와 모두 실행하는 publish() 라는 총 2개의 메소드를 가진다.

def __init__(self):
    self.events: Dict[Type[BaseEvent], Union[BaseModel, None]] = {}

먼저 생성자부터 살펴보자. 내부적으로 events라는 딕셔너리를 가지고 있으며 해당 딕셔너리의 key는 위에서 생성한 Event를 상속받은 객체이다. 그리고 value는 마찬가지로 위에서 생성한 파라미터 객체가 될 것이다.

async def store(
    self, event: Type[BaseEvent], parameter: BaseModel = None,
) -> None:
    if not issubclass(event, BaseEvent):
        raise InvalidEventTypeException

    if parameter and not isinstance(parameter, BaseModel):
        raise InvalidParameterTypeException

    self.events[event] = parameter

store() 메소드는 우리가 입력한 이벤트를 객체 내부에 저장한다고 했다. 단순히 {이벤트 객체: 파라미터 객체} 형태로 내부에 저장시키며 이벤트는 BaseEvent를 상속받았는지, 파라미터또한 BaseModel을 상속받았는지에 대해 검사한다.

async def publish(self) -> None:
    event: Type[BaseEvent]
    for event, parameter in self.events.items():
        await event().run(parameter=parameter)

    self.events.clear()

publish() 메소드는 store() 메소드를 통해 저장한 이벤트를 모두 실행시킨다. 물론 파라미터도 같이 넘겨준다. 코드를 보면 이벤트 객체에 있는 run() 메소드를 통해 이벤트를 실행시키는 모습을 확인할 수 있는데, BaseEvent라는 인터페이스를 맞춰 규격을 잡아준 이유가 바로 이 때문이다.

class EventHandler:
    def __init__(self):
        self.events: Dict[Type[BaseEvent], Union[BaseModel, None]] = {}

    async def store(
        self, event: Type[BaseEvent], parameter: BaseModel = None,
    ) -> None:
        if not issubclass(event, BaseEvent):
            raise InvalidEventTypeException

        if parameter and not isinstance(parameter, BaseModel):
            raise InvalidParameterTypeException

        self.events[event] = parameter

    async def publish(self) -> None:
        event: Type[BaseEvent]
        for event, parameter in self.events.items():
            await event().run(parameter=parameter)

        self.events.clear()

EventHandler의 전체 코드를 보면 위와 같다.

Context local

FastAPI는 비동기 프레임워크로써 내부적으로 코루틴을 사용한다. 코루틴은 단일 스레드에서 Concurrency하게 돌아가기 때문에 작업들 간에 제어권을 주고받으면서 동작한다. 이는 A라는 작업을 수행하다가 완료하지 않았음에도 불구하고 B라는 또다른 작업이 실행될 수 있음을 말한다. 만약 A작업이 실행되다가 제어권을 B작업에 제어권을 넘겨줬을 때 A작업이 사용중인 변수에 B작업이 접근한다면 의도치않은 오류가 발생할 수 있다. 

예를 하나 들어보자. 위에서 만든 EventHandler 클래스는 작업을 저장하고 실행하는 클래스이다. 만약 A작업에서 EventHandlerstore() 메소드를 통해 1이라는 이벤트를 발생시킨다면 현재 A작업의 EventHandler내부 events 딕셔너리에는 1이라는 이벤트가 담기게 된다. 그 이후 A작업이 완료되기 전에 제어권을 B작업으로 넘긴다고 가정한다. 이 때 B작업에서 EventHandler를 접근하면 어떻게 될까? A작업에서 저장한 1이라는 메소드가 내부 events 딕셔너리에 그대로 들어있는 문제가 발생할 것이다. 

따라서 이를 방지하기 위해 각 작업별로 다른 컨텍스트를 가지게 함으로써 Context local을 구현해야하고, 이는 파이썬에서 제공하는 Contextvar를 사용하여 해결할 수 있다.

event_context = ContextVar("event_context")

먼저 위처럼 컨텍스트 변수를 생성해준다.

def set_event_handler(handler: EventHandler) -> Token:
    return event_context.set(handler)

다음으로 생성한 변수에 값을 담아주기 위한 함수를 생성한다.

def reset_event_handler(token: Token) -> None:
    event_context.reset(token)

마지막으로 변수를 초기화하는 함수도 생성해준다.

EventMiddleware

class EventMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint,
    ):
        token = set_event_handler(handler=EventHandler())

        try:
            response = await call_next(request)
        except Exception as e:
            raise e from None
        finally:
            reset_event_handler(token=token)

        return response

코드는 간단하다 위에서 만든 set_event_handler() 메소드를 통해 EventHandler를 생성하여 저장해주고 최종적으로 reset_event_handler() 메소드를 통해 해당 변수를 초기화시켜주는 코드이다. 

app.add_middleware(EventMiddleware)

마지막으로 FastAPI app에 우리의 미들웨어를 등록해준다. 이제 Request가 들어올 때 미들웨어부터 실행이 되고 해당 미들웨어에서 각 컨텍스트에 맞는 EventHandler를 생성해준 상태까지 왔다.

EventListener

EventListener는 실제 이벤트를 발생시킬 메소드에서 데코레이터로 사용하는 클래스이다.

from core.event import get_event_handler


class EventListener:
    def __call__(self, func):
        async def _inner(*args, **kwargs):
            event_handler = get_event_handler()

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                raise e from None

            await event_handler.publish()
            return result

        return _inner

get_event_handler() 를 통해 미들웨어에서 설정했던 EventHandler를 가져오고 메소드를 실행시킨 후 최종적으로 publish()를 통해 모든 이벤트를 실행하는 코드이다.

Usage

마지막으로 어떻게 사용해야 하는지 간단하게 알아보자.

@EventListener()
async def test():
    event_handler = get_event_handler()
    await event_handler.store(
        event=SlackEvent,
        parameter=SlackEventParameter(channel="channel", message="message"),
    )

get_event_handler() 를 통해 컨텍스트 변수에 담긴 핸들러를 얻어오고 해당 핸들러에 이벤트를 저장한다. 그러면 test 함수가 종료될 때 @EventListener 데코레이터가 핸들러에 저장된 모든 이벤트를 실행시킨다.

Pytest로 테스트 시 문제점

현재 EventMiddleware의 경우 BaseHTTPMiddleware를 상속받아 구현했는데 pytest로 테스트를 실행하면 해당 미들웨어를 거치지않는다. 그런데 우리는 미들웨어를 통해 EventHandler를 컨텍스트 변수에 담아주는 작업을 진행했다. 따라서 테스트를 실행하면 get_event_handler() 메소드에서 컨텍스트 변수에 담긴 EventHandler를 꺼내올 때 LookupError가 발생한다. 

이는 간단하게 해결할 수 있는데 아래의 코드처럼 현재 pytest가 실행중인지를 검사하고 맞다면 임의적으로 객체를 생성해서 리턴해주는 코드를 추가해주면 된다.

def get_event_handler() -> EventHandler:
    import os

    if "PYTEST_CURRENT_TEST" in os.environ:
        return EventHandler()

    return event_context.get()

소스 코드

모든 소스코드는 https://github.com/teamhide/fastapi-boilerplate 에 구현해놨으며 core/event폴더로 들어가면 본 포스팅에서 기술한 모든 소스코드를 확인할 수 있다. 또한 오픈소스로 배포했기 때문에 https://github.com/teamhide/fastapi-event 또는 

pip3 install fastapi-event

명령어를 통해 설치한 후 사용할 수 있다.

반응형

댓글을 달아 주세요