import { UploadFile } from "antd";
import jsSHA from "jssha";

/** Default chunk size: 5 MB (suits most network conditions). */
export const DEFAULT_CHUNK_SIZE = 1024 * 1024 * 5;

/** Large-file threshold: 10 MB. */
export const LARGE_FILE_THRESHOLD = 1024 * 1024 * 10;

/** Maximum number of concurrent uploads. */
export const MAX_CONCURRENT_UPLOADS = 3;

/** Number of bytes read per step while computing a SHA-256 digest: 20 MB. */
const BUFFER_CHUNK_SIZE = 1024 * 1024 * 20;

/** Unit suffixes used by `formatFileSize`, in increasing powers of 1024. */
const FILE_SIZE_UNITS = ["B", "KB", "MB", "GB", "TB", "PB"];

/**
 * Turns a caller-supplied concurrency value into a usable worker count.
 * Zero, negative and NaN values fall back to 1; fractions are floored.
 */
function toWorkerCount(maxConcurrency: number): number {
  return Math.max(1, Math.floor(maxConcurrency) || 1);
}

/**
 * Reads a blob into an ArrayBuffer through FileReader.
 * Rejects with "Failed to read file" when the read fails.
 */
function readBlobAsArrayBuffer(blob: Blob): Promise<ArrayBuffer> {
  return new Promise<ArrayBuffer>((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = () => {
      const { result } = reader;
      if (result === null || typeof result === "string") {
        reject(new Error("Failed to read file"));
        return;
      }
      resolve(result);
    };
    reader.onerror = () => reject(new Error("Failed to read file"));
    reader.readAsArrayBuffer(blob);
  });
}

/**
 * Splits a file into consecutive chunks.
 *
 * @param file The blob to split.
 * @param chunkSize Chunk size in bytes (fractions are floored); defaults to 5 MB.
 * @returns The list of chunks. A file no larger than `chunkSize` is returned
 *   as a single-element array containing the original blob.
 * @throws RangeError when `chunkSize` is NaN or smaller than one byte. Such
 *   values previously caused an endless loop or a bogus empty chunk.
 */
export function sliceFile(file: Blob, chunkSize = DEFAULT_CHUNK_SIZE): Blob[] {
  const size = Math.floor(chunkSize);
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(size >= 1)) {
    throw new RangeError(`chunkSize must be at least 1 byte, received ${chunkSize}`);
  }

  const totalSize = file.size;
  // Small files do not need to be split.
  if (totalSize <= size) {
    return [file];
  }

  const chunks: Blob[] = [];
  for (let start = 0; start < totalSize; start += size) {
    chunks.push(file.slice(start, Math.min(start + size, totalSize)));
  }
  return chunks;
}

/**
 * Computes the SHA-256 digest of a blob, reading it in `BUFFER_CHUNK_SIZE`
 * steps so the whole file never has to be held in memory at once.
 *
 * @param file The blob to hash.
 * @param onProgress Optional callback receiving an integer percentage (0-100)
 *   after every read step. An empty file reports 100.
 * @returns The digest as a hex string.
 */
export function calculateSHA256(
  file: Blob,
  onProgress?: (percent: number) => void
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const hash = new jsSHA("SHA-256", "ARRAYBUFFER", { encoding: "UTF8" });
    const reader = new FileReader();
    const totalSize = file.size;
    let processedSize = 0;

    const readFrom = (offset: number): void => {
      const end = Math.min(offset + BUFFER_CHUNK_SIZE, totalSize);
      reader.readAsArrayBuffer(file.slice(offset, end));
    };

    // `loadend` fires after success and after failure; a failed read leaves
    // `result` null, which is handled below (after `onerror` already rejected).
    reader.onloadend = () => {
      try {
        const buffer = reader.result;
        if (buffer === null || typeof buffer === "string") {
          reject(new Error("Failed to read file"));
          return;
        }

        hash.update(buffer);
        processedSize += buffer.byteLength;

        if (onProgress) {
          // Guard the division: an empty file would otherwise report NaN.
          const percent =
            totalSize === 0
              ? 100
              : Math.min(100, Math.round((processedSize / totalSize) * 100));
          onProgress(percent);
        }

        if (processedSize >= totalSize) {
          resolve(hash.getHash("HEX", { outputLen: 256 }));
          return;
        }

        // A read that yields no bytes before the end would never make
        // progress; fail instead of looping forever.
        if (buffer.byteLength === 0) {
          reject(new Error("Failed to read file"));
          return;
        }

        readFrom(processedSize);
      } catch (error) {
        // Errors thrown by the hash or by the progress callback must settle
        // the promise; otherwise the caller would wait forever.
        reject(error);
      }
    };

    reader.onerror = () => reject(new Error("File reading failed"));

    readFrom(0);
  });
}

