fix: 修复流式分割上传的三个问题

1. 实现真正的并发控制,避免同时产生大量请求
   - 使用任务队列模式,确保同时运行的任务不超过 maxConcurrency
   - 完成一个任务后才启动下一个,而不是一次性启动所有任务

2. 修复 API 错误(预上传请求不存在)
   - 所有分片使用相同的 fileNo=1(属于同一个预上传请求)
   - chunkNo 改为行号,表示第几行数据
   - 这是根本原因:之前每行都被当作不同文件,但只有第一个文件有有效的预上传请求

3. 保留原始文件扩展名
   - 正确提取并保留文件扩展名
   - 例如:132.txt → 132_000001.txt(而不是 132_000001)
This commit is contained in:
2026-02-04 15:06:02 +08:00
parent 17a62cd3c2
commit 7c7729434b

View File

@@ -431,81 +431,17 @@ export async function streamSplitAndUpload(
let skippedEmptyCount = 0; let skippedEmptyCount = 0;
let currentBytes = 0; let currentBytes = 0;
// 获取文件名基础部分 // 获取文件名基础部分和扩展名
const baseName = fileNamePrefix || file.name.replace(/\.[^/.]+$/, ""); const originalFileName = fileNamePrefix || file.name;
const lastDotIndex = originalFileName.lastIndexOf(".");
const baseName = lastDotIndex > 0 ? originalFileName.slice(0, lastDotIndex) : originalFileName;
const fileExtension = lastDotIndex > 0 ? originalFileName.slice(lastDotIndex) : "";
// 用于并发控制的队列 // 收集所有需要上传的行
const uploadQueue: Promise<void>[] = []; const pendingLines: { line: string; index: number }[] = [];
let uploadIndex = 0; let lineIndex = 0;
/** // 逐块读取文件并收集行
* 上传单行内容
*/
async function uploadLine(line: string, index: number): Promise<void> {
// 检查是否已取消
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
if (!line.trim()) {
skippedEmptyCount++;
return;
}
const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}`;
const blob = new Blob([line], { type: "text/plain" });
const lineFile = new File([blob], newFileName, { type: "text/plain" });
// 计算分片(小文件通常只需要一个分片)
const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE);
const checkSum = await calculateSHA256(slices[0]);
// 检查是否已取消(计算哈希后)
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
const formData = new FormData();
formData.append("file", slices[0]);
formData.append("reqId", reqId.toString());
formData.append("fileNo", (index + 1).toString());
formData.append("chunkNo", "1");
formData.append("fileName", newFileName);
formData.append("fileSize", lineFile.size.toString());
formData.append("totalChunkNum", "1");
formData.append("checkSumHex", checkSum);
if (prefix !== undefined) {
formData.append("prefix", prefix);
}
await uploadFn(formData, {
onUploadProgress: () => {
// 单行文件很小,进度主要用于追踪上传状态
},
});
}
/**
* 处理并发上传
*/
async function processUploadQueue(): Promise<void> {
let currentIndex = 0;
async function runNext(): Promise<void> {
if (currentIndex >= uploadQueue.length) return;
const task = uploadQueue[currentIndex++];
await task;
await runNext();
}
const workers = Array(Math.min(maxConcurrency, uploadQueue.length))
.fill(null)
.map(() => runNext());
await Promise.all(workers);
}
// 逐块读取文件
while (offset < fileSize) { while (offset < fileSize) {
// 检查是否已取消 // 检查是否已取消
if (signal?.aborted) { if (signal?.aborted) {
@@ -525,19 +461,12 @@ export async function streamSplitAndUpload(
// 保留最后一行(可能不完整) // 保留最后一行(可能不完整)
buffer = lines.pop() || ""; buffer = lines.pop() || "";
// 完整行加入上传队列 // 收集完整行
for (const line of lines) { for (const line of lines) {
if (signal?.aborted) { if (signal?.aborted) {
throw new Error("Upload cancelled"); throw new Error("Upload cancelled");
} }
pendingLines.push({ line, index: lineIndex++ });
const currentLineIndex = uploadIndex++;
uploadQueue.push(
uploadLine(line, currentLineIndex).then(() => {
uploadedCount++;
onProgress?.(currentBytes, fileSize, uploadedCount);
})
);
} }
currentBytes = end; currentBytes = end;
@@ -549,17 +478,110 @@ export async function streamSplitAndUpload(
// 处理最后剩余的 buffer(如果文件不以换行符结尾) // 处理最后剩余的 buffer(如果文件不以换行符结尾)
if (buffer.trim()) { if (buffer.trim()) {
const currentLineIndex = uploadIndex++; pendingLines.push({ line: buffer, index: lineIndex++ });
uploadQueue.push( }
uploadLine(buffer, currentLineIndex).then(() => {
/**
* 上传单行内容
* fileNo 固定为 1(因为所有行都属于同一个原始文件,只是不同的分片/行)
* chunkNo 用于标识是第几行
*/
async function uploadLine(line: string, index: number): Promise<void> {
// 检查是否已取消
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
if (!line.trim()) {
skippedEmptyCount++;
return;
}
// 保留原始文件扩展名
const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}${fileExtension}`;
const blob = new Blob([line], { type: "text/plain" });
const lineFile = new File([blob], newFileName, { type: "text/plain" });
// 计算分片(小文件通常只需要一个分片)
const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE);
const checkSum = await calculateSHA256(slices[0]);
// 检查是否已取消(计算哈希后)
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
const formData = new FormData();
formData.append("file", slices[0]);
formData.append("reqId", reqId.toString());
// 所有行使用相同的 fileNo=1,因为它们属于同一个预上传请求
// chunkNo 表示这是第几行数据
formData.append("fileNo", "1");
formData.append("chunkNo", (index + 1).toString());
formData.append("fileName", newFileName);
formData.append("fileSize", lineFile.size.toString());
formData.append("totalChunkNum", "1");
formData.append("checkSumHex", checkSum);
if (prefix !== undefined) {
formData.append("prefix", prefix);
}
await uploadFn(formData, {
onUploadProgress: () => {
// 单行文件很小,进度主要用于追踪上传状态
},
});
}
/**
* 带并发控制的上传队列执行器
* 使用任务队列模式,确保不会同时启动所有上传任务
*/
async function executeUploadsWithConcurrency(): Promise<void> {
const lines = [...pendingLines];
let currentIndex = 0;
let activeCount = 0;
let resolvedCount = 0;
return new Promise((resolve, reject) => {
function tryStartNext() {
// 检查是否已完成
if (resolvedCount >= lines.length) {
if (activeCount === 0) {
resolve();
}
return;
}
// 启动新的上传任务,直到达到最大并发数
while (activeCount < maxConcurrency && currentIndex < lines.length) {
const { line, index } = lines[currentIndex++];
activeCount++;
uploadLine(line, index)
.then(() => {
uploadedCount++; uploadedCount++;
onProgress?.(fileSize, fileSize, uploadedCount); onProgress?.(fileSize, fileSize, uploadedCount);
}) })
); .catch((err) => {
reject(err);
})
.finally(() => {
activeCount--;
resolvedCount++;
// 尝试启动下一个任务
tryStartNext();
});
}
} }
// 并发执行所有上传任务 // 开始执行
await processUploadQueue(); tryStartNext();
});
}
// 使用并发控制执行所有上传
await executeUploadsWithConcurrency();
return { return {
uploadedCount, uploadedCount,