diff --git a/frontend/src/utils/file.util.ts b/frontend/src/utils/file.util.ts index 883f3f7..03ed5a2 100644 --- a/frontend/src/utils/file.util.ts +++ b/frontend/src/utils/file.util.ts @@ -417,6 +417,63 @@ export interface StreamUploadResult { skippedEmptyCount: number; } +async function processFileLines( + file: File, + chunkSize: number, + signal: AbortSignal | undefined, + onLine?: (line: string, index: number) => Promise | void, + onProgress?: (currentBytes: number, totalBytes: number, processedLines: number) => void +): Promise<{ lineCount: number; skippedEmptyCount: number }> { + const fileSize = file.size; + let offset = 0; + let buffer = ""; + let skippedEmptyCount = 0; + let lineIndex = 0; + + while (offset < fileSize) { + if (signal?.aborted) { + throw new Error("Upload cancelled"); + } + + const end = Math.min(offset + chunkSize, fileSize); + const chunk = file.slice(offset, end); + const text = await readFileAsText(chunk); + const combined = buffer + text; + const lines = combined.split(/\r?\n/); + buffer = lines.pop() || ""; + + for (const line of lines) { + if (signal?.aborted) { + throw new Error("Upload cancelled"); + } + if (!line.trim()) { + skippedEmptyCount++; + continue; + } + const currentIndex = lineIndex; + lineIndex += 1; + if (onLine) { + await onLine(line, currentIndex); + } + } + + offset = end; + onProgress?.(offset, fileSize, lineIndex); + } + + if (buffer.trim()) { + const currentIndex = lineIndex; + lineIndex += 1; + if (onLine) { + await onLine(buffer, currentIndex); + } + } else if (buffer.length > 0) { + skippedEmptyCount++; + } + + return { lineCount: lineIndex, skippedEmptyCount }; +} + export async function streamSplitAndUpload( file: File, uploadFn: (formData: FormData, config?: { onUploadProgress?: (e: { loaded: number; total: number }) => void }) => Promise, @@ -435,83 +492,30 @@ export async function streamSplitAndUpload( } = options; const fileSize = file.size; - let offset = 0; - let buffer = ""; let uploadedCount = 0; let skippedEmptyCount = 0; - let currentBytes = 0; // 获取文件名基础部分和扩展名 const originalFileName = fileNamePrefix || file.name; const lastDotIndex = originalFileName.lastIndexOf("."); const baseName = lastDotIndex > 0 ? originalFileName.slice(0, lastDotIndex) : originalFileName; const fileExtension = lastDotIndex > 0 ? originalFileName.slice(lastDotIndex) : ""; - - // 收集所有需要上传的行 - const pendingLines: { line: string; index: number }[] = []; - let lineIndex = 0; - - // 逐块读取文件并收集非空行 - while (offset < fileSize) { - // 检查是否已取消 - if (signal?.aborted) { - throw new Error("Upload cancelled"); - } - - const end = Math.min(offset + chunkSize, fileSize); - const chunk = file.slice(offset, end); - const text = await readFileAsText(chunk); - - // 将新读取的内容追加到 buffer - const combined = buffer + text; - - // 按换行符分割(支持 \n 和 \r\n) - const lines = combined.split(/\r?\n/); - - // 保留最后一行(可能不完整) - buffer = lines.pop() || ""; - - // 收集完整行(跳过空行) - for (const line of lines) { - if (signal?.aborted) { - throw new Error("Upload cancelled"); - } - if (!line.trim()) { - skippedEmptyCount++; - continue; - } - pendingLines.push({ line, index: lineIndex++ }); - } - - currentBytes = end; - offset = end; - - // 每处理完一个 chunk,更新进度 - onProgress?.(currentBytes, fileSize, uploadedCount); - } - - // 处理最后剩余的 buffer(如果文件不以换行符结尾) - if (buffer.trim()) { - pendingLines.push({ line: buffer, index: lineIndex++ }); - } else if (buffer.length > 0) { - skippedEmptyCount++; - } - - const totalFileNum = pendingLines.length; - if (totalFileNum === 0) { - return { - uploadedCount: 0, - totalBytes: fileSize, - skippedEmptyCount, - }; - } - - if (signal?.aborted) { - throw new Error("Upload cancelled"); - } let resolvedReqId = initialReqId; if (!resolvedReqId) { + const scanResult = await processFileLines(file, chunkSize, signal); + const totalFileNum = scanResult.lineCount; + skippedEmptyCount = scanResult.skippedEmptyCount; + if (totalFileNum === 0) { + return { + uploadedCount: 0, + totalBytes: fileSize, + skippedEmptyCount, + }; + } + if (signal?.aborted) { + throw new Error("Upload cancelled"); + } if (!resolveReqId) { throw new Error("Missing pre-upload request id"); } @@ -521,6 +525,9 @@ export async function streamSplitAndUpload( } onReqIdResolved?.(resolvedReqId); } + if (!resolvedReqId) { + throw new Error("Missing pre-upload request id"); + } /** * 上传单行内容 @@ -573,55 +580,65 @@ export async function streamSplitAndUpload( }); } - /** - * 带并发控制的上传队列执行器 - * 使用任务队列模式,确保不会同时启动所有上传任务 - */ - async function executeUploadsWithConcurrency(): Promise { - const lines = [...pendingLines]; - let currentIndex = 0; - let activeCount = 0; - let resolvedCount = 0; - - return new Promise((resolve, reject) => { - function tryStartNext() { - // 检查是否已完成 - if (resolvedCount >= lines.length) { - if (activeCount === 0) { - resolve(); - } - return; - } - - // 启动新的上传任务,直到达到最大并发数 - while (activeCount < maxConcurrency && currentIndex < lines.length) { - const { line, index } = lines[currentIndex++]; - activeCount++; - - uploadLine(line, index) - .then(() => { - uploadedCount++; - onProgress?.(fileSize, fileSize, uploadedCount); - }) - .catch((err) => { - reject(err); - }) - .finally(() => { - activeCount--; - resolvedCount++; - // 尝试启动下一个任务 - tryStartNext(); - }); - } + const inFlight = new Set>(); + let uploadError: unknown = null; + const enqueueUpload = async (line: string, index: number) => { + if (signal?.aborted) { + throw new Error("Upload cancelled"); + } + if (uploadError) { + throw uploadError; + } + const uploadPromise = uploadLine(line, index) + .then(() => { + uploadedCount++; + }) + .catch((err) => { + uploadError = err; + }); + inFlight.add(uploadPromise); + uploadPromise.finally(() => inFlight.delete(uploadPromise)); + if (inFlight.size >= maxConcurrency) { + await Promise.race(inFlight); + if (uploadError) { + throw uploadError; } - - // 开始执行 - tryStartNext(); - }); + } + }; + + let uploadResult: { lineCount: number; skippedEmptyCount: number } | null = null; + try { + uploadResult = await processFileLines( + file, + chunkSize, + signal, + enqueueUpload, + (currentBytes, totalBytes) => { + onProgress?.(currentBytes, totalBytes, uploadedCount); + } + ); + if (uploadError) { + throw uploadError; + } + } finally { + if (inFlight.size > 0) { + await Promise.allSettled(inFlight); + } } - // 使用并发控制执行所有上传 - await executeUploadsWithConcurrency(); + if (!uploadResult || (initialReqId && uploadResult.lineCount === 0)) { + return { + uploadedCount: 0, + totalBytes: fileSize, + skippedEmptyCount: uploadResult?.skippedEmptyCount ?? 0, + }; + } + + if (!initialReqId) { + skippedEmptyCount = skippedEmptyCount || uploadResult.skippedEmptyCount; + } else { + skippedEmptyCount = uploadResult.skippedEmptyCount; + } return { uploadedCount,