You've already forked DataMate
- 简化catch语句移除不必要的异常变量 - 删除无用的FormData条件判断代码 - 将变量声明从let改为const提升代码质量 - 移除响应拦截器中的冗余参数传递 - 在数据集模式模块中添加PDF文本提取相关的请求响应模型 - 更新模块导出列表包含新的PDF提取接口类型定义
191 lines
7.6 KiB
Python
191 lines
7.6 KiB
Python
import datetime
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from fastapi import HTTPException
|
|
from langchain_community.document_loaders import PyPDFLoader
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.logging import get_logger
|
|
from app.db.models import Dataset, DatasetFiles
|
|
from app.module.dataset.schema.pdf_extract import PdfTextExtractResponse
|
|
|
|
# Module-level logger, named after this module via the project's logging helper.
logger = get_logger(__name__)
|
|
|
|
# File-type tags: the first is compared against the source record's
# ``file_type``; the second is stored on the derived text-file record.
PDF_FILE_TYPE = "pdf"
TEXT_FILE_TYPE = "txt"

# Suffix appended to the source file name to form the text file name.
TEXT_FILE_EXTENSION = ".txt"

# Keys (and the fixed parser value) of the provenance metadata that is
# attached to every text-file record derived from a PDF.
DERIVED_METADATA_KEY = "derived_from_file_id"
DERIVED_METADATA_NAME_KEY = "derived_from_file_name"
DERIVED_METADATA_TYPE_KEY = "derived_from_file_type"
DERIVED_METADATA_PARSER_KEY = "parser"
DERIVED_METADATA_PARSER_VALUE = "PyPDFLoader"
|
|
|
|
|
|
class PdfTextExtractService:
    """Extract the text of a dataset PDF into a ``.txt`` file and register it.

    The derived text file is written inside the dataset directory (next to the
    source PDF when the PDF already lives there) and recorded as a new
    ``DatasetFiles`` row carrying provenance metadata about the source file.
    """

    def __init__(self, db: AsyncSession):
        """Store the async database session used for every query and commit."""
        self.db = db

    async def extract_pdf_to_text(self, dataset_id: str, file_id: str) -> PdfTextExtractResponse:
        """Extract one PDF file of a dataset to text and return the text file's record.

        The call is idempotent with respect to the target path: if a record for
        the target text file already exists it is returned as-is, and if the
        text file exists on disk without a record it is registered without
        re-parsing the PDF.

        Args:
            dataset_id: Identifier of the dataset that owns the file.
            file_id: Identifier of the PDF file record to extract.

        Returns:
            A ``PdfTextExtractResponse`` describing the derived text file.

        Raises:
            HTTPException: 404 when the dataset, the file record or the PDF on
                disk is missing; 400 when the dataset is not a TEXT dataset,
                the file is not a PDF, or the target directory falls outside
                the dataset path; 500 when the dataset has no path.
        """
        # Local import keeps the new dependency inside this block.
        import asyncio

        dataset = await self._get_dataset(dataset_id)
        file_record = await self._get_file_record(dataset_id, file_id)
        self._validate_dataset_and_file(dataset, file_record)

        source_path = self._resolve_source_path(file_record)
        dataset_path = self._resolve_dataset_path(dataset)
        target_path = self._resolve_target_path(dataset_path, source_path, file_record, file_id)

        existing_record = await self._find_existing_text_record(dataset_id, target_path)
        if existing_record:
            return self._build_response(dataset_id, file_id, existing_record)

        if not target_path.exists():
            # PDF parsing and the file write are synchronous; run them in a
            # worker thread so the event loop keeps serving other requests.
            text_content = await asyncio.to_thread(self._parse_pdf, source_path)
            await asyncio.to_thread(self._write_text_file, target_path, text_content)

        # Reached both for a freshly written file and for a file that was
        # already on disk but had no database record yet.
        file_size = self._get_file_size(target_path)
        record = await self._create_text_file_record(dataset, file_record, target_path, file_size)
        return self._build_response(dataset_id, file_id, record)

    async def _get_dataset(self, dataset_id: str) -> Dataset:
        """Load the dataset row by id, raising HTTP 404 when it does not exist."""
        result = await self.db.execute(select(Dataset).where(Dataset.id == dataset_id))
        dataset = result.scalar_one_or_none()
        if not dataset:
            raise HTTPException(status_code=404, detail=f"Dataset not found: {dataset_id}")
        return dataset

    async def _get_file_record(self, dataset_id: str, file_id: str) -> DatasetFiles:
        """Load the file row matching both ids, raising HTTP 404 when absent."""
        result = await self.db.execute(
            select(DatasetFiles).where(
                DatasetFiles.id == file_id,
                DatasetFiles.dataset_id == dataset_id,
            )
        )
        file_record = result.scalar_one_or_none()
        if not file_record:
            raise HTTPException(status_code=404, detail=f"File not found: {file_id}")
        return file_record

    @staticmethod
    def _validate_dataset_and_file(dataset: Dataset, file_record: DatasetFiles) -> None:
        """Raise HTTP 400 unless the dataset type is TEXT and the file type is PDF.

        Both comparisons are case-insensitive; a missing or empty attribute is
        treated as an empty string and therefore rejected.
        """
        dataset_type = str(getattr(dataset, "dataset_type", "") or "").upper()
        if dataset_type != "TEXT":
            raise HTTPException(status_code=400, detail="Only TEXT datasets are supported")
        file_type = str(getattr(file_record, "file_type", "") or "").lower()
        if file_type != PDF_FILE_TYPE:
            raise HTTPException(status_code=400, detail="Only PDF files are supported")

    @staticmethod
    def _resolve_source_path(file_record: DatasetFiles) -> Path:
        """Return the absolute path of the source PDF, raising HTTP 404 if missing.

        The path must point at a regular file; a directory (or any other
        non-file entry) is reported as not found instead of being handed to
        the PDF loader.
        """
        source_path = Path(str(file_record.file_path)).expanduser().resolve()
        if not source_path.is_file():
            raise HTTPException(status_code=404, detail="PDF file not found on disk")
        return source_path

    @staticmethod
    def _resolve_dataset_path(dataset: Dataset) -> Path:
        """Return the absolute dataset directory, creating it when necessary.

        Raises:
            HTTPException: 500 when the dataset has no (or a blank) path.
        """
        dataset_path_value = str(getattr(dataset, "path", "") or "").strip()
        if not dataset_path_value:
            raise HTTPException(status_code=500, detail="Dataset path is empty")
        dataset_path = Path(dataset_path_value).expanduser().resolve()
        dataset_path.mkdir(parents=True, exist_ok=True)
        return dataset_path

    @staticmethod
    def _build_output_filename(file_record: DatasetFiles, file_id: str) -> str:
        """Build the text file name: the source file name plus the text extension.

        Only the final path component of the stored name is used, so a stored
        name that contains directory parts (``../x.pdf``, ``/abs/x.pdf``)
        cannot move the output outside the target directory. When no usable
        name remains, ``<file_id>.pdf`` is used as the base name.
        """
        stored_name = str(getattr(file_record, "file_name", "") or "").strip()
        base_name = Path(stored_name).name
        if base_name in ("", ".."):
            base_name = f"{file_id}.pdf"
        return f"{base_name}{TEXT_FILE_EXTENSION}"

    def _resolve_target_path(
        self,
        dataset_path: Path,
        source_path: Path,
        file_record: DatasetFiles,
        file_id: str,
    ) -> Path:
        """Choose where the text file goes and make sure the directory exists.

        The text file is placed beside the source PDF when the PDF lives under
        the dataset directory; otherwise it is placed directly in the dataset
        directory.

        Raises:
            HTTPException: 400 when the chosen directory is neither the
                dataset directory nor located beneath it.
        """
        output_name = self._build_output_filename(file_record, file_id)
        if dataset_path in source_path.parents:
            target_dir = source_path.parent
        else:
            target_dir = dataset_path
        target_dir = target_dir.resolve()
        # Defensive containment check on the directory; the file name itself
        # has already been reduced to a single path component.
        if target_dir != dataset_path and dataset_path not in target_dir.parents:
            raise HTTPException(status_code=400, detail="Target path is outside dataset path")
        target_dir.mkdir(parents=True, exist_ok=True)
        return target_dir / output_name

    async def _find_existing_text_record(self, dataset_id: str, target_path: Path) -> DatasetFiles | None:
        """Return a record of this dataset stored at ``target_path``, or ``None``.

        The first match is returned rather than requiring uniqueness, so rows
        duplicated by concurrent extraction requests do not turn this lookup
        into an error.
        """
        result = await self.db.execute(
            select(DatasetFiles).where(
                DatasetFiles.dataset_id == dataset_id,
                DatasetFiles.file_path == str(target_path),
            )
        )
        return result.scalars().first()

    @staticmethod
    def _parse_pdf(source_path: Path) -> str:
        """Load the PDF with ``PyPDFLoader`` and join the non-empty page texts.

        Pages are separated by a blank line; pages without text are skipped.
        """
        loader = PyPDFLoader(str(source_path))
        docs = loader.load()
        contents = [doc.page_content for doc in docs if doc.page_content]
        return "\n\n".join(contents)

    @staticmethod
    def _write_text_file(target_path: Path, content: str) -> None:
        """Write ``content`` (or an empty string) to ``target_path`` as UTF-8.

        If writing fails, the incomplete file is removed before the error is
        re-raised; otherwise a later request would find the file on disk and
        register the broken output as the extraction result.
        """
        try:
            with open(target_path, "w", encoding="utf-8") as handle:
                handle.write(content or "")
        except Exception:
            target_path.unlink(missing_ok=True)
            raise

    @staticmethod
    def _get_file_size(path: Path) -> int:
        """Return the size of ``path`` in bytes, or 0 when it cannot be read.

        Failing to stat the file is deliberately non-fatal, but it is logged so
        that a zero size in the database can be traced back to its cause.
        """
        try:
            return int(os.path.getsize(path))
        except OSError as exc:
            logger.warning(f"Could not determine size of {path}: {exc}")
            return 0

    async def _create_text_file_record(
        self,
        dataset: Dataset,
        source_file: DatasetFiles,
        target_path: Path,
        file_size: int,
    ) -> DatasetFiles:
        """Insert the text-file record and update the dataset's counters.

        The new record stores the source file's id, name and type plus the
        parser name as metadata. The dataset's ``file_count`` and
        ``size_bytes`` are incremented, its status is set to ``"ACTIVE"``, and
        everything is committed in one transaction.

        Returns:
            The persisted record, refreshed from the database after commit.
        """
        metadata = {
            DERIVED_METADATA_KEY: str(getattr(source_file, "id", "")),
            DERIVED_METADATA_NAME_KEY: str(getattr(source_file, "file_name", "")),
            DERIVED_METADATA_TYPE_KEY: str(getattr(source_file, "file_type", "")),
            DERIVED_METADATA_PARSER_KEY: DERIVED_METADATA_PARSER_VALUE,
        }
        record = DatasetFiles(
            dataset_id=dataset.id,  # type: ignore[arg-type]
            file_name=target_path.name,
            file_path=str(target_path),
            file_type=TEXT_FILE_TYPE,
            file_size=file_size,
            dataset_filemetadata=metadata,
            last_access_time=datetime.datetime.now(datetime.UTC),
        )
        self.db.add(record)
        dataset.file_count = (dataset.file_count or 0) + 1
        dataset.size_bytes = (dataset.size_bytes or 0) + file_size
        dataset.status = "ACTIVE"
        await self.db.commit()
        await self.db.refresh(record)
        return record

    @staticmethod
    def _build_response(dataset_id: str, file_id: str, record: DatasetFiles) -> PdfTextExtractResponse:
        """Map a text-file record onto the API response model."""
        return PdfTextExtractResponse(
            datasetId=dataset_id,
            sourceFileId=file_id,
            textFileId=str(record.id),
            textFileName=str(record.file_name),
            textFilePath=str(record.file_path),
            textFileSize=int(record.file_size or 0),
        )
|