diff --git a/frontend/src/utils/file.util.ts b/frontend/src/utils/file.util.ts index 096f15b..9606ddc 100644 --- a/frontend/src/utils/file.util.ts +++ b/frontend/src/utils/file.util.ts @@ -431,81 +431,17 @@ export async function streamSplitAndUpload( let skippedEmptyCount = 0; let currentBytes = 0; - // 获取文件名基础部分 - const baseName = fileNamePrefix || file.name.replace(/\.[^/.]+$/, ""); + // 获取文件名基础部分和扩展名 + const originalFileName = fileNamePrefix || file.name; + const lastDotIndex = originalFileName.lastIndexOf("."); + const baseName = lastDotIndex > 0 ? originalFileName.slice(0, lastDotIndex) : originalFileName; + const fileExtension = lastDotIndex > 0 ? originalFileName.slice(lastDotIndex) : ""; - // 用于并发控制的队列 - const uploadQueue: Promise[] = []; - let uploadIndex = 0; + // 收集所有需要上传的行 + const pendingLines: { line: string; index: number }[] = []; + let lineIndex = 0; - /** - * 上传单行内容 - */ - async function uploadLine(line: string, index: number): Promise { - // 检查是否已取消 - if (signal?.aborted) { - throw new Error("Upload cancelled"); - } - - if (!line.trim()) { - skippedEmptyCount++; - return; - } - - const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}`; - const blob = new Blob([line], { type: "text/plain" }); - const lineFile = new File([blob], newFileName, { type: "text/plain" }); - - // 计算分片(小文件通常只需要一个分片) - const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE); - const checkSum = await calculateSHA256(slices[0]); - - // 检查是否已取消(计算哈希后) - if (signal?.aborted) { - throw new Error("Upload cancelled"); - } - - const formData = new FormData(); - formData.append("file", slices[0]); - formData.append("reqId", reqId.toString()); - formData.append("fileNo", (index + 1).toString()); - formData.append("chunkNo", "1"); - formData.append("fileName", newFileName); - formData.append("fileSize", lineFile.size.toString()); - formData.append("totalChunkNum", "1"); - formData.append("checkSumHex", checkSum); - if (prefix !== undefined) { - formData.append("prefix", prefix); - } - - await uploadFn(formData, { - onUploadProgress: () => { - // 单行文件很小,进度主要用于追踪上传状态 - }, - }); - } - - /** - * 处理并发上传 - */ - async function processUploadQueue(): Promise { - let currentIndex = 0; - - async function runNext(): Promise { - if (currentIndex >= uploadQueue.length) return; - const task = uploadQueue[currentIndex++]; - await task; - await runNext(); - } - - const workers = Array(Math.min(maxConcurrency, uploadQueue.length)) - .fill(null) - .map(() => runNext()); - - await Promise.all(workers); - } - - // 逐块读取文件 + // 逐块读取文件并收集行 while (offset < fileSize) { // 检查是否已取消 if (signal?.aborted) { @@ -525,19 +461,12 @@ export async function streamSplitAndUpload( // 保留最后一行(可能不完整) buffer = lines.pop() || ""; - // 将完整行加入上传队列 + // 收集完整行 for (const line of lines) { if (signal?.aborted) { throw new Error("Upload cancelled"); } - - const currentLineIndex = uploadIndex++; - uploadQueue.push( - uploadLine(line, currentLineIndex).then(() => { - uploadedCount++; - onProgress?.(currentBytes, fileSize, uploadedCount); - }) - ); + pendingLines.push({ line, index: lineIndex++ }); } currentBytes = end; @@ -549,17 +478,110 @@ export async function streamSplitAndUpload( // 处理最后剩余的 buffer(如果文件不以换行符结尾) if (buffer.trim()) { - const currentLineIndex = uploadIndex++; - uploadQueue.push( - uploadLine(buffer, currentLineIndex).then(() => { - uploadedCount++; - onProgress?.(fileSize, fileSize, uploadedCount); - }) - ); + pendingLines.push({ line: buffer, index: lineIndex++ }); } - // 并发执行所有上传任务 - await processUploadQueue(); + /** + * 上传单行内容 + * fileNo 固定为 1(因为所有行都属于同一个原始文件,只是不同的分片/行) + * chunkNo 用于标识是第几行 + */ + async function uploadLine(line: string, index: number): Promise { + // 检查是否已取消 + if (signal?.aborted) { + throw new Error("Upload cancelled"); + } + + if (!line.trim()) { + skippedEmptyCount++; + return; + } + + // 保留原始文件扩展名 + const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}${fileExtension}`; + const blob = new Blob([line], { type: "text/plain" }); + const lineFile = new File([blob], newFileName, { type: "text/plain" }); + + // 计算分片(小文件通常只需要一个分片) + const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE); + const checkSum = await calculateSHA256(slices[0]); + + // 检查是否已取消(计算哈希后) + if (signal?.aborted) { + throw new Error("Upload cancelled"); + } + + const formData = new FormData(); + formData.append("file", slices[0]); + formData.append("reqId", reqId.toString()); + // 所有行使用相同的 fileNo=1,因为它们属于同一个预上传请求 + // chunkNo 表示这是第几行数据 + formData.append("fileNo", "1"); + formData.append("chunkNo", (index + 1).toString()); + formData.append("fileName", newFileName); + formData.append("fileSize", lineFile.size.toString()); + formData.append("totalChunkNum", "1"); + formData.append("checkSumHex", checkSum); + if (prefix !== undefined) { + formData.append("prefix", prefix); + } + + await uploadFn(formData, { + onUploadProgress: () => { + // 单行文件很小,进度主要用于追踪上传状态 + }, + }); + } + + /** + * 带并发控制的上传队列执行器 + * 使用任务队列模式,确保不会同时启动所有上传任务 + */ + async function executeUploadsWithConcurrency(): Promise { + const lines = [...pendingLines]; + let currentIndex = 0; + let activeCount = 0; + let resolvedCount = 0; + + return new Promise((resolve, reject) => { + function tryStartNext() { + // 检查是否已完成 + if (resolvedCount >= lines.length) { + if (activeCount === 0) { + resolve(); + } + return; + } + + // 启动新的上传任务,直到达到最大并发数 + while (activeCount < maxConcurrency && currentIndex < lines.length) { + const { line, index } = lines[currentIndex++]; + activeCount++; + + uploadLine(line, index) + .then(() => { + uploadedCount++; + onProgress?.(fileSize, fileSize, uploadedCount); + }) + .catch((err) => { + reject(err); + }) + .finally(() => { + activeCount--; + resolvedCount++; + // 尝试启动下一个任务 + tryStartNext(); + }); + } + } + + // 开始执行 + tryStartNext(); + }); + } + + // 使用并发控制执行所有上传 + await executeUploadsWithConcurrency(); return { uploadedCount,