본문으로 바로가기

FastAPI SQLAlchemy Session 객체 연동하기

category Coding/Python 2021. 2. 15. 15:18
반응형

개요

동기적으로 돌아가는 서버의 경우 Thread-local을 적용하면 각 Request별로 다른 Context를 가지도록 할 수 있다. 하지만 asyncio를 통해 이벤트 루프를 사용하는 비동기 서버의 경우(FastAPI, Sanic...) Thread-local로는 이러한 문제를 해결할 수 없다. 따라서 Python 3.7 버전부터 제공되는 ContextVar를 사용해야하는데 이에 대해 간략하게 기술한다. 참고로 예제 서버는 FastAPI이다.

발생할 수 있는 문제

그렇다면 어떠한 문제가 발생할 수 있을까? 일반적으로 파이썬의 코루틴은 싱글 스레드로 동작하기때문에 하나의 스레드에서 여러가지의 일을 Concurrency하게 번갈아가며 수행한다. 그렇기에 SQLAlchemy의 세션 객체 또한 공유하면서 사용하는데, 예를 들어 A Request의 특정 함수에서 세션에 모델을 add()하고 await하는 도중에 B Request의 특정 함수에서 예외가 발생한다면 A Request에서 add()한 모델또한 디비에 반영되지 않을 것이다. 한마디로 하나의 세션 객체를 통해 일을 진행하기때문에 여러가지의 작업들이 겹칠 가능성이 존재하고, 하나의 작업에서 문제가 발생하면 타 작업에도 영향이 전파된다.

따라서 각 Context별로 Thread-local과 동일하게 다른 세션 객체를 사용하게해야 위와 같은 문제를 피할 수 있다.

사전 지식

  1. 예제 서버는 scoped_session() 을 사용하여 Thread-local이 적용된 세션 객체를 사용하고 있다.

  2. SQLAlchemy는 세션을 가져올 때 registry라는 내부 딕셔너리를 사용한다. (ex. session.registry)

  3. scopefunc에 콜백 함수를 등록함으로써 세션을 가져올 때 어떠한 형태로 키값에 접근하는지에 대해 정의해줄 수 있다. (ex. session.registry.scopefunc = get_request_id)

  4. 최초 scoped_session() 를 통해 Thread-local로 감쌀 때 scopefunc라는 인자로 더미 함수를 넣어줘야한다. 물론 아래에 설명한 것 처럼 ContextVar를 가져오는 함수를 바로 넣어주면 편리하겠지만 모듈을 세분화하여 사용하는경우 마땅치가 않기 때문이다. (여기서 더미함수란 함수의 내용에 pass 또는 raise NotImplementedError 가 작성된 함수를 말한다)

예제는 github.com/teamhide/fastapi-boilerplate/blob/master/core/middlewares/sqlalchemy.py 에서 확인해볼 수 있다.

세션 생성

사전 지식 4번에 설명한 것 처럼 sessionmaker()로 세션을 생성하고, 해당 세션을 Thread-local형태로 만들기위해 scoped_session()로 감싸줄 때 scopefunc 인자에 함수를 넣어줘야한다.

def scopefunc():
    raise NotImplementedError


engine = create_engine(get_config().DB_URL, pool_recycle=3600)
session: Union[Session, scoped_session] = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine),
    scopefunc=scopefunc  # HERE
)

그렇지 않으면 나중에 registry를 확인해봤을 때 단순히 Thread-local 객체가 담기게되어 원하는대로 동작하지 않는다.

SQLAlchemyMiddleware 미들웨어 클래스

우리 서버에서는 미들웨어 형태로 구현하여 Request가 올 때 마다 특정 루틴을 거치도록 구성해놓았다. 따라서 SQLAlchemyMiddleware 클래스를 하나하나 살펴보며 설명한다.

12번 라인

session_context = ContextVar("session_context")

내부적으로 사용할 컨텍스트 변수를 정의한 부분이다.

15번 라인

def get_request_id():
    return session_context.get()

위에서 설명한 scopefunc 에 넣어줄 콜백 함수이다. 컨텍스트 변수에 세팅되어있는 값을 가져오도록 되어있다.

27번 라인

request_id = str(uuid4())
token = session_context.set(request_id)

uuid를 통해 고유한 값을 생성해주고 컨텍스트 변수에 해당 값을 세팅하는 부분이다.

19번 라인

class SQLAlchemyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app):
        super().__init__(app)
        session.registry.scopefunc = get_request_id

미들웨어의 생성자 부분이다. scopefunc에 위에서 생성한 콜백 함수를 넣어줌으로써 추후 세션이 사용될 때 어떠한 방식으로 키값을 결정할 지 정의해줬다.

따라서 위 미들웨어 코드는 아래와 같은 방식으로 동작한다.

  1. Request가 들어오면 SQLAlchemyMiddleware 에서 현재 Request의 세션을 SQLAlchemy의 registry부분에 저장시킨다. ({uuid4: SQLAlchemy 세션} 형태의 딕셔너리)
  2. 추후 세션을 사용할 때 registry.scopefunc 에 저장되어있는 함수를 통해 키값을 얻어내고 해당 키값을 기반으로 registry 에 접근하여 현재의 Request에 맞는 세션을 가져와서 사용한다.

따라서 각 Request별로 다른 세션 객체를 가지고 사용할 수 있다.

테스트

@home_router.get("/")
async def home():
    from app.models import User
    session.query(User).first()
    print(f"[*] registry: {session.registry.registry}")

    key = session.registry.scopefunc()
    print(f"[*] key: {key}")

    print(f"[*] session: {session.registry.registry[key]}")
    print()

    import asyncio
    await asyncio.sleep(3)
    return {"status": True}

위처럼 코드를 작성하고 연속으로 2번 요청을 보내면 아래와 같은 결과가 출력된다.

[*] registry: {'0085ee4d-598d-43b3-8122-403d77f25ef7': <sqlalchemy.orm.session.Session object at 0x108d09cd0>}
[*] key: 0085ee4d-598d-43b3-8122-403d77f25ef7
[*] session: <sqlalchemy.orm.session.Session object at 0x108d09cd0>

[*] registry: {'0085ee4d-598d-43b3-8122-403d77f25ef7': <sqlalchemy.orm.session.Session object at 0x108d09cd0>, '4d5d1ac3-fb88-41bb-bcfb-77de7811cb25': <sqlalchemy.orm.session.Session object at 0x108d67bb0>}
[*] key: 4d5d1ac3-fb88-41bb-bcfb-77de7811cb25
[*] session: <sqlalchemy.orm.session.Session object at 0x108d67bb0>
  • 최초 요청 시 registry에는 하나의 키/값만 존재했다.
  • 최초 요청의 sleep으로 인해 아직 요청이 끝나지 않은 상황에서 두번째 요청이 들어갔기 때문에 registry에 두번째 요청의 키/값이 추가되었다.
  • 결과적으로 첫번째 요청과 두번째 요청의 키가 다르며 세션의 id값 또한 다르다.

FastAPI boilerplate 전체 소스는 github.com/teamhide/fastapi-boilerplate 를 참고한다.

반응형

댓글을 달아 주세요