You've already forked DataMate
feat(upload): 实现流式分割上传,优化大文件上传体验
实现边分割边上传的流式处理,避免大文件一次性加载导致前端卡顿。 修改内容: 1. file.util.ts - 流式分割上传核心功能 - 新增 streamSplitAndUpload 函数,实现边分割边上传 - 新增 shouldStreamUpload 函数,判断是否使用流式上传 - 新增 StreamUploadOptions 和 StreamUploadResult 接口 - 优化分片大小(默认 5MB) 2. ImportConfiguration.tsx - 智能上传策略 - 大文件(>5MB)使用流式分割上传 - 小文件(≤5MB)使用传统分割方式 - 保持 UI 不变 3. useSliceUpload.tsx - 流式上传处理 - 新增 handleStreamUpload 处理流式上传事件 - 支持并发上传和更好的进度管理 4. TaskUpload.tsx - 进度显示优化 - 注册流式上传事件监听器 - 显示流式上传信息(已上传行数、当前文件等) 5. dataset.model.ts - 类型定义扩展 - 新增 StreamUploadInfo 接口 - TaskItem 接口添加 streamUploadInfo 和 prefix 字段 实现特点: - 流式读取:使用 Blob.slice 逐块读取,避免一次性加载 - 逐行检测:按换行符分割,形成完整行后立即上传 - 内存优化:buffer 只保留当前块和未完成行,不累积所有分割结果 - 并发控制:支持 3 个并发上传,提升效率 - 进度可见:实时显示已上传行数和总体进度 - 错误处理:单个文件上传失败不影响其他文件 - 向后兼容:小文件仍使用原有分割方式 优势: - 大文件上传不再卡顿,用户体验大幅提升 - 内存占用显著降低(从加载整个文件到只保留当前块) - 上传效率提升(边分割边上传,并发上传多个小文件) 相关文件: - frontend/src/utils/file.util.ts - frontend/src/pages/DataManagement/Detail/components/ImportConfiguration.tsx - frontend/src/hooks/useSliceUpload.tsx - frontend/src/pages/Layout/TaskUpload.tsx - frontend/src/pages/DataManagement/dataset.model.ts
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { TaskItem } from "@/pages/DataManagement/dataset.model";
|
||||
import { calculateSHA256, checkIsFilesExist } from "@/utils/file.util";
|
||||
import { calculateSHA256, checkIsFilesExist, streamSplitAndUpload, StreamUploadResult } from "@/utils/file.util";
|
||||
import { App } from "antd";
|
||||
import { useRef, useState } from "react";
|
||||
|
||||
@@ -9,17 +9,18 @@ export function useFileSliceUpload(
|
||||
uploadChunk,
|
||||
cancelUpload,
|
||||
}: {
|
||||
preUpload: (id: string, params: any) => Promise<{ data: number }>;
|
||||
uploadChunk: (id: string, formData: FormData, config: any) => Promise<any>;
|
||||
cancelUpload: ((reqId: number) => Promise<any>) | null;
|
||||
preUpload: (id: string, params: Record<string, unknown>) => Promise<{ data: number }>;
|
||||
uploadChunk: (id: string, formData: FormData, config: Record<string, unknown>) => Promise<unknown>;
|
||||
cancelUpload: ((reqId: number) => Promise<unknown>) | null;
|
||||
},
|
||||
showTaskCenter = true // 上传时是否显示任务中心
|
||||
showTaskCenter = true, // 上传时是否显示任务中心
|
||||
enableStreamUpload = true // 是否启用流式分割上传
|
||||
) {
|
||||
const { message } = App.useApp();
|
||||
const [taskList, setTaskList] = useState<TaskItem[]>([]);
|
||||
const taskListRef = useRef<TaskItem[]>([]); // 用于固定任务顺序
|
||||
|
||||
const createTask = (detail: any = {}) => {
|
||||
const createTask = (detail: Record<string, unknown> = {}) => {
|
||||
const { dataset } = detail;
|
||||
const title = `上传数据集: ${dataset.name} `;
|
||||
const controller = new AbortController();
|
||||
@@ -68,7 +69,7 @@ export function useFileSliceUpload(
|
||||
// 携带前缀信息,便于刷新后仍停留在当前目录
|
||||
window.dispatchEvent(
|
||||
new CustomEvent(task.updateEvent, {
|
||||
detail: { prefix: (task as any).prefix },
|
||||
detail: { prefix: task.prefix },
|
||||
})
|
||||
);
|
||||
}
|
||||
@@ -79,7 +80,7 @@ export function useFileSliceUpload(
|
||||
}
|
||||
};
|
||||
|
||||
async function buildFormData({ file, reqId, i, j }) {
|
||||
async function buildFormData({ file, reqId, i, j }: { file: { slices: Blob[]; name: string; size: number }; reqId: number; i: number; j: number }) {
|
||||
const formData = new FormData();
|
||||
const { slices, name, size } = file;
|
||||
const checkSum = await calculateSHA256(slices[j]);
|
||||
@@ -94,7 +95,7 @@ export function useFileSliceUpload(
|
||||
return formData;
|
||||
}
|
||||
|
||||
async function uploadSlice(task: TaskItem, fileInfo) {
|
||||
async function uploadSlice(task: TaskItem, fileInfo: { loaded: number; i: number; j: number; files: { slices: Blob[]; name: string; size: number }[]; totalSize: number }) {
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
@@ -124,7 +125,7 @@ export function useFileSliceUpload(
|
||||
});
|
||||
}
|
||||
|
||||
async function uploadFile({ task, files, totalSize }) {
|
||||
async function uploadFile({ task, files, totalSize }: { task: TaskItem; files: { slices: Blob[]; name: string; size: number; originFile: Blob }[]; totalSize: number }) {
|
||||
console.log('[useSliceUpload] Calling preUpload with prefix:', task.prefix);
|
||||
const { data: reqId } = await preUpload(task.key, {
|
||||
totalFileNum: files.length,
|
||||
@@ -167,7 +168,7 @@ export function useFileSliceUpload(
|
||||
removeTask(newTask);
|
||||
}
|
||||
|
||||
const handleUpload = async ({ task, files }) => {
|
||||
const handleUpload = async ({ task, files }: { task: TaskItem; files: { slices: Blob[]; name: string; size: number; originFile: Blob }[] }) => {
|
||||
const isErrorFile = await checkIsFilesExist(files);
|
||||
if (isErrorFile) {
|
||||
message.error("文件被修改或删除,请重新选择文件上传");
|
||||
@@ -193,10 +194,155 @@ export function useFileSliceUpload(
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* 流式分割上传处理
|
||||
* 用于大文件按行分割并立即上传的场景
|
||||
*/
|
||||
const handleStreamUpload = async ({ task, files }: { task: TaskItem; files: File[] }) => {
|
||||
try {
|
||||
console.log('[useSliceUpload] Starting stream upload for', files.length, 'files');
|
||||
|
||||
// 预上传,获取 reqId
|
||||
const totalSize = files.reduce((acc, file) => acc + file.size, 0);
|
||||
const { data: reqId } = await preUpload(task.key, {
|
||||
totalFileNum: -1, // 流式上传,文件数量不确定
|
||||
totalSize,
|
||||
datasetId: task.key,
|
||||
hasArchive: task.hasArchive,
|
||||
prefix: task.prefix,
|
||||
});
|
||||
|
||||
console.log('[useSliceUpload] Stream upload preUpload response reqId:', reqId);
|
||||
|
||||
const newTask: TaskItem = {
|
||||
...task,
|
||||
reqId,
|
||||
isCancel: false,
|
||||
cancelFn: () => {
|
||||
task.controller.abort();
|
||||
cancelUpload?.(reqId);
|
||||
if (task.updateEvent) window.dispatchEvent(new Event(task.updateEvent));
|
||||
},
|
||||
};
|
||||
updateTaskList(newTask);
|
||||
|
||||
let totalUploadedLines = 0;
|
||||
let totalProcessedBytes = 0;
|
||||
const results: StreamUploadResult[] = [];
|
||||
|
||||
// 逐个处理文件
|
||||
for (let i = 0; i < files.length; i++) {
|
||||
const file = files[i];
|
||||
console.log(`[useSliceUpload] Processing file ${i + 1}/${files.length}: ${file.name}`);
|
||||
|
||||
const result = await streamSplitAndUpload(
|
||||
file,
|
||||
(formData, config) => uploadChunk(task.key, formData, config),
|
||||
(currentBytes, totalBytes, uploadedLines) => {
|
||||
// 更新进度
|
||||
const overallBytes = totalProcessedBytes + currentBytes;
|
||||
const curPercent = Number((overallBytes / totalSize) * 100).toFixed(2);
|
||||
|
||||
const updatedTask: TaskItem = {
|
||||
...newTask,
|
||||
...taskListRef.current.find((item) => item.key === task.key),
|
||||
size: overallBytes,
|
||||
percent: curPercent >= 100 ? 99.99 : curPercent,
|
||||
streamUploadInfo: {
|
||||
currentFile: file.name,
|
||||
fileIndex: i + 1,
|
||||
totalFiles: files.length,
|
||||
uploadedLines: totalUploadedLines + uploadedLines,
|
||||
},
|
||||
};
|
||||
updateTaskList(updatedTask);
|
||||
},
|
||||
1024 * 1024, // 1MB chunk size
|
||||
{
|
||||
reqId,
|
||||
hasArchive: task.hasArchive,
|
||||
prefix: task.prefix,
|
||||
signal: task.controller.signal,
|
||||
maxConcurrency: 3,
|
||||
}
|
||||
);
|
||||
|
||||
results.push(result);
|
||||
totalUploadedLines += result.uploadedCount;
|
||||
totalProcessedBytes += file.size;
|
||||
|
||||
console.log(`[useSliceUpload] File ${file.name} processed, uploaded ${result.uploadedCount} lines`);
|
||||
}
|
||||
|
||||
console.log('[useSliceUpload] Stream upload completed, total lines:', totalUploadedLines);
|
||||
removeTask(newTask);
|
||||
|
||||
message.success(`成功上传 ${totalUploadedLines} 个文件(按行分割)`);
|
||||
} catch (err) {
|
||||
console.error('[useSliceUpload] Stream upload error:', err);
|
||||
if (err.message === "Upload cancelled") {
|
||||
message.info("上传已取消");
|
||||
} else {
|
||||
message.error("文件上传失败,请稍后重试");
|
||||
}
|
||||
removeTask({
|
||||
...task,
|
||||
isCancel: true,
|
||||
...taskListRef.current.find((item) => item.key === task.key),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* 注册流式上传事件监听
|
||||
* 返回注销函数
|
||||
*/
|
||||
const registerStreamUploadListener = () => {
|
||||
if (!enableStreamUpload) return () => {};
|
||||
|
||||
const streamUploadHandler = async (e: Event) => {
|
||||
const customEvent = e as CustomEvent;
|
||||
const { dataset, files, updateEvent, hasArchive, prefix } = customEvent.detail;
|
||||
|
||||
const controller = new AbortController();
|
||||
const task: TaskItem = {
|
||||
key: dataset.id,
|
||||
title: `上传数据集: ${dataset.name} (按行分割)`,
|
||||
percent: 0,
|
||||
reqId: -1,
|
||||
controller,
|
||||
size: 0,
|
||||
updateEvent,
|
||||
hasArchive,
|
||||
prefix,
|
||||
};
|
||||
|
||||
taskListRef.current = [task, ...taskListRef.current];
|
||||
setTaskList(taskListRef.current);
|
||||
|
||||
// 显示任务中心
|
||||
if (showTaskCenter) {
|
||||
window.dispatchEvent(
|
||||
new CustomEvent("show:task-popover", { detail: { show: true } })
|
||||
);
|
||||
}
|
||||
|
||||
await handleStreamUpload({ task, files });
|
||||
};
|
||||
|
||||
window.addEventListener("upload:dataset-stream", streamUploadHandler);
|
||||
|
||||
return () => {
|
||||
window.removeEventListener("upload:dataset-stream", streamUploadHandler);
|
||||
};
|
||||
};
|
||||
|
||||
return {
|
||||
taskList,
|
||||
createTask,
|
||||
removeTask,
|
||||
handleUpload,
|
||||
handleStreamUpload,
|
||||
registerStreamUploadListener,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user