Files
hefanli 63f4e3e447 refactor: modify data collection to python implementation (#214)
* feature: LabelStudio jumps without login

* refactor: modify data collection to python implementation

* refactor: modify data collection to python implementation

* refactor: modify data collection to python implementation

* refactor: modify data collection to python implementation

* refactor: modify data collection to python implementation

* refactor: modify data collection to python implementation

* fix: remove terrabase dependency

* feature: add the collection task executions page and the collection template page

* fix: fix the collection task creation

* fix: fix the collection task creation
2025-12-30 18:48:43 +08:00

121 lines
4.0 KiB
Python

import math
import os
from pathlib import Path
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import FileResponse
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
from app.db.models.data_collection import TaskExecution
from app.db.session import get_db
from app.module.collection.schema.collection import TaskExecutionBase, converter_execution_to_response
from app.module.shared.schema import StandardResponse, PaginatedData
# Router for data-collection execution-record endpoints; mounted at /executions.
router = APIRouter(
    prefix="/executions",
    tags=["data-collection/executions"],
)
# Module-level logger, named after this module per project convention.
logger = get_logger(__name__)
@router.get("", response_model=StandardResponse[PaginatedData[TaskExecutionBase]])
async def list_executions(
    # ge=1 guards against a negative OFFSET (page < 1) and a
    # ZeroDivisionError in math.ceil(total / size) (size < 1).
    page: int = Query(1, ge=1, description="页码(从1开始)"),
    size: int = Query(20, ge=1, description="每页数量"),
    task_id: Optional[str] = Query(None, description="任务ID"),
    task_name: Optional[str] = Query(None, description="任务名称模糊查询"),
    start_time: Optional[datetime] = Query(None, description="开始执行时间范围-起(started_at >= start_time)"),
    end_time: Optional[datetime] = Query(None, description="开始执行时间范围-止(started_at <= end_time)"),
    db: AsyncSession = Depends(get_db)
):
    """Paginated query over collection-task execution records.

    Optional filters are AND-combined:
      - ``task_id``: exact match on ``TaskExecution.task_id``
      - ``task_name``: case-insensitive substring match (``ilike``)
      - ``start_time`` / ``end_time``: inclusive bounds on ``started_at``

    Returns:
        StandardResponse wrapping a PaginatedData page ordered by
        ``created_at`` descending.

    Raises:
        HTTPException 500: on any unexpected database/serialization error.
    """
    try:
        query = select(TaskExecution)
        if task_id:
            query = query.where(TaskExecution.task_id == task_id)
        if task_name:
            # Case-insensitive fuzzy match on the task name.
            query = query.where(TaskExecution.task_name.ilike(f"%{task_name}%"))
        if start_time:
            query = query.where(TaskExecution.started_at >= start_time)
        if end_time:
            query = query.where(TaskExecution.started_at <= end_time)

        # Count over the filtered query as a subquery so all filters apply.
        count_query = select(func.count()).select_from(query.subquery())
        total = (await db.execute(count_query)).scalar_one()

        offset = (page - 1) * size
        executions = (await db.execute(
            query.order_by(TaskExecution.created_at.desc())
            .offset(offset)
            .limit(size)
        )).scalars().all()

        items = [converter_execution_to_response(exe) for exe in executions]
        total_pages = math.ceil(total / size) if total > 0 else 0
        return StandardResponse(
            code=200,
            message="Success",
            data=PaginatedData(
                content=items,
                total_elements=total,
                total_pages=total_pages,
                page=page,
                size=size,
            )
        )
    except Exception as e:
        # The original `logger.error(f"...", e)` passed `e` as a lazy %-arg
        # with no placeholder, breaking log formatting. exception() records
        # the message and the traceback correctly.
        logger.exception("Failed to list task executions: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{execution_id}/log")
async def get_execution_log(
    execution_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Return the log file of an execution record as inline plain text.

    Looks up the execution, resolves its ``log_path`` (relative paths are
    resolved against the current working directory), and streams the file
    as ``text/plain; charset=utf-8``.

    Raises:
        HTTPException 404: execution not found, no log path recorded,
            or the log file is missing on disk.
        HTTPException 500: any other unexpected error.
    """
    try:
        execution = await db.get(TaskExecution, execution_id)
        if not execution:
            raise HTTPException(status_code=404, detail="Execution record not found")

        log_path = getattr(execution, "log_path", None)
        if not log_path:
            raise HTTPException(status_code=404, detail="Log path not found")

        path = Path(str(log_path))
        if not path.is_absolute():
            # NOTE(review): relative log paths are assumed relative to the
            # process CWD — confirm this matches where the collector writes.
            path = Path(os.getcwd()) / path
        path = path.resolve()
        if not path.exists() or not path.is_file():
            raise HTTPException(status_code=404, detail="Log file not found")

        filename = path.name
        # Bug fix: the header previously hard-coded filename="(unknown)"
        # instead of interpolating the actual file name computed above.
        headers = {
            "Content-Disposition": f'inline; filename="{filename}"'
        }
        return FileResponse(
            path=str(path),
            media_type="text/plain; charset=utf-8",
            filename=filename,
            headers=headers,
        )
    except HTTPException:
        # Re-raise intentional HTTP errors untouched (don't mask as 500).
        raise
    except Exception as e:
        # %-style lazy args instead of the original malformed
        # `logger.error(f"...", e)`; exception() also logs the traceback.
        logger.exception("Failed to get execution log: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error")