/**
 * Computes SHA-256 digests for several blobs, one after another.
 *
 * @param files The blobs to hash.
 * @param onFileProgress Optional callback receiving the file index and the
 *   percentage reported for that file.
 * @returns The hex digests, in the same order as `files`.
 */
export async function calculateSHA256Batch(
  files: Blob[],
  onFileProgress?: (index: number, percent: number) => void
): Promise<string[]> {
  const results: string[] = [];
  for (let i = 0; i < files.length; i++) {
    const hash = await calculateSHA256(files[i], (percent) => {
      onFileProgress?.(i, percent);
    });
    results.push(hash);
  }
  return results;
}

/**
 * Checks that every file in the list can still be read (i.e. it was not
 * modified or deleted after being selected) by reading its first byte.
 *
 * Entries without an `originFile` are skipped and treated as readable.
 *
 * @param fileList Entries whose `originFile` should be probed.
 * @returns The first entry whose probe fails (in completion order, which is
 *   not necessarily list order), or `null` when all probes succeed.
 */
export function checkIsFilesExist(
  fileList: Array<{ originFile?: Blob }>
): Promise<{ originFile?: Blob } | null> {
  return new Promise((resolve) => {
    let pending = fileList.length;
    if (pending === 0) {
      resolve(null);
      return;
    }

    const markChecked = (): void => {
      pending--;
      if (pending === 0) {
        resolve(null);
      }
    };

    for (const file of fileList) {
      const actualFile = file.originFile;
      if (!actualFile) {
        markChecked();
        continue;
      }

      const reader = new FileReader();
      // `loadend` fires exactly once per read, for success and failure alike,
      // so each file is counted once. A failed read leaves `result` null.
      reader.onloadend = () => {
        if (!reader.result) {
          resolve(file);
          return;
        }
        markChecked();
      };
      reader.readAsArrayBuffer(actualFile.slice(0, 1));
    }
  });
}

/**
 * Tells whether a size counts as a large file.
 *
 * @param size File size in bytes.
 * @param threshold Threshold in bytes; defaults to 10 MB.
 * @returns `true` when `size` is strictly greater than `threshold`.
 */
export function isLargeFile(size: number, threshold = LARGE_FILE_THRESHOLD): boolean {
  return size > threshold;
}

/**
 * Formats a byte count as a human-readable string using 1024-based units.
 *
 * @param bytes Number of bytes. Negative values are formatted by magnitude
 *   with a leading minus sign; NaN and infinite values yield "0 B".
 * @param decimals Maximum number of decimal places (clamped to 0-100);
 *   trailing zeros are dropped.
 * @returns For example "1.5 KB". Values beyond the largest unit stay in PB.
 */
export function formatFileSize(bytes: number, decimals = 2): string {
  if (!Number.isFinite(bytes) || bytes === 0) return "0 B";

  const k = 1024;
  const magnitude = Math.abs(bytes);
  // Clamp the unit index: values below one byte stay in "B", and values past
  // the last unit stay in "PB" instead of indexing outside the table.
  const unitIndex =
    magnitude < 1
      ? 0
      : Math.min(
          Math.floor(Math.log(magnitude) / Math.log(k)),
          FILE_SIZE_UNITS.length - 1
        );
  // toFixed throws for digits outside 0-100, so normalise the argument.
  const digits = Math.min(100, Math.max(0, Math.floor(decimals) || 0));
  const value = parseFloat((bytes / Math.pow(k, unitIndex)).toFixed(digits));

  return `${value} ${FILE_SIZE_UNITS[unitIndex]}`;
}

/**
 * Runs asynchronous tasks with a bounded number running at the same time.
 *
 * @param tasks Functions that each start one task when called.
 * @param maxConcurrency Maximum number of tasks in flight. Zero, negative and
 *   NaN values are treated as 1; fractions are floored.
 * @param onTaskComplete Optional callback invoked with the task index and its
 *   result each time a task finishes successfully.
 * @returns The task results, in the same order as `tasks`. Rejects with the
 *   first task error; once a task has failed no further tasks are started.
 */
