Как сделать ручки на FastAPI, работающие после авторизации?
Пожалуйста, подскажите, как сделать ручки, которые будут работать только после авторизации пользователя? Нужно, чтобы так работали только ручки, которые находятся ниже строчки files = []. На данный момент всё работает и без авторизации, хотя так не должно работать
import uvicorn
from fastapi import FastAPI, HTTPException, Response, Depends, UploadFile
from fastapi.responses import StreamingResponse, FileResponse
from pydantic import BaseModel, Field
from typing import Annotated, List
from authx import AuthX, AuthXConfig
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from passlib.context import CryptContext
# Хэширование паролей
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Конфигурация FastAPI и создание сессии БД
app = FastAPI() # Объявляем приложение FastAPI
engine = create_async_engine("sqlite+aiosqlite:///users.db", echo=True) # Объявляем асинхронный движок нашей БД
new_session = async_sessionmaker(engine, expire_on_commit=False) # Запускаем асинхронную сессию с сохранением объектов
async def get_session():
"Обращение к сессии БД"
async with new_session() as session:
yield session
SessionDep = Annotated[AsyncSession, Depends(get_session)] # Создаём зависимость сессии БД от FastAPI
class Base(DeclarativeBase):
pass
class UserModel(Base): # Создаём таблицу в БД, в которой будут находиться данные пользователей
__tablename__ = "users"
# Привязываем данные к стоблцам таблицы
uid: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, nullable=False)
username: Mapped[str] = mapped_column(unique=True, index=True)
password: Mapped[str]
class UserSchema(BaseModel):
username: str = Field(max_length=10)
password: str = Field(min_length=8, max_length=16)
# Конфигурация AuthX
config = AuthXConfig() # Конфигурация AuthX
config.JWT_SECRET_KEY = "kirieshki" # Секретный ключ для создания JWT-токенов
config.JWT_ACCESS_COOKIE_NAME = "access_token" # Название куки-токена
config.JWT_TOKEN_LOCATION = ['cookies'] # Расположение куки-токенов
security = AuthX(config=config) # Обозначаем конфиг для модуля AuthX
@app.post("/register",
tags=["Авторизация"],
summary="Зарегистрировать аккаунт")
async def register(creds: UserSchema,
response: Response,
session: SessionDep):
"Регистрация пользователя"
# Проверка на существование пользователя с таким же username
check_user = select(UserModel).where(UserModel.username == creds.username)
existing_user = await session.execute(check_user)
if existing_user.scalar_one_or_none():
raise HTTPException(status_code=400, detail="User already exist")
# Хэширование паролей перед сохранением
hashed_password = password_context.hash(creds.password)
# Регистрация нового пользователя, если уже существующй в БД username не найден
new_user = UserModel(username=creds.username, password=hashed_password) # Создание нового пользователя
session.add(new_user) # Добавление нового пользователя в БД
await session.commit() # Сохранение данных в БД
await session.refresh(new_user) # Обновление информации в БД
# Создание JWT-токена с payload
access_token = security.create_access_token(uid=str(new_user.uid))
security.set_access_cookies(access_token, response)
return {"message": "User has been registered"}
@app.post("/login",
tags=["Авторизация"],
summary="Войти в аккаунт")
async def login(creds: UserSchema,
response: Response,
session: SessionDep):
"Вход пользователя"
# Проверка на существование в БД пользователя с таким же именем и паролем
check_user = select(UserModel).where(UserModel.username == creds.username)
existing_user = await session.execute(check_user)
user = existing_user.scalar_one_or_none()
# Проверяем, существует ли пользователь и совпадает ли хэш пароля с паролем
if not user or not password_context.verify(creds.password, user.password):
raise HTTPException(status_code=401, detail="Invalid username or password")
# Создание куки-токена
access_token = security.create_access_token(uid=str(user.uid))
security.set_access_cookies(access_token, response)
return {"Success": True}
@app.post("/reset",
tags=["Работа с БД"],
summary="Сброс БД")
async def reset_database():
"Сброс БД"
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
return {"База данных успешно сброшена": True}
files = [] # Список файлов, загружаемых на сервер
@app.get("/get_filenames",
tags=["Получение файлов"],
summary="Получить названия файлов")
async def get_filenames():
if len(files) == 0: # Проверка на наличие загруженных файлов
return {"File list is empty"}
else:
return files
def iterfile(filename: str): # Обработчик файлов. Нарезает файлы на чанки для постепенной загрузки
"Обработка файлов / нарезка на чанки"
with open(filename, "rb") as file:
while chunk := file.read(1024 * 1024):
yield chunk
@app.get("/get_files",
tags=["Получение файлов"],
summary="Получить файл")
async def get_file(filename: str):
return FileResponse(filename)
@app.get("/streaming/{filename}",
tags=["Получение файлов"],
summary="Получение файла в стриминге")
async def get_streaming_file(filename: str):
return StreamingResponse(iterfile(filename))
@app.post("/upload",
tags=["Добавление файлов"],
summary="Загрузка файла")
async def upload_file(uploaded_file: UploadFile):
"Загрузка файла"
file = uploaded_file.file
filename = uploaded_file.filename
with open(f"1_{filename}", "wb") as f:
f.write(file.read())
files.append(f"1_{filename}")
return {"File was uploaded": True}
@app.post("/upload_multiple",
tags=["Добавление файлов"],
summary="Загрузка нескольких файлов")
async def upload_files(uploaded_files: list[UploadFile]):
for uploaded_file in upload_files:
file = uploaded_file.file
filename = uploaded_file.filename
with open(f"1_{filename}", "wb") as f:
f.write(file.read())
files.append(f"1_{filename}")
return {"Multiple files were uploaded": True}
if __name__ == "__main__":
uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
Ответы (1 шт):
Автор решения: SVBazuev
→ Ссылка
Если Вы взгляните на аргументы,
которые могут быть переданы декоратору @app.post():
(method) def post(
path: str,
*,
response_model: Any = Default(None),
status_code: int | None = None,
tags: List[str | Enum] | None = None,
dependencies: Sequence[Depends] | None = None,
summary: str | None = None,
description: str | None = None,
response_description: str = "Successful Response",
responses: Dict[int | str, Dict[str, Any]] | None = None,
deprecated: bool | None = None,
operation_id: str | None = None,
response_model_include: IncEx | None = None,
response_model_exclude: IncEx | None = None,
response_model_by_alias: bool = True,
response_model_exclude_unset: bool = False,
response_model_exclude_defaults: bool = False,
response_model_exclude_none: bool = False,
include_in_schema: bool = True,
response_class: type[Response] = Default(JSONResponse),
name: str | None = None,
callbacks: List[BaseRoute] | None = None,
openapi_extra: Dict[str, Any] | None = None,
generate_unique_id_function: (APIRoute) -> str = Default(generate_unique_id)
) -> ((DecoratedCallable@post) -> DecoratedCallable@post)
^^^ то найдёте там dependencies ❕
Вам будет нужна функия для проверки токена:
async def get_current_user(token: str = Security(security)):
try:
payload = security.decode_token(token)
if 'uid' not in payload:
raise HTTPException(status_code=401, detail="Invalid token payload")
return payload['uid']
except Exception as e:
raise HTTPException(status_code=401, detail="Invalid authentication credentials") from e
Далее можете использовать её в декораторе:
@app.get("/get_files",
tags=["Получение файлов"],
summary="Получить файл",
dependencies=[Depends(get_current_user)]) # Добавляем зависимость
async def get_file(filename: str):
return FileResponse(filename)