diff --git a/frontend/src/hooks/useSliceUpload.tsx b/frontend/src/hooks/useSliceUpload.tsx index a43385a..6d722a9 100644 --- a/frontend/src/hooks/useSliceUpload.tsx +++ b/frontend/src/hooks/useSliceUpload.tsx @@ -1,5 +1,5 @@ import { TaskItem } from "@/pages/DataManagement/dataset.model"; -import { calculateSHA256, checkIsFilesExist } from "@/utils/file.util"; +import { calculateSHA256, checkIsFilesExist, streamSplitAndUpload, StreamUploadResult } from "@/utils/file.util"; import { App } from "antd"; import { useRef, useState } from "react"; @@ -9,17 +9,18 @@ export function useFileSliceUpload( uploadChunk, cancelUpload, }: { - preUpload: (id: string, params: any) => Promise<{ data: number }>; - uploadChunk: (id: string, formData: FormData, config: any) => Promise; - cancelUpload: ((reqId: number) => Promise) | null; + preUpload: (id: string, params: Record) => Promise<{ data: number }>; + uploadChunk: (id: string, formData: FormData, config: Record) => Promise; + cancelUpload: ((reqId: number) => Promise) | null; }, - showTaskCenter = true // 上传时是否显示任务中心 + showTaskCenter = true, // 上传时是否显示任务中心 + enableStreamUpload = true // 是否启用流式分割上传 ) { const { message } = App.useApp(); const [taskList, setTaskList] = useState([]); const taskListRef = useRef([]); // 用于固定任务顺序 - const createTask = (detail: any = {}) => { + const createTask = (detail: Record = {}) => { const { dataset } = detail; const title = `上传数据集: ${dataset.name} `; const controller = new AbortController(); @@ -68,7 +69,7 @@ export function useFileSliceUpload( // 携带前缀信息,便于刷新后仍停留在当前目录 window.dispatchEvent( new CustomEvent(task.updateEvent, { - detail: { prefix: (task as any).prefix }, + detail: { prefix: task.prefix }, }) ); } @@ -79,7 +80,7 @@ export function useFileSliceUpload( } }; - async function buildFormData({ file, reqId, i, j }) { + async function buildFormData({ file, reqId, i, j }: { file: { slices: Blob[]; name: string; size: number }; reqId: number; i: number; j: number }) { const formData = new 
/**
 * Stream-split upload handler.
 *
 * Used when large text files are split line by line and every line is
 * uploaded as soon as it is read. One pre-upload request is issued for the
 * whole batch, then the files are processed sequentially through
 * `streamSplitAndUpload`, and the task entry is refreshed on every progress
 * callback.
 *
 * @param task  Task entry that represents this upload in the task list.
 * @param files Raw `File` objects to split and upload.
 */