export async function runConcurrentTasks<T>(
  tasks: (() => Promise<T>)[],
  maxConcurrency: number,
  onTaskComplete?: (index: number, result: T) => void
): Promise<T[]> {
  const results: T[] = new Array<T>(tasks.length);
  const workerCount = Math.min(toWorkerCount(maxConcurrency), tasks.length);
  let nextIndex = 0;
  let stopped = false;

  // Each worker pulls the next unclaimed index until the list is exhausted.
  // A loop is used rather than recursion so that long task lists do not build
  // a promise chain proportional to their length.
  const worker = async (): Promise<void> => {
    while (!stopped && nextIndex < tasks.length) {
      const currentIndex = nextIndex++;
      try {
        const result = await tasks[currentIndex]();
        results[currentIndex] = result;
        onTaskComplete?.(currentIndex, result);
      } catch (error) {
        stopped = true;
        throw error;
      }
    }
  };

  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(worker());
  }
  await Promise.all(workers);

  return results;
}

/**
 * Splits text into lines on `\n` or `\r\n`.
 *
 * @param text The text to split.
 * @param skipEmptyLines Whether to drop lines that are empty or contain only
 *   whitespace; defaults to `true`.
 * @returns The resulting lines.
 */
export function splitTextByLines(text: string, skipEmptyLines = true): string[] {
  const lines = text.split(/\r?\n/);
  return skipEmptyLines ? lines.filter((line) => line.trim() !== "") : lines;
}

/**
 * Builds a descriptor holding a file together with its chunks.
 *
 * @param file The original file.
 * @param chunkSize Chunk size in bytes; defaults to 5 MB.
 * @returns The original blob, its chunks, its name ("unnamed" when the blob
 *   has no name or an empty one), its size and the number of chunks.
 */
export function createFileSliceInfo(
  file: File | Blob,
  chunkSize = DEFAULT_CHUNK_SIZE
): {
  originFile: Blob;
  slices: Blob[];
  name: string;
  size: number;
  totalChunks: number;
} {
  const slices = sliceFile(file, chunkSize);
  return {
    originFile: file,
    slices,
    name: (file as File).name || "unnamed",
    size: file.size,
    totalChunks: slices.length,
  };
}

/**
 * MIME type prefix shared by all `text/*` types.
 */
export const TEXT_FILE_MIME_PREFIX = "text/";

/**
 * MIME types outside `text/*` that are treated as text files.
 */
export const TEXT_FILE_MIME_TYPES = new Set<string>([
  "application/json",
  "application/xml",
  "application/csv",
  "application/ndjson",
  "application/x-ndjson",
  "application/x-yaml",
  "application/yaml",
  "application/javascript",
  "application/x-javascript",
  "application/sql",
  "application/rtf",
  "application/xhtml+xml",
  "application/svg+xml",
]);

/**
 * File extensions (lower case, with leading dot) treated as text files.
 */
export const TEXT_FILE_EXTENSIONS = new Set<string>([
  ".txt",
  ".md",
  ".markdown",
  ".csv",
  ".tsv",
  ".json",
  ".jsonl",
  ".ndjson",
  ".log",
  ".xml",
  ".yaml",
  ".yml",
  ".sql",
  ".js",
  ".ts",
  ".jsx",
  ".tsx",
  ".html",
  ".htm",
  ".css",
  ".scss",
  ".less",
  ".py",
  ".java",
  ".c",
  ".cpp",
  ".h",
  ".hpp",
  ".go",
  ".rs",
  ".rb",
  ".php",
  ".sh",
  ".bash",
  ".zsh",
  ".ps1",
  ".bat",
  ".cmd",
  ".svg",
  ".rtf",
]);

/**
 * Tells whether an antd `UploadFile` is a text file, first by MIME type and
 * then by file extension.
 *
 * @param file The upload entry to inspect.
 * @returns `true` when the MIME type starts with `text/`, is listed in
 *   `TEXT_FILE_MIME_TYPES`, or the file name has a listed extension.
 */
export function isTextUploadFile(file: UploadFile): boolean {
  // Drop parameters such as "; charset=utf-8" before matching the type.
  const mimeType = (file.type || "").split(";")[0].trim().toLowerCase();
  if (mimeType) {
    if (mimeType.startsWith(TEXT_FILE_MIME_PREFIX)) return true;
    if (TEXT_FILE_MIME_TYPES.has(mimeType)) return true;
  }
  return isTextFileByName(file.name || "");
}

/**
 * Tells whether a file name has one of the known text-file extensions.
 * Only the extension is examined; no MIME type is involved.
 *
 * @param fileName The file name.
 * @returns `true` when the extension is listed in `TEXT_FILE_EXTENSIONS`.
 */
export function isTextFileByName(fileName: string): boolean {
  const ext = getFileExtension(fileName);
  return ext !== "" && TEXT_FILE_EXTENSIONS.has(ext);
}

