SQLAlchemy ошибка с транзакцией. Cannot use Connection.transaction() in a manually started transaction
Код используется во многих проектах и все работает. Но в одном сервисе иногда на проде происходит ошибка:
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot use Connection.transaction() in a manually started transaction
Причем на деве и локально все ок.
Проблема в том что я не могу понять что такого натыкивают на проде что приводит к такой ошибке и следовательно у меня нет идей как решить проблему ведь в остальных сервисах и в этом сервисе на деве все ок.
Я уверен что нигде в коде нет ручного управления транзакциями кроме как в IoC контейнере и консьюмере.
Вот код IoC контейнера:
from dishka import Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import sessionmaker
from config import Config, get_config
from infrastructure.db.sqlalchemy.setup import create_engine, create_session_pool
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork
class BaseProvider(Provider):
@provide(scope=Scope.APP)
def config(self) -> Config:
return get_config()
@provide(scope=Scope.APP)
def sqlalchemy_engine(self, config: Config) -> AsyncEngine:
return create_engine(config.pg_config)
@provide(scope=Scope.APP)
def session_pool(self, sqlalchemy_engine: AsyncEngine) -> sessionmaker:
return create_session_pool(sqlalchemy_engine)
@provide(scope=Scope.REQUEST)
def sqlalchemy_uow(self, session_factory: sessionmaker) -> SQLAlchemyUnitOfWork:
return SQLAlchemyUnitOfWork(_session=session_factory())
Вот код для сетапа бд (setup.py):
from typing import Callable, AsyncContextManager
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, AsyncEngine
from sqlalchemy.orm import sessionmaker
from config import PGConfig
from infrastructure.db.sqlalchemy.models import BaseModel
def create_engine(db: PGConfig, echo: bool = False) -> AsyncEngine:
engine = create_async_engine(
db.pg_database_url,
echo=echo
)
return engine
async def create_tables(engine: AsyncEngine) -> None:
async with engine.begin() as conn:
await conn.run_sync(BaseModel.metadata.create_all)
def create_session_pool(engine: AsyncEngine) -> Callable[[], AsyncContextManager[AsyncSession]]:
session_pool = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
return session_pool
И вот код UoW (В нем я пытался решить эту проблему убрав строчку await self._session.begin(), но это не помогло):
from dataclasses import dataclass
from types import TracebackType
from typing import Self
from sqlalchemy.ext.asyncio import AsyncSession
from infrastructure.repositories.invite_links.sqlalchemy import SQLAlchemyInviteLinksRepository
from infrastructure.repositories.tags.sqlalchemy import SQLAlchemyTagsRepository, SQLAlchemyTagsTypesRepository
from infrastructure.uow.base import BaseUnitOfWork
@dataclass
class SQLAlchemyUnitOfWork(BaseUnitOfWork):
_session: AsyncSession = None
async def __aenter__(self) -> Self:
# await self._session.begin()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None
) -> None:
try:
if exc_type is None:
await self._session.commit()
else:
await self._session.rollback()
finally:
await self._session.close()
@property
def invite_links(self) -> SQLAlchemyInviteLinksRepository:
self._session_check()
return SQLAlchemyInviteLinksRepository(self._session)
@property
def tags(self) -> SQLAlchemyTagsRepository:
self._session_check()
return SQLAlchemyTagsRepository(self._session)
@property
def tags_types(self) -> SQLAlchemyTagsTypesRepository:
self._session_check()
return SQLAlchemyTagsTypesRepository(self._session)
def _session_check(self) -> None:
if not self._session:
raise ValueError("Session is not initialized")
Используется UoW в FastAPI поинтах (все сделано по доке Dishka).
И так же я прокидываю фабрику сессий в консьюмер:
lifespan.py:
from contextlib import asynccontextmanager
import aiojobs
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import sessionmaker
from app.api.background import consume_in_background
from config import Config
from infrastructure.db.sqlalchemy.setup import create_tables
from infrastructure.message_brokers.kafka import KafkaMessageBroker
@asynccontextmanager
async def lifespan(app: FastAPI) -> FastAPI:
container = app.state.dishka_container
engine = await container.get(AsyncEngine)
scheduler = aiojobs.Scheduler()
broker = await container.get(KafkaMessageBroker)
config = await container.get(Config)
if config.app.is_dev or config.app.is_production:
session_factory = await container.get(sessionmaker)
job = await scheduler.spawn(consume_in_background(broker, session_factory, config))
await create_tables(engine)
yield
if config.app.is_dev or config.app.is_production:
await job.close()
await scheduler.close()
await container.close()
background.py
import logging
from dishka import AsyncContainer
from config import Config
from domain.entities.invite_links import InviteLink
from infrastructure.message_brokers.base import BaseMessageBroker
from infrastructure.repositories.filters.invite_links import InviteLinksFilter
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork
async def consume_in_background(
broker: BaseMessageBroker,
config: Config,
container: AsyncContainer,
) -> None:
logger = logging.getLogger("LinkConsumer")
async for message_dict in broker.start_consuming():
message = message_dict["value"]
topic = message_dict["topic"]
link = message.get("invite_link")
channel_id = message.get("channel_id")
try:
if topic == config.rp_config.CONSUME_TOPIC:
if not link:
continue
link = InviteLink(
channel_id=channel_id,
**link
)
async with container() as request_container:
uow: SQLAlchemyUnitOfWork = await request_container.get(SQLAlchemyUnitOfWork)
async with uow as unit_of_work:
if not await unit_of_work.invite_links.is_exists(link.id, link.channel_id):
await unit_of_work.invite_links.create_link(link)
elif topic == config.rp_config.CONSUME_MERGE_ACCOUNTS_TOPIC:
async with container() as request_container:
uow: SQLAlchemyUnitOfWork = await request_container.get(SQLAlchemyUnitOfWork)
async with uow as unit_of_work:
links = await unit_of_work.invite_links.get_all_links(
links_filter=InviteLinksFilter(
page_size=-1,
creator_id=message["old_account_id"],
)
)
links = links[0]
for link in links:
link.creator_id = message["new_account_id"]
await unit_of_work.invite_links.update_link(link)
await broker.commit()
except Exception as e:
logger.error(f"Error while consuming message: {e}", exc_info=True)