const handleStreamUpload = async ({ task, files }: { task: TaskItem; files: File[] }) => {
  try {
    console.log('[useSliceUpload] Starting stream upload for', files.length, 'files');

    // Pre-upload to obtain the request id shared by every line file.
    const totalSize = files.reduce((acc, file) => acc + file.size, 0);
    const { data: reqId } = await preUpload(task.key, {
      totalFileNum: -1, // streaming upload: the number of resulting files is not known up front
      totalSize,
      datasetId: task.key,
      hasArchive: task.hasArchive,
      prefix: task.prefix,
    });

    console.log('[useSliceUpload] Stream upload preUpload response reqId:', reqId);

    const newTask: TaskItem = {
      ...task,
      reqId,
      isCancel: false,
      cancelFn: () => {
        task.controller.abort();
        cancelUpload?.(reqId);
        if (task.updateEvent) window.dispatchEvent(new Event(task.updateEvent));
      },
    };
    updateTaskList(newTask);

    let totalUploadedLines = 0;
    let totalProcessedBytes = 0;
    const results: StreamUploadResult[] = [];

    // Process the files one after another.
    for (let i = 0; i < files.length; i++) {
      const file = files[i];
      console.log(`[useSliceUpload] Processing file ${i + 1}/${files.length}: ${file.name}`);

      const result = await streamSplitAndUpload(
        file,
        (formData, config) => uploadChunk(task.key, formData, config),
        (currentBytes, _totalBytes, uploadedLines) => {
          const overallBytes = totalProcessedBytes + currentBytes;
          // Keep the percentage numeric: `toFixed` returns a string, which the
          // previous code compared against 100 and stored as-is. An all-empty
          // selection (totalSize === 0) must not produce NaN either.
          const rawPercent = totalSize > 0 ? (overallBytes / totalSize) * 100 : 0;
          const roundedPercent = Number(rawPercent.toFixed(2));

          const updatedTask: TaskItem = {
            ...newTask,
            ...taskListRef.current.find((item) => item.key === task.key),
            size: overallBytes,
            // Stay below 100 until the task is actually removed from the list.
            percent: roundedPercent >= 100 ? 99.99 : roundedPercent,
            streamUploadInfo: {
              currentFile: file.name,
              fileIndex: i + 1,
              totalFiles: files.length,
              uploadedLines: totalUploadedLines + uploadedLines,
            },
          };
          updateTaskList(updatedTask);
        },
        1024 * 1024, // 1MB chunk size
        {
          reqId,
          hasArchive: task.hasArchive,
          prefix: task.prefix,
          signal: task.controller.signal,
          maxConcurrency: 3,
        }
      );

      results.push(result);
      totalUploadedLines += result.uploadedCount;
      totalProcessedBytes += file.size;

      console.log(`[useSliceUpload] File ${file.name} processed, uploaded ${result.uploadedCount} lines`);
    }

    console.log('[useSliceUpload] Stream upload completed, total lines:', totalUploadedLines);
    removeTask(newTask);

    message.success(`成功上传 ${totalUploadedLines} 个文件(按行分割)`);
  } catch (err: unknown) {
    console.error('[useSliceUpload] Stream upload error:', err);
    // Narrow before reading `.message`: a rejection is not guaranteed to be an Error.
    const cancelled = err instanceof Error && err.message === "Upload cancelled";
    if (cancelled) {
      message.info("上传已取消");
    } else {
      message.error("文件上传失败,请稍后重试");
    }
    removeTask({
      ...task,
      isCancel: true,
      ...taskListRef.current.find((item) => item.key === task.key),
    });
  }
};

/**
 * Registers the window listener for the "upload:dataset-stream" event.
 *
 * Each event creates a task entry from the event detail, prepends it to the
 * task list, optionally opens the task popover, and starts the stream upload.
 *
 * @returns A function that removes the listener; a no-op when stream upload
 *          is disabled through `enableStreamUpload`.
 */