/**
 * Extracts the extension of a file name.
 *
 * @param fileName The file name.
 * @returns The lower-cased text from the last dot onwards (including the
 *   dot), or an empty string when the name contains no dot.
 */
export function getFileExtension(fileName: string): string {
  const dotIndex = fileName.lastIndexOf(".");
  if (dotIndex < 0) return "";
  return fileName.slice(dotIndex).toLowerCase();
}

/**
 * Reads a file as text through FileReader.
 *
 * @param file The file or blob to read.
 * @param encoding Text encoding label; defaults to UTF-8.
 * @returns The decoded text. Rejects with "Failed to read file" on error.
 */
export function readFileAsText(
  file: File | Blob,
  encoding = "UTF-8"
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = () => {
      const { result } = reader;
      if (typeof result === "string") {
        resolve(result);
      } else {
        reject(new Error("Failed to read file"));
      }
    };
    reader.onerror = () => reject(new Error("Failed to read file"));
    reader.readAsText(file, encoding);
  });
}

/**
 * Options accepted by `streamSplitAndUpload`.
 */
export interface StreamUploadOptions {
  /** Request id sent with every upload as the `reqId` form field. */
  reqId: number;
  /** Base name for generated files; defaults to the source name without extension. */
  fileNamePrefix?: string;
  /** Not read by `streamSplitAndUpload`. */
  hasArchive?: boolean;
  /** When defined, sent with every upload as the `prefix` form field. */
  prefix?: string;
  /** Cancels the operation with an "Upload cancelled" error once aborted. */
  signal?: AbortSignal;
  /** Maximum number of lines uploading at the same time; defaults to 3. */
  maxConcurrency?: number;
}

/**
 * Summary returned by `streamSplitAndUpload`.
 */
export interface StreamUploadResult {
  /** Number of lines that were actually uploaded. */
  uploadedCount: number;
  /** Size of the source file in bytes. */
  totalBytes: number;
  /** Number of blank lines that were skipped. */
  skippedEmptyCount: number;
}

/**
 * Streams a text file and uploads every non-blank line as its own file.
 *
 * The file is read block by block with `Blob.slice`, so it is never loaded
 * into memory as a whole. Blocks are decoded as UTF-8 with a streaming
 * decoder, so a multi-byte character that straddles two blocks is decoded
 * correctly. Reading pauses while `maxConcurrency` uploads are in flight.
 *
 * Line numbers (and therefore `fileNo` and the generated file names) count
 * every line of the source, including skipped blank ones.
 *
 * @param file The source file.
 * @param uploadFn Performs one upload; receives the form data and a config object.
 * @param onProgress Progress callback `(currentBytes, totalBytes, uploadedLines)`;
 *   pass `undefined` to omit it.
 * @param chunkSize Bytes read per block; defaults to 1 MB when `undefined`.
 * @param options Request id, naming, cancellation and concurrency settings.
 * @returns Counts of uploaded and skipped lines plus the file size.
 * @throws RangeError when `chunkSize` is NaN or smaller than one byte.
 * @throws Error "Upload cancelled" when `options.signal` is aborted.
 * @throws The first upload error. Uploads already in flight are not
 *   interrupted, but no further ones are started.
 */
