"""Service that extracts the text of a dataset PDF file into a sibling ``.txt`` file."""

import asyncio
import contextlib
import datetime
import os
import uuid
from pathlib import Path

from fastapi import HTTPException
from langchain_community.document_loaders import PyPDFLoader
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logging import get_logger
from app.db.models import Dataset, DatasetFiles
from app.module.dataset.schema.pdf_extract import PdfTextExtractResponse

logger = get_logger(__name__)

PDF_FILE_TYPE = "pdf"
TEXT_FILE_TYPE = "txt"
TEXT_FILE_EXTENSION = ".txt"
DERIVED_METADATA_KEY = "derived_from_file_id"
DERIVED_METADATA_NAME_KEY = "derived_from_file_name"
DERIVED_METADATA_TYPE_KEY = "derived_from_file_type"
DERIVED_METADATA_PARSER_KEY = "parser"
DERIVED_METADATA_PARSER_VALUE = "PyPDFLoader"


class PdfTextExtractService:
    """Extract PDF files of a TEXT dataset to text files and register them."""

    def __init__(self, db: AsyncSession):
        """Store the async database session used for all queries and commits."""
        self.db = db

    async def extract_pdf_to_text(self, dataset_id: str, file_id: str) -> PdfTextExtractResponse:
        """Extract the text of a PDF file and register it as a new dataset file.

        If a record for the target text file already exists it is returned
        as-is. If the text file exists on disk without a record, only the
        record is created. Otherwise the PDF is parsed, the text is written
        next to the source (or into the dataset root) and a record is created.

        Args:
            dataset_id: Identifier of the dataset that owns the file.
            file_id: Identifier of the PDF file record to extract.

        Returns:
            A response describing the derived text file.

        Raises:
            HTTPException: 404 if the dataset, the file record or the PDF on
                disk is missing; 400 if the dataset is not TEXT, the file is
                not a PDF, or the target lies outside the dataset path; 500
                if the dataset path is empty.
        """
        dataset = await self._get_dataset(dataset_id)
        file_record = await self._get_file_record(dataset_id, file_id)
        self._validate_dataset_and_file(dataset, file_record)

        source_path = self._resolve_source_path(file_record)
        dataset_path = self._resolve_dataset_path(dataset)
        target_path = self._resolve_target_path(dataset_path, source_path, file_record, file_id)

        existing_record = await self._find_existing_text_record(dataset_id, target_path)
        if existing_record:
            return self._build_response(dataset_id, file_id, existing_record)

        if not target_path.exists():
            # PDF parsing and file writing are blocking; run them in a worker
            # thread so the event loop keeps serving other requests.
            text_content = await asyncio.to_thread(self._parse_pdf, source_path)
            await asyncio.to_thread(self._write_text_file, target_path, text_content)

            # A concurrent request may have registered the same file while we
            # were parsing; reuse its record instead of creating a duplicate.
            existing_record = await self._find_existing_text_record(dataset_id, target_path)
            if existing_record:
                return self._build_response(dataset_id, file_id, existing_record)

        file_size = self._get_file_size(target_path)
        record = await self._create_text_file_record(dataset, file_record, target_path, file_size)
        return self._build_response(dataset_id, file_id, record)

    async def _get_dataset(self, dataset_id: str) -> Dataset:
        """Load the dataset by id or raise a 404 ``HTTPException``."""
        result = await self.db.execute(select(Dataset).where(Dataset.id == dataset_id))
        dataset = result.scalar_one_or_none()
        if not dataset:
            raise HTTPException(status_code=404, detail=f"Dataset not found: {dataset_id}")
        return dataset

    async def _get_file_record(self, dataset_id: str, file_id: str) -> DatasetFiles:
        """Load the file record belonging to the dataset or raise a 404 ``HTTPException``."""
        result = await self.db.execute(
            select(DatasetFiles).where(
                DatasetFiles.id == file_id,
                DatasetFiles.dataset_id == dataset_id,
            )
        )
        file_record = result.scalar_one_or_none()
        if not file_record:
            raise HTTPException(status_code=404, detail=f"File not found: {file_id}")
        return file_record

    @staticmethod
    def _validate_dataset_and_file(dataset: Dataset, file_record: DatasetFiles) -> None:
        """Raise a 400 ``HTTPException`` unless the dataset is TEXT and the file is a PDF.

        Both comparisons are case-insensitive; missing attributes count as empty.
        """
        dataset_type = str(getattr(dataset, "dataset_type", "") or "").upper()
        if dataset_type != "TEXT":
            raise HTTPException(status_code=400, detail="Only TEXT datasets are supported")
        file_type = str(getattr(file_record, "file_type", "") or "").lower()
        if file_type != PDF_FILE_TYPE:
            raise HTTPException(status_code=400, detail="Only PDF files are supported")

    @staticmethod
    def _resolve_source_path(file_record: DatasetFiles) -> Path:
        """Return the resolved path of the source PDF or raise a 404 ``HTTPException``."""
        source_path = Path(str(file_record.file_path)).expanduser().resolve()
        # is_file() rather than exists(): a directory at this path is not a PDF.
        if not source_path.is_file():
            raise HTTPException(status_code=404, detail="PDF file not found on disk")
        return source_path

    @staticmethod
    def _resolve_dataset_path(dataset: Dataset) -> Path:
        """Return the resolved dataset directory, creating it if necessary.

        Raises:
            HTTPException: 500 if the dataset has no (non-blank) path.
        """
        dataset_path_value = str(getattr(dataset, "path", "") or "").strip()
        if not dataset_path_value:
            raise HTTPException(status_code=500, detail="Dataset path is empty")
        dataset_path = Path(dataset_path_value).expanduser().resolve()
        dataset_path.mkdir(parents=True, exist_ok=True)
        return dataset_path

    @staticmethod
    def _build_output_filename(file_record: DatasetFiles, file_id: str) -> str:
        """Return the text file name: the source file name with ``.txt`` appended.

        Only the last path component of the stored name is used, so a name
        such as ``../x.pdf`` or ``/abs/x.pdf`` cannot move the output outside
        the target directory. Falls back to ``<file_id>.pdf`` when no usable
        name remains.
        """
        original_name = str(getattr(file_record, "file_name", "") or "").strip()
        # Treat both separator styles as directory separators before taking
        # the final component.
        base_name = Path(original_name.replace("\\", "/")).name
        if not base_name or base_name == "..":
            base_name = f"{file_id}.pdf"
        return f"{base_name}{TEXT_FILE_EXTENSION}"

    def _resolve_target_path(
        self,
        dataset_path: Path,
        source_path: Path,
        file_record: DatasetFiles,
        file_id: str,
    ) -> Path:
        """Return the path the extracted text file should be written to.

        The file is placed next to the source PDF when the PDF lives under
        the dataset directory, otherwise directly in the dataset directory.
        The target directory is created if it does not exist.

        Raises:
            HTTPException: 400 if the target directory is outside the dataset path.
        """
        output_name = self._build_output_filename(file_record, file_id)
        if dataset_path in source_path.parents:
            target_dir = source_path.parent
        else:
            target_dir = dataset_path
        target_dir = target_dir.resolve()
        if target_dir != dataset_path and dataset_path not in target_dir.parents:
            raise HTTPException(status_code=400, detail="Target path is outside dataset path")
        target_dir.mkdir(parents=True, exist_ok=True)
        return target_dir / output_name

    async def _find_existing_text_record(self, dataset_id: str, target_path: Path) -> DatasetFiles | None:
        """Return a file record of the dataset stored at ``target_path``, if any."""
        result = await self.db.execute(
            select(DatasetFiles).where(
                DatasetFiles.dataset_id == dataset_id,
                DatasetFiles.file_path == str(target_path),
            )
        )
        # first() instead of scalar_one_or_none(): if duplicate rows exist,
        # reuse one rather than failing every later request for this file.
        return result.scalars().first()

    @staticmethod
    def _parse_pdf(source_path: Path) -> str:
        """Load the PDF with ``PyPDFLoader`` and join non-empty page texts with blank lines."""
        loader = PyPDFLoader(str(source_path))
        docs = loader.load()
        contents = [doc.page_content for doc in docs if doc.page_content]
        return "\n\n".join(contents)

    @staticmethod
    def _write_text_file(target_path: Path, content: str) -> None:
        """Write ``content`` to ``target_path`` as UTF-8, replacing it atomically.

        The text is first written to a uniquely named temporary file in the
        same directory and then moved into place, so a failed write never
        leaves a partial file at ``target_path``.
        """
        temp_path = target_path.with_name(f".{uuid.uuid4().hex}.tmp")
        try:
            with open(temp_path, "w", encoding="utf-8") as handle:
                handle.write(content or "")
            os.replace(temp_path, target_path)
        except BaseException:
            # Best-effort cleanup; the original error is what matters.
            with contextlib.suppress(OSError):
                temp_path.unlink(missing_ok=True)
            raise

    @staticmethod
    def _get_file_size(path: Path) -> int:
        """Return the size of ``path`` in bytes, or 0 if it cannot be read."""
        try:
            return int(os.path.getsize(path))
        except OSError as exc:
            # Best effort: keep the 0 fallback but leave a trace.
            logger.warning("Could not determine size of %s: %s", path, exc)
            return 0

    async def _create_text_file_record(
        self,
        dataset: Dataset,
        source_file: DatasetFiles,
        target_path: Path,
        file_size: int,
    ) -> DatasetFiles:
        """Insert the record for the derived text file and update dataset counters.

        The record's metadata notes the source file's id, name and type and
        the parser used. The dataset's ``file_count`` and ``size_bytes`` are
        incremented and its status is set to ``"ACTIVE"``. The session is
        rolled back if the commit fails.

        Returns:
            The refreshed, persisted ``DatasetFiles`` record.
        """
        metadata = {
            DERIVED_METADATA_KEY: str(getattr(source_file, "id", "")),
            DERIVED_METADATA_NAME_KEY: str(getattr(source_file, "file_name", "")),
            DERIVED_METADATA_TYPE_KEY: str(getattr(source_file, "file_type", "")),
            DERIVED_METADATA_PARSER_KEY: DERIVED_METADATA_PARSER_VALUE,
        }
        record = DatasetFiles(
            dataset_id=dataset.id,  # type: ignore[arg-type]
            file_name=target_path.name,
            file_path=str(target_path),
            file_type=TEXT_FILE_TYPE,
            file_size=file_size,
            dataset_filemetadata=metadata,
            last_access_time=datetime.datetime.now(datetime.UTC),
        )
        self.db.add(record)
        dataset.file_count = (dataset.file_count or 0) + 1
        dataset.size_bytes = (dataset.size_bytes or 0) + file_size
        dataset.status = "ACTIVE"
        try:
            await self.db.commit()
        except Exception:
            # Leave the session usable for whoever handles the error.
            await self.db.rollback()
            raise
        await self.db.refresh(record)
        return record

    @staticmethod
    def _build_response(dataset_id: str, file_id: str, record: DatasetFiles) -> PdfTextExtractResponse:
        """Build the API response describing the text file ``record``."""
        return PdfTextExtractResponse(
            datasetId=dataset_id,
            sourceFileId=file_id,
            textFileId=str(record.id),
            textFileName=str(record.file_name),
            textFilePath=str(record.file_path),
            textFileSize=int(record.file_size or 0),
        )