const registerStreamUploadListener = () => {
  if (!enableStreamUpload) return () => {};

  const streamUploadHandler = async (e: Event) => {
    const customEvent = e as CustomEvent;
    const { dataset, files, updateEvent, hasArchive, prefix } = customEvent.detail;

    const controller = new AbortController();
    const task: TaskItem = {
      key: dataset.id,
      title: `上传数据集: ${dataset.name} (按行分割)`,
      percent: 0,
      reqId: -1, // placeholder until the pre-upload call returns the real id
      controller,
      size: 0,
      updateEvent,
      hasArchive,
      prefix,
    };

    taskListRef.current = [task, ...taskListRef.current];
    setTaskList(taskListRef.current);

    // Open the task center so the user can see the new task.
    if (showTaskCenter) {
      window.dispatchEvent(
        new CustomEvent("show:task-popover", { detail: { show: true } })
      );
    }

    await handleStreamUpload({ task, files });
  };

  window.addEventListener("upload:dataset-stream", streamUploadHandler);

  return () => {
    window.removeEventListener("upload:dataset-stream", streamUploadHandler);
  };
};
TEXT_FILE_MIME_PREFIX = "text/"; @@ -164,17 +164,75 @@ export default function ImportConfiguration({ // 本地上传文件相关逻辑 const handleUpload = async (dataset: Dataset) => { - let filesToUpload = + const filesToUpload = (form.getFieldValue("files") as UploadFile[] | undefined) || []; - // 如果启用分行分割,处理文件 + // 如果启用分行分割,对大文件使用流式处理 if (importConfig.splitByLine && !hasNonTextFile) { - const splitResults = await Promise.all( - filesToUpload.map((file) => splitFileByLines(file)) - ); - filesToUpload = splitResults.flat(); + // 检查是否有大文件需要流式分割上传 + const filesForStreamUpload: File[] = []; + const filesForNormalUpload: UploadFile[] = []; + + for (const file of filesToUpload) { + const originFile = file.originFileObj ?? file; + if (originFile instanceof File && shouldStreamUpload(originFile)) { + filesForStreamUpload.push(originFile); + } else { + filesForNormalUpload.push(file); + } + } + + // 大文件使用流式分割上传 + if (filesForStreamUpload.length > 0) { + window.dispatchEvent( + new CustomEvent("upload:dataset-stream", { + detail: { + dataset, + files: filesForStreamUpload, + updateEvent, + hasArchive: importConfig.hasArchive, + prefix: currentPrefix, + }, + }) + ); + } + + // 小文件使用传统分割方式 + if (filesForNormalUpload.length > 0) { + const splitResults = await Promise.all( + filesForNormalUpload.map((file) => splitFileByLines(file)) + ); + const smallFilesToUpload = splitResults.flat(); + + // 计算分片列表 + const sliceList = smallFilesToUpload.map((file) => { + const originFile = (file.originFileObj ?? 
file) as Blob; + const slices = sliceFile(originFile); + return { + originFile: originFile, + slices, + name: file.name, + size: originFile.size || 0, + }; + }); + + console.log("[ImportConfiguration] Uploading small files with currentPrefix:", currentPrefix); + window.dispatchEvent( + new CustomEvent("upload:dataset", { + detail: { + dataset, + files: sliceList, + updateEvent, + hasArchive: importConfig.hasArchive, + prefix: currentPrefix, + }, + }) + ); + } + return; } + // 未启用分行分割,使用普通上传 // 计算分片列表 const sliceList = filesToUpload.map((file) => { const originFile = (file.originFileObj ?? file) as Blob; diff --git a/frontend/src/pages/DataManagement/dataset.model.ts b/frontend/src/pages/DataManagement/dataset.model.ts index e75e80d..d3f3b5e 100644 --- a/frontend/src/pages/DataManagement/dataset.model.ts +++ b/frontend/src/pages/DataManagement/dataset.model.ts @@ -102,6 +102,13 @@ export interface DatasetTask { executionHistory?: { time: string; status: string }[]; } +export interface StreamUploadInfo { + currentFile: string; + fileIndex: number; + totalFiles: number; + uploadedLines: number; +} + export interface TaskItem { key: string; title: string; @@ -113,4 +120,6 @@ export interface TaskItem { updateEvent?: string; size?: number; hasArchive?: boolean; + prefix?: string; + streamUploadInfo?: StreamUploadInfo; } diff --git a/frontend/src/pages/Layout/TaskUpload.tsx b/frontend/src/pages/Layout/TaskUpload.tsx index 6e5436a..373eb98 100644 --- a/frontend/src/pages/Layout/TaskUpload.tsx +++ b/frontend/src/pages/Layout/TaskUpload.tsx @@ -1,69 +1,93 @@ -import { - cancelUploadUsingPut, - preUploadUsingPost, - uploadFileChunkUsingPost, -} from "@/pages/DataManagement/dataset.api"; -import { Button, Empty, Progress } from "antd"; -import { DeleteOutlined } from "@ant-design/icons"; -import { useEffect } from "react"; -import { useFileSliceUpload } from "@/hooks/useSliceUpload"; - -export default function TaskUpload() { - const { createTask, taskList, removeTask, 
handleUpload } = useFileSliceUpload( - { - preUpload: preUploadUsingPost, - uploadChunk: uploadFileChunkUsingPost, - cancelUpload: cancelUploadUsingPut, - } - ); - - useEffect(() => { - const uploadHandler = (e: any) => { - console.log('[TaskUpload] Received upload event detail:', e.detail); - const { files } = e.detail; - const task = createTask(e.detail); - console.log('[TaskUpload] Created task with prefix:', task.prefix); - handleUpload({ task, files }); - }; - window.addEventListener("upload:dataset", uploadHandler); - return () => { - window.removeEventListener("upload:dataset", uploadHandler); - }; - }, []); - - return ( -
- {taskList.length > 0 && - taskList.map((task) => ( -
-
-
{task.title}
- -
- - -
- ))} - {taskList.length === 0 && ( - - )} -
- ); -} +import { + cancelUploadUsingPut, + preUploadUsingPost, + uploadFileChunkUsingPost, +} from "@/pages/DataManagement/dataset.api"; +import { Button, Empty, Progress, Tag } from "antd"; +import { DeleteOutlined, FileTextOutlined } from "@ant-design/icons"; +import { useEffect } from "react"; +import { useFileSliceUpload } from "@/hooks/useSliceUpload"; + +export default function TaskUpload() { + const { createTask, taskList, removeTask, handleUpload, registerStreamUploadListener } = useFileSliceUpload( + { + preUpload: preUploadUsingPost, + uploadChunk: uploadFileChunkUsingPost, + cancelUpload: cancelUploadUsingPut, + }, + true, // showTaskCenter + true // enableStreamUpload + ); + + useEffect(() => { + const uploadHandler = (e: Event) => { + const customEvent = e as CustomEvent; + console.log('[TaskUpload] Received upload event detail:', customEvent.detail); + const { files } = customEvent.detail; + const task = createTask(customEvent.detail); + console.log('[TaskUpload] Created task with prefix:', task.prefix); + handleUpload({ task, files }); + }; + window.addEventListener("upload:dataset", uploadHandler); + return () => { + window.removeEventListener("upload:dataset", uploadHandler); + }; + }, [createTask, handleUpload]); + + // 注册流式上传监听器 + useEffect(() => { + const unregister = registerStreamUploadListener(); + return unregister; + }, [registerStreamUploadListener]); + + return ( +
+ {taskList.length > 0 && + taskList.map((task) => ( +
+
+
{task.title}
+ +
+ + + {task.streamUploadInfo && ( +
+ } size="small"> + 按行分割 + + + 已上传: {task.streamUploadInfo.uploadedLines} 行 + + {task.streamUploadInfo.totalFiles > 1 && ( + + ({task.streamUploadInfo.fileIndex}/{task.streamUploadInfo.totalFiles} 文件) + + )} +
+ )} +
+ ))} + {taskList.length === 0 && ( + + )} +
import { UploadFile } from "antd";
import jsSHA from "jssha";

/** Default slice size for chunked uploads: 5 MB (suits most network conditions). */
export const DEFAULT_CHUNK_SIZE = 1024 * 1024 * 5;
/** Size above which a file counts as "large": 10 MB. */
export const LARGE_FILE_THRESHOLD = 1024 * 1024 * 10;
/** Maximum number of uploads allowed in flight at the same time. */
export const MAX_CONCURRENT_UPLOADS = 3;
/** Read window used while hashing a file: 20 MB per FileReader pass. */
const BUFFER_CHUNK_SIZE = 1024 * 1024 * 20;

/**
 * Splits a blob into consecutive slices.
 *
 * @param file      Blob to split.
 * @param chunkSize Slice size in bytes, 5 MB by default.
 * @returns The slices in order; a blob that fits in one slice is returned as-is
 *          in a single-element array.
 * @throws RangeError when `chunkSize` is not a positive number (the loop could
 *         never make progress).
 */
export function sliceFile(file: Blob, chunkSize = DEFAULT_CHUNK_SIZE): Blob[] {
  if (!(chunkSize > 0)) {
    throw new RangeError("chunkSize must be a positive number");
  }

  const totalSize = file.size;
  // Small files do not need slicing.
  if (totalSize <= chunkSize) {
    return [file];
  }

  const chunks: Blob[] = [];
  // The offset must advance on every pass; without it the loop never ends.
  for (let start = 0; start < totalSize; start += chunkSize) {
    chunks.push(file.slice(start, Math.min(start + chunkSize, totalSize)));
  }
  return chunks;
}

/**
 * Computes the SHA-256 digest of a blob, reading it in 20 MB windows.
 *
 * @param file       Blob to hash.
 * @param onProgress Optional callback receiving the percentage hashed so far (0-100).
 * @returns The digest as a hex string.
 */
export function calculateSHA256(
  file: Blob,
  onProgress?: (percent: number) => void
): Promise<string> {
  return new Promise((resolve, reject) => {
    const hash = new jsSHA("SHA-256", "ARRAYBUFFER", { encoding: "UTF8" });
    const reader = new FileReader();
    let processedSize = 0;

    const readFrom = (offset: number) => {
      const end = Math.min(offset + BUFFER_CHUNK_SIZE, file.size);
      reader.readAsArrayBuffer(file.slice(offset, end));
    };

    reader.onloadend = () => {
      const result = reader.result;
      if (!result || typeof result === "string") {
        reject(new Error("Failed to read file"));
        return;
      }

      hash.update(result);
      processedSize += result.byteLength;

      if (onProgress) {
        // An empty blob is complete immediately; avoid 0 / 0 = NaN.
        const percent =
          file.size === 0
            ? 100
            : Math.min(100, Math.round((processedSize / file.size) * 100));
        onProgress(percent);
      }

      if (processedSize >= file.size) {
        resolve(hash.getHash("HEX", { outputLen: 256 }));
      } else if (result.byteLength === 0) {
        // A zero-byte read before the end would re-read the same offset forever.
        reject(new Error("Failed to read file"));
      } else {
        readFrom(processedSize);
      }
    };

    reader.onerror = () => reject(new Error("File reading failed"));
    readFrom(0);
  });
}

/**
 * Computes SHA-256 digests for several blobs, one after another.
 *
 * @param files          Blobs to hash.
 * @param onFileProgress Optional callback receiving the blob index and its progress percentage.
 * @returns Hex digests in the same order as `files`.
 */
export async function calculateSHA256Batch(
  files: Blob[],
  onFileProgress?: (index: number, percent: number) => void
): Promise<string[]> {
  const results: string[] = [];

  for (let i = 0; i < files.length; i++) {
    const hash = await calculateSHA256(files[i], (percent) => {
      onFileProgress?.(i, percent);
    });
    results.push(hash);
  }

  return results;
}

/**
 * Checks that every selected file can still be read (not modified or deleted
 * since selection) by reading its first byte.
 *
 * @param fileList Entries whose `originFile` should be probed; entries without
 *                 an `originFile` are treated as present.
 * @returns The first entry that could not be read, or `null` when all are readable.
 */
export function checkIsFilesExist(
  fileList: Array<{ originFile?: Blob }>
): Promise<{ originFile?: Blob } | null> {
  return new Promise((resolve) => {
    if (!fileList.length) {
      resolve(null);
      return;
    }

    const totalCount = fileList.length;
    let checkedCount = 0;

    const markChecked = () => {
      checkedCount++;
      if (checkedCount >= totalCount) {
        resolve(null);
      }
    };

    for (const file of fileList) {
      const actualFile = file.originFile;
      if (!actualFile) {
        markChecked();
        continue;
      }

      const fileReader = new FileReader();
      fileReader.onloadend = (e) => {
        // A failed read leaves `result` null; report that entry.
        if (!e.target?.result) {
          resolve(file);
          return;
        }
        markChecked();
      };
      fileReader.onerror = () => resolve(file);
      // Reading a single byte is enough to detect a missing or changed file.
      fileReader.readAsArrayBuffer(actualFile.slice(0, 1));
    }
  });
}

/**
 * Tells whether a size exceeds the large-file threshold.
 *
 * @param size      File size in bytes.
 * @param threshold Threshold in bytes, 10 MB by default.
 */
export function isLargeFile(size: number, threshold = LARGE_FILE_THRESHOLD): boolean {
  return size > threshold;
}

/**
 * Formats a byte count as a human-readable string using 1024-based units.
 *
 * @param bytes    Number of bytes.
 * @param decimals Maximum number of fraction digits.
 * @returns For example "1.5 KB"; zero and NaN yield "0 B".
 */
export function formatFileSize(bytes: number, decimals = 2): string {
  if (bytes === 0 || Number.isNaN(bytes)) return "0 B";

  const k = 1024;
  const sizes = ["B", "KB", "MB", "GB", "TB", "PB"];
  // Clamp the unit index so values below 1 byte, negative values, and values
  // beyond the last unit still map onto a real entry of `sizes`.
  const exponent = Math.floor(Math.log(Math.abs(bytes)) / Math.log(k));
  const i = Math.min(sizes.length - 1, Math.max(0, exponent));
  // `toFixed` throws for digits outside 0-100.
  const digits = Math.min(100, Math.max(0, Math.floor(decimals) || 0));

  return `${parseFloat((bytes / Math.pow(k, i)).toFixed(digits))} ${sizes[i]}`;
}

/**
 * Runs async task factories with a bounded number in flight.
 *
 * @param tasks          Functions that each start one task.
 * @param maxConcurrency Upper bound on simultaneously running tasks; values
 *                       below 1 or NaN are treated as 1.
 * @param onTaskComplete Optional callback invoked with the index and result of each finished task.
 * @returns Results positioned at the index of the task that produced them.
 */
export async function runConcurrentTasks<T>(
  tasks: (() => Promise<T>)[],
  maxConcurrency: number,
  onTaskComplete?: (index: number, result: T) => void
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let nextIndex = 0;

  // Each worker pulls the next unclaimed task until none remain. A loop is
  // used instead of recursion so long task lists do not build deep promise chains.
  const worker = async (): Promise<void> => {
    while (nextIndex < tasks.length) {
      const currentIndex = nextIndex++;
      const result = await tasks[currentIndex]();
      results[currentIndex] = result;
      onTaskComplete?.(currentIndex, result);
    }
  };

  const limit = Math.max(1, Math.floor(maxConcurrency) || 1);
  const workerCount = Math.min(limit, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

/**
 * Splits text into lines on "\n" or "\r\n".
 *
 * @param text           Text content.
 * @param skipEmptyLines Whether to drop whitespace-only lines, true by default.
 * @returns The lines.
 */
export function splitTextByLines(text: string, skipEmptyLines = true): string[] {
  const lines = text.split(/\r?\n/);
  if (skipEmptyLines) {
    return lines.filter((line) => line.trim() !== "");
  }
  return lines;
}

/**
 * Builds the slice descriptor for a file.
 *
 * @param file      Source file or blob.
 * @param chunkSize Slice size in bytes.
 * @returns The source blob, its slices, its name ("unnamed" when the blob has
 *          none), its size, and the slice count.
 */
export function createFileSliceInfo(
  file: File | Blob,
  chunkSize = DEFAULT_CHUNK_SIZE
): {
  originFile: Blob;
  slices: Blob[];
  name: string;
  size: number;
  totalChunks: number;
} {
  const slices = sliceFile(file, chunkSize);
  return {
    originFile: file,
    slices,
    name: (file as File).name || "unnamed",
    size: file.size,
    totalChunks: slices.length,
  };
}

/** MIME type prefix shared by text files. */
export const TEXT_FILE_MIME_PREFIX = "text/";

/** Non-"text/" MIME types that are still treated as text. */
export const TEXT_FILE_MIME_TYPES = new Set([
  "application/json",
  "application/xml",
  "application/csv",
  "application/ndjson",
  "application/x-ndjson",
  "application/x-yaml",
  "application/yaml",
  "application/javascript",
  "application/x-javascript",
  "application/sql",
  "application/rtf",
  "application/xhtml+xml",
  "application/svg+xml",
]);

/** File extensions (lower case, with leading dot) treated as text. */
export const TEXT_FILE_EXTENSIONS = new Set([
  ".txt",
  ".md",
  ".markdown",
  ".csv",
  ".tsv",
  ".json",
  ".jsonl",
  ".ndjson",
  ".log",
  ".xml",
  ".yaml",
  ".yml",
  ".sql",
  ".js",
  ".ts",
  ".jsx",
  ".tsx",
  ".html",
  ".htm",
  ".css",
  ".scss",
  ".less",
  ".py",
  ".java",
  ".c",
  ".cpp",
  ".h",
  ".hpp",
  ".go",
  ".rs",
  ".rb",
  ".php",
  ".sh",
  ".bash",
  ".zsh",
  ".ps1",
  ".bat",
  ".cmd",
  ".svg",
  ".rtf",
]);

/**
 * Tells whether an antd `UploadFile` is a text file, by MIME type first and
 * by extension as a fallback.
 *
 * @param file The upload entry.
 */
export function isTextUploadFile(file: UploadFile): boolean {
  const mimeType = (file.type || "").toLowerCase();
  if (mimeType) {
    if (mimeType.startsWith(TEXT_FILE_MIME_PREFIX)) return true;
    if (TEXT_FILE_MIME_TYPES.has(mimeType)) return true;
  }

  const fileName = file.name || "";
  const dotIndex = fileName.lastIndexOf(".");
  if (dotIndex < 0) return false;
  const ext = fileName.slice(dotIndex).toLowerCase();
  return TEXT_FILE_EXTENSIONS.has(ext);
}

/**
 * Tells whether a file name has a known text extension.
 *
 * @param fileName File name; only the extension is inspected.
 */
export function isTextFileByName(fileName: string): boolean {
  const lowerName = fileName.toLowerCase();
  const dotIndex = lowerName.lastIndexOf(".");
  if (dotIndex < 0) return false;
  return TEXT_FILE_EXTENSIONS.has(lowerName.slice(dotIndex));
}

/**
 * Returns the lower-cased extension of a file name including the leading dot,
 * or an empty string when the name has no dot.
 *
 * @param fileName File name.
 */
export function getFileExtension(fileName: string): string {
  const dotIndex = fileName.lastIndexOf(".");
  if (dotIndex < 0) return "";
  return fileName.slice(dotIndex).toLowerCase();
}

/**
 * Reads a file or blob as text.
 *
 * @param file     Source file or blob.
 * @param encoding Text encoding label, UTF-8 by default.
 * @returns The decoded text; rejects when the read fails.
 */
export function readFileAsText(
  file: File | Blob,
  encoding = "UTF-8"
): Promise<string> {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = (e) => resolve(e.target?.result as string);
    reader.onerror = () => reject(new Error("Failed to read file"));
    reader.readAsText(file, encoding);
  });
}

/** Options accepted by `streamSplitAndUpload`. */
export interface StreamUploadOptions {
  /** Request id returned by the pre-upload call. */
  reqId: number;
  /** Base name for generated line files; defaults to the source name without extension. */
  fileNamePrefix?: string;
  /** Extension for generated line files; defaults to ".txt". */
  fileExtension?: string;
  hasArchive?: boolean;
  /** Sent as the "prefix" form field when defined. */
  prefix?: string;
  /** Aborting this signal stops the upload with an "Upload cancelled" error. */
  signal?: AbortSignal;
  /** Maximum number of line uploads in flight; defaults to `MAX_CONCURRENT_UPLOADS`. */
  maxConcurrency?: number;
}

/** Summary returned by `streamSplitAndUpload`. */
export interface StreamUploadResult {
  /** Number of non-empty lines that were uploaded. */
  uploadedCount: number;
  /** Size of the source file in bytes. */
  totalBytes: number;
  /** Number of whitespace-only lines that were skipped. */
  skippedEmptyCount: number;
}

/** Reads a blob as an ArrayBuffer, using FileReader where `Blob.arrayBuffer` is missing. */
function readBlobAsArrayBuffer(blob: Blob): Promise<ArrayBuffer> {
  if (typeof blob.arrayBuffer === "function") {
    return blob.arrayBuffer();
  }
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = () => resolve(reader.result as ArrayBuffer);
    reader.onerror = () => reject(new Error("Failed to read file"));
    reader.readAsArrayBuffer(blob);
  });
}

