SQLAlchemy ошибка с транзакцией. Cannot use Connection.transaction() in a manually started transaction

Код используется во многих проектах и все работает. Но в одном сервисе иногда на проде происходит ошибка:

sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot use Connection.transaction() in a manually started transaction

Причем на деве и локально все ок.

Проблема в том что я не могу понять что такого натыкивают на проде что приводит к такой ошибке и следовательно у меня нет идей как решить проблему ведь в остальных сервисах и в этом сервисе на деве все ок.

Я уверен что нигде в коде нет ручного управления транзакциями кроме как в IoC контейнере и консьюмере.

Вот код IoC контейнера:

from dishka import Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import sessionmaker

from config import Config, get_config
from infrastructure.db.sqlalchemy.setup import create_engine, create_session_pool
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork


class BaseProvider(Provider):
    @provide(scope=Scope.APP)
    def config(self) -> Config:
        return get_config()

    @provide(scope=Scope.APP)
    def sqlalchemy_engine(self, config: Config) -> AsyncEngine:
        return create_engine(config.pg_config)

    @provide(scope=Scope.APP)
    def session_pool(self, sqlalchemy_engine: AsyncEngine) -> sessionmaker:
        return create_session_pool(sqlalchemy_engine)

    @provide(scope=Scope.REQUEST)
    def sqlalchemy_uow(self, session_factory: sessionmaker) -> SQLAlchemyUnitOfWork:
        return SQLAlchemyUnitOfWork(_session=session_factory())

Вот код для сетапа бд (setup.py):

from typing import Callable, AsyncContextManager

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, AsyncEngine
from sqlalchemy.orm import sessionmaker

from config import PGConfig
from infrastructure.db.sqlalchemy.models import BaseModel


def create_engine(db: PGConfig, echo: bool = False) -> AsyncEngine:
    engine = create_async_engine(
        db.pg_database_url,
        echo=echo
    )
    return engine


async def create_tables(engine: AsyncEngine) -> None:
    async with engine.begin() as conn:
        await conn.run_sync(BaseModel.metadata.create_all)


def create_session_pool(engine: AsyncEngine) -> Callable[[], AsyncContextManager[AsyncSession]]:
    session_pool = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
    return session_pool

И вот код UoW (В нем я пытался решить эту проблему убрав строчку await self._session.begin(), но это не помогло):

from dataclasses import dataclass
from types import TracebackType
from typing import Self

from sqlalchemy.ext.asyncio import AsyncSession

from infrastructure.repositories.invite_links.sqlalchemy import SQLAlchemyInviteLinksRepository
from infrastructure.repositories.tags.sqlalchemy import SQLAlchemyTagsRepository, SQLAlchemyTagsTypesRepository
from infrastructure.uow.base import BaseUnitOfWork


@dataclass
class SQLAlchemyUnitOfWork(BaseUnitOfWork):
    _session: AsyncSession = None

    async def __aenter__(self) -> Self:
        # await self._session.begin()
        return self

    async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None
    ) -> None:
        try:
            if exc_type is None:
                await self._session.commit()
            else:
                await self._session.rollback()
        finally:
            await self._session.close()

    @property
    def invite_links(self) -> SQLAlchemyInviteLinksRepository:
        self._session_check()
        return SQLAlchemyInviteLinksRepository(self._session)

    @property
    def tags(self) -> SQLAlchemyTagsRepository:
        self._session_check()
        return SQLAlchemyTagsRepository(self._session)

    @property
    def tags_types(self) -> SQLAlchemyTagsTypesRepository:
        self._session_check()
        return SQLAlchemyTagsTypesRepository(self._session)

    def _session_check(self) -> None:
        if not self._session:
            raise ValueError("Session is not initialized")

Используется UoW в FastAPI поинтах (все сделано по доке Dishka).

И так же я прокидываю фабрику сессий в консьюмер:

lifespan.py:

from contextlib import asynccontextmanager

import aiojobs
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import sessionmaker

from app.api.background import consume_in_background
from config import Config
from infrastructure.db.sqlalchemy.setup import create_tables
from infrastructure.message_brokers.kafka import KafkaMessageBroker


@asynccontextmanager
async def lifespan(app: FastAPI) -> FastAPI:
    container = app.state.dishka_container
    engine = await container.get(AsyncEngine)
    scheduler = aiojobs.Scheduler()
    broker = await container.get(KafkaMessageBroker)
    config = await container.get(Config)
    if config.app.is_dev or config.app.is_production:
        session_factory = await container.get(sessionmaker)
        job = await scheduler.spawn(consume_in_background(broker, session_factory, config))
    await create_tables(engine)
    yield
    if config.app.is_dev or config.app.is_production:
        await job.close()
    await scheduler.close()
    await container.close()

background.py

import logging

from dishka import AsyncContainer

from config import Config
from domain.entities.invite_links import InviteLink
from infrastructure.message_brokers.base import BaseMessageBroker
from infrastructure.repositories.filters.invite_links import InviteLinksFilter
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork


async def consume_in_background(
    broker: BaseMessageBroker,
    config: Config,
    container: AsyncContainer,
) -> None:
    logger = logging.getLogger("LinkConsumer")
    async for message_dict in broker.start_consuming():
        message = message_dict["value"]
        topic = message_dict["topic"]
        link = message.get("invite_link")
        channel_id = message.get("channel_id")
        try:
            if topic == config.rp_config.CONSUME_TOPIC:
                if not link:
                    continue
                link = InviteLink(
                    channel_id=channel_id,
                    **link
                )
                async with container() as request_container:
                    uow: SQLAlchemyUnitOfWork = await request_container.get(SQLAlchemyUnitOfWork)
                    async with uow as unit_of_work:
                        if not await unit_of_work.invite_links.is_exists(link.id, link.channel_id):
                            await unit_of_work.invite_links.create_link(link)
            elif topic == config.rp_config.CONSUME_MERGE_ACCOUNTS_TOPIC:
                async with container() as request_container:
                    uow: SQLAlchemyUnitOfWork = await request_container.get(SQLAlchemyUnitOfWork)
                    async with uow as unit_of_work:
                        links = await unit_of_work.invite_links.get_all_links(
                            links_filter=InviteLinksFilter(
                                page_size=-1,
                                creator_id=message["old_account_id"],
                            )
                        )
                        links = links[0]
                        for link in links:
                            link.creator_id = message["new_account_id"]
                            await unit_of_work.invite_links.update_link(link)
            await broker.commit()
        except Exception as e:
            logger.error(f"Error while consuming message: {e}", exc_info=True)

Ответы (0 шт):