You've already forked DataMate
feat(auth): 为数据管理和RAG服务增加资源访问控制
- 在DatasetApplicationService中注入ResourceAccessService并添加所有权验证 - 在KnowledgeSetApplicationService中注入ResourceAccessService并添加所有权验证 - 修改DatasetRepository接口和实现类,增加按创建者过滤的方法 - 修改KnowledgeSetRepository接口和实现类,增加按创建者过滤的方法 - 在RAG索引器服务中添加知识库访问权限检查和作用域过滤 - 更新实体元对象处理器以使用请求用户上下文获取当前用户 - 在前端设置页面添加用户权限管理功能和角色权限控制 - 为Python标注服务增加用户上下文和数据集访问权限验证
This commit is contained in:
69
runtime/datamate-python/app/module/annotation/security.py
Normal file
69
runtime/datamate-python/app/module/annotation/security.py
Normal file
@@ -0,0 +1,69 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Tuple
|
||||
|
||||
from fastapi import HTTPException, Request
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db.models.dataset_management import Dataset
|
||||
|
||||
HEADER_USER_ID = "X-User-Id"
|
||||
HEADER_USER_NAME = "X-User-Name"
|
||||
HEADER_USER_ROLES = "X-User-Roles"
|
||||
ADMIN_ROLE_CODE = "ROLE_ADMIN"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RequestUserContext:
|
||||
user_id: str
|
||||
username: str | None
|
||||
roles: Tuple[str, ...]
|
||||
|
||||
@property
|
||||
def is_admin(self) -> bool:
|
||||
return any(role.upper() == ADMIN_ROLE_CODE for role in self.roles)
|
||||
|
||||
|
||||
def get_request_user_context(request: Request) -> RequestUserContext:
|
||||
user_id = (request.headers.get(HEADER_USER_ID) or "").strip()
|
||||
username = (request.headers.get(HEADER_USER_NAME) or "").strip() or None
|
||||
role_header = request.headers.get(HEADER_USER_ROLES) or ""
|
||||
roles = tuple(
|
||||
role.strip()
|
||||
for role in role_header.split(",")
|
||||
if role and role.strip()
|
||||
)
|
||||
if not user_id:
|
||||
raise HTTPException(status_code=403, detail="权限不足:缺少用户身份")
|
||||
return RequestUserContext(user_id=user_id, username=username, roles=roles)
|
||||
|
||||
|
||||
def ensure_dataset_owner_access(
|
||||
user_context: RequestUserContext,
|
||||
dataset_owner_user_id: str | None,
|
||||
dataset_id: str,
|
||||
) -> None:
|
||||
if user_context.is_admin:
|
||||
return
|
||||
if not dataset_owner_user_id or dataset_owner_user_id != user_context.user_id:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=f"无权访问数据集: {dataset_id}",
|
||||
)
|
||||
|
||||
|
||||
async def assert_dataset_access(
|
||||
db: AsyncSession,
|
||||
dataset_id: str,
|
||||
user_context: RequestUserContext,
|
||||
) -> None:
|
||||
owner_result = await db.execute(
|
||||
select(Dataset.created_by).where(Dataset.id == dataset_id)
|
||||
)
|
||||
dataset_owner = owner_result.scalar_one_or_none()
|
||||
if dataset_owner is None:
|
||||
raise HTTPException(status_code=404, detail=f"数据集不存在: {dataset_id}")
|
||||
ensure_dataset_owner_access(user_context, str(dataset_owner), dataset_id)
|
||||
|
||||
Reference in New Issue
Block a user