/**
 * Splits a text file into lines while reading it chunk by chunk and uploads
 * every non-empty line as its own file.
 *
 * The file is read through `Blob.slice`, so it is never loaded into memory as
 * a whole. Bytes are decoded as UTF-8 with a streaming decoder, so a character
 * whose bytes straddle two chunks is decoded correctly. Reading pauses while
 * `maxConcurrency` uploads are in flight. The first failing upload stops
 * further scheduling and is rethrown once in-flight uploads have settled.
 *
 * Line files are named `<base>_<6-digit line number><ext>`; the line number is
 * the position in the source file, so skipped empty lines leave gaps.
 *
 * @param file       Source file.
 * @param uploadFn   Upload function; receives the form data and a request config.
 * @param onProgress Progress callback `(currentBytes, totalBytes, uploadedLines)`;
 *                   pass `undefined` to opt out. Declared as a required
 *                   parameter because a required parameter (`options`) follows it.
 * @param chunkSize  Bytes read per pass, 1 MB by default.
 * @param options    Request id, naming and cancellation options.
 * @returns Upload statistics.
 * @throws Error("Upload cancelled") when `options.signal` is aborted.
 * @throws RangeError when `chunkSize` is not a positive number.
 */
export async function streamSplitAndUpload(
  file: File,
  uploadFn: (
    formData: FormData,
    config?: { onUploadProgress?: (e: { loaded: number; total: number }) => void }
  ) => Promise<unknown>,
  onProgress:
    | ((currentBytes: number, totalBytes: number, uploadedLines: number) => void)
    | undefined,
  chunkSize: number = 1024 * 1024, // 1MB
  options: StreamUploadOptions
): Promise<StreamUploadResult> {
  const {
    reqId,
    fileNamePrefix,
    fileExtension = ".txt",
    prefix,
    signal,
    maxConcurrency = MAX_CONCURRENT_UPLOADS,
  } = options;

  if (!(chunkSize > 0)) {
    // A non-positive chunk size would never advance the read offset.
    throw new RangeError("chunkSize must be a positive number");
  }
  const concurrencyLimit = Math.max(1, Math.floor(maxConcurrency) || 1);

  const fileSize = file.size;
  let uploadedCount = 0;
  let skippedEmptyCount = 0;
  let currentBytes = 0;
  let nextLineIndex = 0;

  // Base name and extension of the generated line files.
  const baseName = fileNamePrefix || file.name.replace(/\.[^/.]+$/, "");
  const ext = fileExtension.startsWith(".") ? fileExtension : `.${fileExtension}`;

  // Uploads currently running, and the first error any of them produced.
  const inFlight = new Set<Promise<void>>();
  let firstFailure: { error: unknown } | undefined;

  const throwIfFailed = (): void => {
    if (firstFailure) throw firstFailure.error;
  };
  const throwIfStopped = (): void => {
    throwIfFailed();
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }
  };

  /** Uploads one line as a file of its own, slice by slice. */
  async function uploadLine(line: string, index: number): Promise<void> {
    const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}${ext}`;
    const blob = new Blob([line], { type: "text/plain" });
    const lineFile = new File([blob], newFileName, { type: "text/plain" });

    // A line normally fits in one slice, but a line longer than the slice size
    // must send every slice or the tail of the line would be lost.
    const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE);
    for (let chunkIndex = 0; chunkIndex < slices.length; chunkIndex++) {
      const slice = slices[chunkIndex];
      const checkSum = await calculateSHA256(slice);

      const formData = new FormData();
      formData.append("file", slice);
      formData.append("reqId", reqId.toString());
      formData.append("fileNo", (index + 1).toString());
      formData.append("chunkNo", (chunkIndex + 1).toString());
      formData.append("fileName", newFileName);
      formData.append("fileSize", lineFile.size.toString());
      formData.append("totalChunkNum", slices.length.toString());
      formData.append("checkSumHex", checkSum);
      if (prefix !== undefined) {
        formData.append("prefix", prefix);
      }

      await uploadFn(formData, {
        onUploadProgress: () => {
          // Line files are tiny; per-request progress is not reported.
        },
      });
    }
  }

  /** Claims the next line number and starts the upload once a slot is free. */
  async function enqueueLine(line: string): Promise<void> {
    const index = nextLineIndex++;
    if (!line.trim()) {
      skippedEmptyCount++;
      return;
    }

    // Back-pressure: wait for a slot instead of starting every upload at once.
    while (inFlight.size >= concurrencyLimit) {
      await Promise.race(inFlight);
    }
    throwIfStopped();

    const task: Promise<void> = uploadLine(line, index)
      .then(() => {
        uploadedCount++;
        onProgress?.(currentBytes, fileSize, uploadedCount);
      })
      .catch((error: unknown) => {
        if (!firstFailure) firstFailure = { error };
      })
      .finally(() => {
        inFlight.delete(task);
      });
    inFlight.add(task);
  }

  const decoder = new TextDecoder("utf-8");
  let pending = "";
  let offset = 0;

  try {
    // Read the file chunk by chunk.
    while (offset < fileSize) {
      throwIfStopped();

      const end = Math.min(offset + chunkSize, fileSize);
      const bytes = await readBlobAsArrayBuffer(file.slice(offset, end));
      // `stream: true` keeps an incomplete multi-byte sequence for the next chunk.
      const combined = pending + decoder.decode(bytes, { stream: true });

      // Split on "\n" or "\r\n"; the last piece may be an incomplete line.
      const lines = combined.split(/\r?\n/);
      pending = lines.pop() ?? "";

      for (const line of lines) {
        throwIfStopped();
        await enqueueLine(line);
      }

      currentBytes = end;
      offset = end;
      onProgress?.(currentBytes, fileSize, uploadedCount);
    }

    // Flush the decoder and upload the trailing line of a file that does not
    // end with a newline.
    pending += decoder.decode();
    if (pending.trim()) {
      await enqueueLine(pending);
    }
  } finally {
    // Let running uploads settle before returning or rethrowing.
    await Promise.all(inFlight);
  }
  throwIfFailed();

  return {
    uploadedCount,
    totalBytes: fileSize,
    skippedEmptyCount,
  };
}

/**
 * Tells whether a file should go through the streaming split upload.
 *
 * @param file      File to check.
 * @param threshold Size threshold in bytes, 5 MB by default.
 */
export function shouldStreamUpload(file: File, threshold: number = 5 * 1024 * 1024): boolean {
  return file.size > threshold;
}