refactor(upload): 重构切片上传逻辑支持动态请求ID解析

- 移除预先批量获取reqId的方式,改为按需解析
- 新增resolveReqId函数支持动态获取请求ID
- 添加onReqIdResolved回调处理ID解析完成事件
- 改进文件按行切片上传,每行作为独立文件处理
- 优化空行跳过逻辑,统计跳过的空行数量
- 修复fileNo和chunkNo的对应关系
- 更新streamSplitAndUpload参数结构
This commit is contained in:
2026-02-04 15:58:58 +08:00
parent 078f303f57
commit 8415166949
2 changed files with 66 additions and 27 deletions

View File

@@ -251,18 +251,6 @@ export function useFileSliceUpload(
const file = files[i]; const file = files[i];
console.log(`[useSliceUpload] Processing file ${i + 1}/${files.length}: ${file.name}`); console.log(`[useSliceUpload] Processing file ${i + 1}/${files.length}: ${file.name}`);
// 为每个文件单独调用 preUpload,获取独立的 reqId
const { data: reqId } = await preUpload(task.key, {
totalFileNum: 1,
totalSize: file.size,
datasetId: task.key,
hasArchive: task.hasArchive,
prefix: task.prefix,
});
console.log(`[useSliceUpload] File ${file.name} preUpload response reqId:`, reqId);
reqIds.push(reqId);
const result = await streamSplitAndUpload( const result = await streamSplitAndUpload(
file, file,
(formData, config) => uploadChunk(task.key, formData, { (formData, config) => uploadChunk(task.key, formData, {
@@ -292,10 +280,21 @@ export function useFileSliceUpload(
}, },
}; };
updateTaskList(updatedTask); updateTaskList(updatedTask);
}, },
1024 * 1024, // 1MB chunk size 1024 * 1024, // 1MB chunk size
{ {
reqId, resolveReqId: async ({ totalFileNum, totalSize }) => {
const { data: reqId } = await preUpload(task.key, {
totalFileNum,
totalSize,
datasetId: task.key,
hasArchive: task.hasArchive,
prefix: task.prefix,
});
console.log(`[useSliceUpload] File ${file.name} preUpload response reqId:`, reqId);
reqIds.push(reqId);
return reqId;
},
hasArchive: newTask.hasArchive, hasArchive: newTask.hasArchive,
prefix: newTask.prefix, prefix: newTask.prefix,
signal: newTask.controller.signal, signal: newTask.controller.signal,

View File

@@ -401,7 +401,9 @@ export function readFileAsText(
* @returns 上传结果统计 * @returns 上传结果统计
*/ */
export interface StreamUploadOptions { export interface StreamUploadOptions {
reqId: number; reqId?: number;
resolveReqId?: (params: { totalFileNum: number; totalSize: number }) => Promise<number>;
onReqIdResolved?: (reqId: number) => void;
fileNamePrefix?: string; fileNamePrefix?: string;
hasArchive?: boolean; hasArchive?: boolean;
prefix?: string; prefix?: string;
@@ -422,8 +424,16 @@ export async function streamSplitAndUpload(
chunkSize: number = 1024 * 1024, // 1MB chunkSize: number = 1024 * 1024, // 1MB
options: StreamUploadOptions options: StreamUploadOptions
): Promise<StreamUploadResult> { ): Promise<StreamUploadResult> {
const { reqId, fileNamePrefix, prefix, signal, maxConcurrency = 3 } = options; const {
reqId: initialReqId,
resolveReqId,
onReqIdResolved,
fileNamePrefix,
prefix,
signal,
maxConcurrency = 3,
} = options;
const fileSize = file.size; const fileSize = file.size;
let offset = 0; let offset = 0;
let buffer = ""; let buffer = "";
@@ -441,7 +451,7 @@ export async function streamSplitAndUpload(
const pendingLines: { line: string; index: number }[] = []; const pendingLines: { line: string; index: number }[] = [];
let lineIndex = 0; let lineIndex = 0;
// 逐块读取文件并收集行 // 逐块读取文件并收集非空
while (offset < fileSize) { while (offset < fileSize) {
// 检查是否已取消 // 检查是否已取消
if (signal?.aborted) { if (signal?.aborted) {
@@ -461,11 +471,15 @@ export async function streamSplitAndUpload(
// 保留最后一行(可能不完整) // 保留最后一行(可能不完整)
buffer = lines.pop() || ""; buffer = lines.pop() || "";
// 收集完整行 // 收集完整行(跳过空行)
for (const line of lines) { for (const line of lines) {
if (signal?.aborted) { if (signal?.aborted) {
throw new Error("Upload cancelled"); throw new Error("Upload cancelled");
} }
if (!line.trim()) {
skippedEmptyCount++;
continue;
}
pendingLines.push({ line, index: lineIndex++ }); pendingLines.push({ line, index: lineIndex++ });
} }
@@ -479,12 +493,38 @@ export async function streamSplitAndUpload(
// 处理最后剩余的 buffer(如果文件不以换行符结尾) // 处理最后剩余的 buffer(如果文件不以换行符结尾)
if (buffer.trim()) { if (buffer.trim()) {
pendingLines.push({ line: buffer, index: lineIndex++ }); pendingLines.push({ line: buffer, index: lineIndex++ });
} else if (buffer.length > 0) {
skippedEmptyCount++;
}
const totalFileNum = pendingLines.length;
if (totalFileNum === 0) {
return {
uploadedCount: 0,
totalBytes: fileSize,
skippedEmptyCount,
};
}
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
let resolvedReqId = initialReqId;
if (!resolvedReqId) {
if (!resolveReqId) {
throw new Error("Missing pre-upload request id");
}
resolvedReqId = await resolveReqId({ totalFileNum, totalSize: fileSize });
if (!resolvedReqId) {
throw new Error("Failed to resolve pre-upload request id");
}
onReqIdResolved?.(resolvedReqId);
} }
/** /**
* 上传单行内容 * 上传单行内容
* fileNo 固定为 1(因为所有行都属于同一个原始文件,只是不同的分片/行) * 每行作为独立文件上传,fileNo 对应行序号,chunkNo 固定为 1
* chunkNo 用于标识是第几行
*/ */
async function uploadLine(line: string, index: number): Promise<void> { async function uploadLine(line: string, index: number): Promise<void> {
// 检查是否已取消 // 检查是否已取消
@@ -498,7 +538,8 @@ export async function streamSplitAndUpload(
} }
// 保留原始文件扩展名 // 保留原始文件扩展名
const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}${fileExtension}`; const fileIndex = index + 1;
const newFileName = `${baseName}_${String(fileIndex).padStart(6, "0")}${fileExtension}`;
const blob = new Blob([line], { type: "text/plain" }); const blob = new Blob([line], { type: "text/plain" });
const lineFile = new File([blob], newFileName, { type: "text/plain" }); const lineFile = new File([blob], newFileName, { type: "text/plain" });
@@ -513,11 +554,10 @@ export async function streamSplitAndUpload(
const formData = new FormData(); const formData = new FormData();
formData.append("file", slices[0]); formData.append("file", slices[0]);
formData.append("reqId", reqId.toString()); formData.append("reqId", resolvedReqId.toString());
// 所有行使用相同的 fileNo=1,因为它们属于同一个预上传请求 // 每行作为独立文件上传
// chunkNo 表示这是第几行数据 formData.append("fileNo", fileIndex.toString());
formData.append("fileNo", "1"); formData.append("chunkNo", "1");
formData.append("chunkNo", (index + 1).toString());
formData.append("fileName", newFileName); formData.append("fileName", newFileName);
formData.append("fileSize", lineFile.size.toString()); formData.append("fileSize", lineFile.size.toString());
formData.append("totalChunkNum", "1"); formData.append("totalChunkNum", "1");