feat: enhance backend deployment, frontend file selection and synthesis task management (#129)
* feat: Implement data synthesis task management with database models and API endpoints
* feat: Update Python version requirements and refine dependency constraints in configuration
* fix: Correctly extract file values from selectedFilesMap in AddDataDialog
* feat: Refactor synthesis task routes and enhance file task management in the API
* feat: Enhance SynthesisTaskTab with tooltip actions and add chunk data retrieval in API
@@ -1,8 +1,8 @@
from fastapi import APIRouter

router = APIRouter(
    prefix="/synth",
    tags = ["synth"]
    prefix="/synthesis",
    tags = ["synthesis"]
)

# Include sub-routers
@@ -18,7 +18,14 @@ from app.db.session import get_db
from app.module.generation.schema.generation import (
    CreateSynthesisTaskRequest,
    DataSynthesisTaskItem,
    PagedDataSynthesisTaskResponse, SynthesisType)
    PagedDataSynthesisTaskResponse,
    SynthesisType,
    DataSynthesisFileTaskItem,
    PagedDataSynthesisFileTaskResponse,
    DataSynthesisChunkItem,
    PagedDataSynthesisChunkResponse,
    SynthesisDataItem,
)
from app.module.generation.service.generation_service import GenerationService
from app.module.generation.service.prompt import get_prompt
from app.module.shared.schema import StandardResponse
@@ -219,19 +226,26 @@ async def delete_synthesis_task(
        data=None,
    )


@router.delete("/task/{task_id}/{file_id}", response_model=StandardResponse[None])
async def delete_synthesis_file_task(
    task_id: str,
    file_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Delete a file task within a data synthesis task"""
    """Delete a file task within a data synthesis task and refresh the file/chunk counts on the task record"""
    # Fetch the task and the file task records first
    task = await db.get(DataSynthesisInstance, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Synthesis task not found")

    file_task = await db.get(DataSynthesisFileInstance, file_id)
    if not file_task:
        raise HTTPException(status_code=404, detail="Synthesis file task not found")

    # Delete SynthesisData rows (by file task ID)
    await db.execute(delete(SynthesisData).where(
    await db.execute(
        delete(SynthesisData).where(
            SynthesisData.synthesis_file_instance_id == file_id
        )
    )
@@ -243,11 +257,28 @@ async def delete_synthesis_file_task(
    )

    # Delete the file task record
    await db.execute(delete(DataSynthesisFileInstance).where(
    await db.execute(
        delete(DataSynthesisFileInstance).where(
            DataSynthesisFileInstance.id == file_id
        )
    )

    # Refresh task-level statistics: total files, total chunks, processed chunks
    if task.total_files and task.total_files > 0:
        task.total_files -= 1
        if task.total_files < 0:
            task.total_files = 0

    await db.commit()
    await db.refresh(task)

    return StandardResponse(
        code=200,
        message="success",
        data=None,
    )


@router.get("/prompt", response_model=StandardResponse[str])
async def get_prompt_by_type(
    synth_type: SynthesisType,
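The counter refresh above clamps total_files at zero after the decrement; a minimal sketch of the same clamp-at-zero pattern as a reusable helper (the _clamped_decrement name and the chunk-count usage are hypothetical, not part of this commit):

from typing import Optional

def _clamped_decrement(value: Optional[int], amount: int = 1) -> int:
    # Decrement a nullable counter without letting it drop below zero (illustrative sketch).
    return max((value or 0) - amount, 0)

# Hypothetical usage inside delete_synthesis_file_task:
# task.total_files = _clamped_decrement(task.total_files)
# task.total_chunks = _clamped_decrement(task.total_chunks, file_task.total_chunks or 0)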
@@ -258,3 +289,157 @@ async def get_prompt_by_type(
        message="Success",
        data=prompt,
    )


@router.get("/task/{task_id}/files", response_model=StandardResponse[PagedDataSynthesisFileTaskResponse])
|
||||
async def list_synthesis_file_tasks(
|
||||
task_id: str,
|
||||
page: int = 1,
|
||||
page_size: int = 10,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""分页获取某个数据合成任务下的文件任务列表"""
|
||||
# 先校验任务是否存在
|
||||
task = await db.get(DataSynthesisInstance, task_id)
|
||||
if not task:
|
||||
raise HTTPException(status_code=404, detail="Synthesis task not found")
|
||||
|
||||
base_query = select(DataSynthesisFileInstance).where(
|
||||
DataSynthesisFileInstance.synthesis_instance_id == task_id
|
||||
)
|
||||
|
||||
count_q = select(func.count()).select_from(base_query.subquery())
|
||||
total = (await db.execute(count_q)).scalar_one()
|
||||
|
||||
if page < 1:
|
||||
page = 1
|
||||
if page_size < 1:
|
||||
page_size = 10
|
||||
|
||||
result = await db.execute(
|
||||
base_query.offset((page - 1) * page_size).limit(page_size)
|
||||
)
|
||||
rows = result.scalars().all()
|
||||
|
||||
file_items = [
|
||||
DataSynthesisFileTaskItem(
|
||||
id=row.id,
|
||||
synthesis_instance_id=row.synthesis_instance_id,
|
||||
file_name=row.file_name,
|
||||
source_file_id=row.source_file_id,
|
||||
target_file_location=row.target_file_location,
|
||||
status=row.status,
|
||||
total_chunks=row.total_chunks,
|
||||
processed_chunks=row.processed_chunks,
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
created_by=row.created_by,
|
||||
updated_by=row.updated_by,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
paged = PagedDataSynthesisFileTaskResponse(
|
||||
content=file_items,
|
||||
totalElements=total,
|
||||
totalPages=(total + page_size - 1) // page_size,
|
||||
page=page,
|
||||
size=page_size,
|
||||
)
|
||||
|
||||
return StandardResponse(
|
||||
code=200,
|
||||
message="Success",
|
||||
data=paged,
|
||||
)
|
||||
|
||||
|
||||
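The count-then-page pattern used above reappears for the chunk listing below; a minimal sketch of how it could be shared, assuming SQLAlchemy 2.x async sessions (the paginate helper is illustrative and not part of this commit):

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

async def paginate(db: AsyncSession, base_query, page: int = 1, page_size: int = 10):
    # Clamp paging inputs, count the filtered rows, then fetch a single page.
    page = max(page, 1)
    page_size = page_size if page_size >= 1 else 10
    total = (await db.execute(
        select(func.count()).select_from(base_query.subquery())
    )).scalar_one()
    result = await db.execute(base_query.offset((page - 1) * page_size).limit(page_size))
    return result.scalars().all(), total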
@router.get("/file/{file_id}/chunks", response_model=StandardResponse[PagedDataSynthesisChunkResponse])
|
||||
async def list_chunks_by_file(
|
||||
file_id: str,
|
||||
page: int = 1,
|
||||
page_size: int = 10,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""根据文件任务 ID 分页查询 chunk 记录"""
|
||||
# 校验文件任务是否存在
|
||||
file_task = await db.get(DataSynthesisFileInstance, file_id)
|
||||
if not file_task:
|
||||
raise HTTPException(status_code=404, detail="Synthesis file task not found")
|
||||
|
||||
base_query = select(DataSynthesisChunkInstance).where(
|
||||
DataSynthesisChunkInstance.synthesis_file_instance_id == file_id
|
||||
)
|
||||
|
||||
count_q = select(func.count()).select_from(base_query.subquery())
|
||||
total = (await db.execute(count_q)).scalar_one()
|
||||
|
||||
if page < 1:
|
||||
page = 1
|
||||
if page_size < 1:
|
||||
page_size = 10
|
||||
|
||||
result = await db.execute(
|
||||
base_query.order_by(DataSynthesisChunkInstance.chunk_index.asc())
|
||||
.offset((page - 1) * page_size)
|
||||
.limit(page_size)
|
||||
)
|
||||
rows = result.scalars().all()
|
||||
|
||||
chunk_items = [
|
||||
DataSynthesisChunkItem(
|
||||
id=row.id,
|
||||
synthesis_file_instance_id=row.synthesis_file_instance_id,
|
||||
chunk_index=row.chunk_index,
|
||||
chunk_content=row.chunk_content,
|
||||
chunk_metadata=getattr(row, "chunk_metadata", None),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
paged = PagedDataSynthesisChunkResponse(
|
||||
content=chunk_items,
|
||||
totalElements=total,
|
||||
totalPages=(total + page_size - 1) // page_size,
|
||||
page=page,
|
||||
size=page_size,
|
||||
)
|
||||
|
||||
return StandardResponse(
|
||||
code=200,
|
||||
message="Success",
|
||||
data=paged,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/chunk/{chunk_id}/data", response_model=StandardResponse[list[SynthesisDataItem]])
|
||||
async def list_synthesis_data_by_chunk(
|
||||
chunk_id: str,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""根据 chunk ID 查询所有合成结果数据"""
|
||||
# 可选:校验 chunk 是否存在
|
||||
chunk = await db.get(DataSynthesisChunkInstance, chunk_id)
|
||||
if not chunk:
|
||||
raise HTTPException(status_code=404, detail="Chunk not found")
|
||||
|
||||
result = await db.execute(
|
||||
select(SynthesisData).where(SynthesisData.chunk_instance_id == chunk_id)
|
||||
)
|
||||
rows = result.scalars().all()
|
||||
|
||||
items = [
|
||||
SynthesisDataItem(
|
||||
id=row.id,
|
||||
data=row.data,
|
||||
synthesis_file_instance_id=row.synthesis_file_instance_id,
|
||||
chunk_instance_id=row.chunk_instance_id,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
return StandardResponse(
|
||||
code=200,
|
||||
message="Success",
|
||||
data=items,
|
||||
)
|
||||
|
||||
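Taken together, the new read endpoints let a client drill down from a task to its file tasks, chunks, and synthesized records; a minimal sketch of that flow with httpx, assuming the router is mounted at /synthesis on a local server (base URL, port, and the inspect_task name are assumptions, not part of this commit):

import asyncio
import httpx

async def inspect_task(task_id: str) -> None:
    # Walk task -> file tasks -> chunks -> synthesized records via the new endpoints.
    async with httpx.AsyncClient(base_url="http://localhost:8000/synthesis") as client:
        files = (await client.get(f"/task/{task_id}/files", params={"page": 1, "page_size": 10})).json()
        for f in files["data"]["content"]:
            chunks = (await client.get(f"/file/{f['id']}/chunks", params={"page": 1})).json()
            for chunk in chunks["data"]["content"]:
                records = (await client.get(f"/chunk/{chunk['id']}/data")).json()
                print(f["file_name"], chunk["chunk_index"], len(records["data"]))

# asyncio.run(inspect_task("some-task-id"))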
@@ -70,6 +70,67 @@ class PagedDataSynthesisTaskResponse(BaseModel):
    page: int
    size: int


class DataSynthesisFileTaskItem(BaseModel):
    """A file task item under a data synthesis task"""
    id: str
    synthesis_instance_id: str
    file_name: str
    source_file_id: str
    target_file_location: str
    status: Optional[str] = None
    total_chunks: int
    processed_chunks: int
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    created_by: Optional[str] = None
    updated_by: Optional[str] = None

    class Config:
        orm_mode = True


class PagedDataSynthesisFileTaskResponse(BaseModel):
    """Paged response of file tasks under a data synthesis task"""
    content: List[DataSynthesisFileTaskItem]
    totalElements: int
    totalPages: int
    page: int
    size: int


class DataSynthesisChunkItem(BaseModel):
    """A chunk record under a data synthesis file"""
    id: str
    synthesis_file_instance_id: str
    chunk_index: Optional[int] = None
    chunk_content: Optional[str] = None
    chunk_metadata: Optional[Dict[str, Any]] = None

    class Config:
        orm_mode = True


class PagedDataSynthesisChunkResponse(BaseModel):
    """Paged chunk list response"""
    content: List[DataSynthesisChunkItem]
    totalElements: int
    totalPages: int
    page: int
    size: int


class SynthesisDataItem(BaseModel):
    """A synthesis result data item"""
    id: str
    data: Optional[Dict[str, Any]] = None
    synthesis_file_instance_id: str
    chunk_instance_id: str

    class Config:
        orm_mode = True


class ChatRequest(BaseModel):
    """Chat request parameters"""
    model_id: str
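Note that orm_mode = True is the Pydantic v1 spelling; if the project is on Pydantic v2, the same intent is written with from_attributes, roughly as sketched below (whether the project targets v1 or v2 is not shown in this diff):

from typing import Optional
from pydantic import BaseModel, ConfigDict

class DataSynthesisChunkItemV2(BaseModel):
    # Pydantic v2 replacement for the v1 "class Config: orm_mode = True" block.
    model_config = ConfigDict(from_attributes=True)
    id: str
    synthesis_file_instance_id: str
    chunk_index: Optional[int] = None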
@@ -168,11 +168,11 @@ class GenerationService:
            self.db.add(chunk_record)

        # Update the chunk count on the file task
        file_task.chunk_count = len(chunks)
        file_task.total_chunks = len(chunks)
        file_task.status = "processing"

        await self.db.refresh(file_task)
        await self.db.commit()
        await self.db.refresh(file_task)

    async def _invoke_llm_for_chunks(
        self,
@@ -1,7 +1,6 @@
from app.module.generation.schema.generation import SynthesisType

QA_PROMPT="""
# Role
QA_PROMPT="""# Role
You are a professional AI assistant, skilled at extracting key information from a given text and creating question-answer pairs for teaching and testing.

# Task
@@ -11,7 +10,7 @@ QA_PROMPT="""
{document}

# Requirements and Instructions
1. **Question types**: Generate {synthesis_count - 1}-{synthesis_count + 1} question-answer pairs. Question types should be varied, including but not limited to:
1. **Question types**: Generate around {synthesis_count} question-answer pairs. Question types should be varied, including but not limited to:
    * **Factual**: based on facts explicitly stated in the text.
    * **Comprehension**: requires understanding context and concepts.
    * **Inductive**: requires summarizing or synthesizing multiple points of information.
@@ -30,8 +29,7 @@ QA_PROMPT="""
"""


COT_PROMPT="""
# Role
COT_PROMPT="""# Role
You are a professional data synthesis expert, skilled at generating high-quality, application-ready COT data from a given source document and COT (Chain of Thought) logic. COT data must contain a clear question, a step-by-step reasoning process, and a final conclusion that fully reconstructs the thinking path used to solve the problem.

# Task
@@ -41,7 +39,7 @@ COT_PROMPT="""
{document}

# Requirements and Instructions
1. **Quantity**: Generate {min_count}-{max_count} COT records (min_count={synthesis_count-1}, max_count={synthesis_count+1}).
1. **Quantity**: Generate around {synthesis_count} COT records.
2. **Content requirements**:
    * Each COT record must contain three parts: "question", "chain-of-thought reasoning", and "final conclusion", forming a closed logical loop with clear, coherent reasoning steps that do not skip key links.
    * Questions must be based on factual information, conceptual relationships, or logical puzzles in the document, and should be valuable questions that arise naturally after reading it (avoid meaningless or overly simple questions).
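The placeholder change in both prompts matters if the templates are rendered with str.format, which this diff does not show: an expression such as {synthesis_count - 1} is not a valid format field name, while {synthesis_count} and {document} are. A short, self-contained illustration (format-style rendering is an assumption):

# Illustration only; the project's real rendering code is not part of this diff.
template_old = "Generate {synthesis_count - 1}-{synthesis_count + 1} QA pairs."
template_new = "Generate around {synthesis_count} QA pairs."

print(template_new.format(synthesis_count=5))  # "Generate around 5 QA pairs."
try:
    template_old.format(synthesis_count=5)     # raises KeyError: "synthesis_count - 1" is not a field name
except KeyError as exc:
    print("old placeholder cannot be formatted:", exc)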