export async function streamSplitAndUpload(
  file: File,
  uploadFn: (
    formData: FormData,
    config?: { onUploadProgress?: (e: { loaded: number; total: number }) => void }
  ) => Promise<unknown>,
  onProgress:
    | ((currentBytes: number, totalBytes: number, uploadedLines: number) => void)
    | undefined,
  chunkSize: number = 1024 * 1024, // 1 MB
  options: StreamUploadOptions
): Promise<StreamUploadResult> {
  const {
    reqId,
    fileNamePrefix,
    prefix,
    signal,
    maxConcurrency = MAX_CONCURRENT_UPLOADS,
  } = options;

  const readSize = Math.floor(chunkSize);
  // Written as a negated comparison so that NaN is rejected as well; such
  // values would otherwise never advance the read offset.
  if (!(readSize >= 1)) {
    throw new RangeError(`chunkSize must be at least 1 byte, received ${chunkSize}`);
  }

  const uploadLimit = toWorkerCount(maxConcurrency);
  const fileSize = file.size;
  // Base part of the generated file names.
  const baseName = fileNamePrefix || file.name.replace(/\.[^/.]+$/, "");

  let uploadedCount = 0;
  let skippedEmptyCount = 0;
  let currentBytes = 0;
  let nextLineIndex = 0;

  // Concurrency bookkeeping. Upload errors are collected here rather than
  // left on detached promises, so they never surface as unhandled rejections.
  let activeUploads = 0;
  let notifyUploadSettled: (() => void) | undefined;
  const failures: unknown[] = [];

  const throwIfCancelled = (): void => {
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }
  };

  const throwIfFailed = (): void => {
    if (failures.length > 0) {
      throw failures[0];
    }
  };

  /** Resolves the next time any in-flight upload settles. */
  const waitForUploadToSettle = (): Promise<void> =>
    new Promise<void>((resolve) => {
      notifyUploadSettled = resolve;
    });

  /**
   * Uploads one non-blank line as a standalone text file.
   */
  async function uploadLine(line: string, index: number): Promise<void> {
    const fileNo = (index + 1).toString();
    const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}`;
    const lineFile = new File([line], newFileName, { type: "text/plain" });

    // A line normally fits in a single chunk. Lines larger than
    // DEFAULT_CHUNK_SIZE are sent chunk by chunk instead of being silently
    // truncated to their first chunk.
    // NOTE(review): assumes the server accepts chunkNo > 1 with a per-chunk
    // checkSumHex for these uploads - confirm against the upload API.
    const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE);
    for (let chunkIndex = 0; chunkIndex < slices.length; chunkIndex++) {
      const slice = slices[chunkIndex];
      const checkSum = await calculateSHA256(slice);

      const formData = new FormData();
      formData.append("file", slice);
      formData.append("reqId", reqId.toString());
      formData.append("fileNo", fileNo);
      formData.append("chunkNo", (chunkIndex + 1).toString());
      formData.append("fileName", newFileName);
      formData.append("fileSize", lineFile.size.toString());
      formData.append("totalChunkNum", slices.length.toString());
      formData.append("checkSumHex", checkSum);
      if (prefix !== undefined) {
        formData.append("prefix", prefix);
      }

      await uploadFn(formData, {
        onUploadProgress: () => {
          // Single-line files are tiny; per-request progress is not reported.
        },
      });
    }
  }

  /** Starts one upload in the background and tracks its completion. */
  const startUpload = (line: string, index: number): void => {
    activeUploads++;
    void (async () => {
      try {
        await uploadLine(line, index);
        uploadedCount++;
        onProgress?.(currentBytes, fileSize, uploadedCount);
      } catch (error) {
        failures.push(error);
      } finally {
        activeUploads--;
        const notify = notifyUploadSettled;
        notifyUploadSettled = undefined;
        notify?.();
      }
    })();
  };

  /** Assigns the next line number, skips blank lines and applies backpressure. */
  const dispatchLine = async (line: string): Promise<void> => {
    const lineIndex = nextLineIndex++;
    if (!line.trim()) {
      skippedEmptyCount++;
      return;
    }
    while (activeUploads >= uploadLimit) {
      await waitForUploadToSettle();
      throwIfFailed();
      throwIfCancelled();
    }
    throwIfFailed();
    startUpload(line, lineIndex);
  };

  const decoder = new TextDecoder("utf-8");
  let pendingText = "";
  let offset = 0;

  // Read the file block by block.
  while (offset < fileSize) {
    throwIfCancelled();
    throwIfFailed();

    const end = Math.min(offset + readSize, fileSize);
    const bytes = await readBlobAsArrayBuffer(file.slice(offset, end));
    // `stream: true` keeps an incomplete trailing byte sequence inside the
    // decoder until the next block supplies the rest of the character.
    const text = decoder.decode(bytes, { stream: true });

    // Split on line breaks (both \n and \r\n); the last piece may be an
    // incomplete line, so it is carried over to the next block.
    const lines = (pendingText + text).split(/\r?\n/);
    pendingText = lines.pop() || "";

    for (const line of lines) {
      throwIfCancelled();
      await dispatchLine(line);
    }

    currentBytes = end;
    offset = end;
    // Report progress after every block.
    onProgress?.(currentBytes, fileSize, uploadedCount);
  }

  // Flush whatever the decoder still holds, then handle the final line of a
  // file that does not end with a line break.
  pendingText += decoder.decode();
  if (pendingText.trim()) {
    await dispatchLine(pendingText);
  }

  // Wait for the uploads that are still in flight.
  while (activeUploads > 0) {
    await waitForUploadToSettle();
  }
  throwIfFailed();

  return {
    uploadedCount,
    totalBytes: fileSize,
    skippedEmptyCount,
  };
}

/**
 * Tells whether a file should go through the streaming split upload.
 *
 * @param file The file to check.
 * @param threshold Threshold in bytes; defaults to 5 MB.
 * @returns `true` when the file size is strictly greater than `threshold`.
 */
export function shouldStreamUpload(file: File, threshold: number = 5 * 1024 * 1024): boolean {
  return file.size > threshold;
}