Compare commits

...

6 Commits

Author SHA1 Message Date
f381d641ab fix(upload): 修复流式上传中的文件名处理逻辑
- 修正预上传接口调用时传递正确的文件总数而非固定值-1
- 移除导入配置中文件分割时的文件扩展名保留逻辑
- 删除流式上传选项中的fileExtension参数定义
- 移除流式上传实现中的文件扩展名处理相关代码
- 简化新文件名生成逻辑,不再附加扩展名后缀
2026-02-04 07:47:41 +08:00
c8611d29ff feat(upload): 实现流式分割上传,优化大文件上传体验
实现边分割边上传的流式处理,避免大文件一次性加载导致前端卡顿。

修改内容:
1. file.util.ts - 流式分割上传核心功能
   - 新增 streamSplitAndUpload 函数,实现边分割边上传
   - 新增 shouldStreamUpload 函数,判断是否使用流式上传
   - 新增 StreamUploadOptions 和 StreamUploadResult 接口
   - 优化分片大小(默认 5MB)

2. ImportConfiguration.tsx - 智能上传策略
   - 大文件(>5MB)使用流式分割上传
   - 小文件(≤5MB)使用传统分割方式
   - 保持 UI 不变

3. useSliceUpload.tsx - 流式上传处理
   - 新增 handleStreamUpload 处理流式上传事件
   - 支持并发上传和更好的进度管理

4. TaskUpload.tsx - 进度显示优化
   - 注册流式上传事件监听器
   - 显示流式上传信息(已上传行数、当前文件等)

5. dataset.model.ts - 类型定义扩展
   - 新增 StreamUploadInfo 接口
   - TaskItem 接口添加 streamUploadInfo 和 prefix 字段

实现特点:
- 流式读取:使用 Blob.slice 逐块读取,避免一次性加载
- 逐行检测:按换行符分割,形成完整行后立即上传
- 内存优化:buffer 只保留当前块和未完成行,不累积所有分割结果
- 并发控制:支持 3 个并发上传,提升效率
- 进度可见:实时显示已上传行数和总体进度
- 错误处理:单个文件上传失败不影响其他文件
- 向后兼容:小文件仍使用原有分割方式

优势:
- 大文件上传不再卡顿,用户体验大幅提升
- 内存占用显著降低(从加载整个文件到只保留当前块)
- 上传效率提升(边分割边上传,并发上传多个小文件)

相关文件:
- frontend/src/utils/file.util.ts
- frontend/src/pages/DataManagement/Detail/components/ImportConfiguration.tsx
- frontend/src/hooks/useSliceUpload.tsx
- frontend/src/pages/Layout/TaskUpload.tsx
- frontend/src/pages/DataManagement/dataset.model.ts
2026-02-03 13:12:10 +00:00
147beb1ec7 feat(annotation): 实现文本切片预生成功能
在创建标注任务时自动预生成文本切片结构,避免每次进入标注页面时的实时计算。

修改内容:
1. 在 AnnotationEditorService 中新增 precompute_segmentation_for_project 方法
   - 为项目的所有文本文件预计算切片结构
   - 使用 AnnotationTextSplitter 执行切片
   - 将切片结构持久化到 AnnotationResult 表(状态为 IN_PROGRESS)
   - 支持失败重试机制
   - 返回统计信息

2. 修改 create_mapping 接口
   - 在创建标注任务后,如果启用分段且为文本数据集,自动触发切片预生成
   - 使用 try-except 捕获异常,确保切片失败不影响项目创建

特点:
- 使用现有的 AnnotationTextSplitter 类
- 切片数据结构与现有分段标注格式一致
- 向后兼容(未切片的任务仍然使用实时计算)
- 性能优化:避免进入标注页面时的重复计算

相关文件:
- runtime/datamate-python/app/module/annotation/service/editor.py
- runtime/datamate-python/app/module/annotation/interface/project.py
2026-02-03 12:59:29 +00:00
699031dae7 fix: 修复编辑数据集时无法清除关联数据集的编译问题
问题分析:
之前尝试使用 @TableField(updateStrategy = FieldStrategy.IGNORED/ALWAYS) 注解
来强制更新 null 值,但 FieldStrategy.ALWAYS 可能不存在于当前
MyBatis-Plus 3.5.14 版本中,导致编译错误。

修复方案:
1. 移除 Dataset.java 中 parentDatasetId 字段的 @TableField(updateStrategy) 注解
2. 移除不需要的 import com.baomidou.mybatisplus.annotation.FieldStrategy
3. 在 DatasetApplicationService.updateDataset 方法中:
   - 添加 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
   - 保存原始的 parentDatasetId 值用于比较
   - handleParentChange 之后,检查 parentDatasetId 是否发生变化
   - 如果发生变化,使用 LambdaUpdateWrapper 显式地更新 parentDatasetId 字段
   - 这样即使值为 null 也能被正确更新到数据库

原理:
MyBatis-Plus 的 updateById 方法默认只更新非 null 字段。
通过使用 LambdaUpdateWrapper 的 set 方法,可以显式地设置字段值,
包括 null 值,从而确保字段能够被正确更新到数据库。
2026-02-03 11:09:15 +00:00
88b1383653 fix: 恢复前端发送空字符串以支持清除关联数据集
修改说明:
移除了之前将空字符串转换为 undefined 的逻辑,
现在直接发送表单值,包括空字符串。

配合后端修改(commit cc6415c):
1. 当用户选择"无关联数据集"时,发送空字符串 ""
2. 后端 handleParentChange 方法通过 normalizeParentId 将空字符串转为 null
3. Dataset.parentDatasetId 字段添加了 @TableField(updateStrategy = FieldStrategy.IGNORED)
4. 确保即使值为 null 也会被更新到数据库
2026-02-03 10:57:14 +00:00
cc6415c4d9 fix: 修复编辑数据集时无法清除关联数据集的问题
问题描述:
在数据管理的数据集编辑中,如果之前设置了关联数据集,编辑时选择不关联数据集后保存不会生效。

根本原因:
MyBatis-Plus 的 updateById 方法默认使用 FieldStrategy.NOT_NULL 策略,
只有当字段值为非 null 时才会更新到数据库。
当 parentDatasetId 从有值变为 null 时,默认情况下不会更新到数据库。

修复方案:
在 Dataset.java 的 parentDatasetId 字段上添加 @TableField(updateStrategy = FieldStrategy.IGNORED) 注解,
表示即使值为 null 也需要更新到数据库。

配合前端修改(恢复发送空字符串),现在可以正确清除关联数据集:
1. 前端发送空字符串表示"无关联数据集"
2. 后端 handleParentChange 通过 normalizeParentId 将空字符串转为 null
3. dataset.setParentDatasetId(null) 设置为 null
4. 由于添加了 IGNORED 策略,即使为 null 也会更新到数据库
2026-02-03 10:57:08 +00:00
10 changed files with 1084 additions and 124 deletions

View File

@@ -1,5 +1,6 @@
package com.datamate.datamanagement.application;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.common.domain.utils.ChunksSaver;
@@ -101,6 +102,7 @@ public class DatasetApplicationService {
public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatasetRequest) {
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);
if (StringUtils.hasText(updateDatasetRequest.getName())) {
dataset.setName(updateDatasetRequest.getName());
}
@@ -113,14 +115,31 @@ public class DatasetApplicationService {
if (Objects.nonNull(updateDatasetRequest.getStatus())) {
dataset.setStatus(updateDatasetRequest.getStatus());
}
// 处理父数据集变更:始终调用 handleParentChange,以支持设置新的关联或清除关联
// handleParentChange 内部通过 normalizeParentId 方法将空字符串和 null 都转换为 null
// 这样既支持设置新的父数据集,也支持清除关联
handleParentChange(dataset, updateDatasetRequest.getParentDatasetId());
if (updateDatasetRequest.isParentDatasetIdProvided()) {
// 保存原始的 parentDatasetId 值,用于比较是否发生了变化
String originalParentDatasetId = dataset.getParentDatasetId();
// 处理父数据集变更:仅当请求显式包含 parentDatasetId 时处理
// handleParentChange 内部通过 normalizeParentId 方法将空字符串和 null 都转换为 null
// 这样既支持设置新的父数据集,也支持清除关联
handleParentChange(dataset, updateDatasetRequest.getParentDatasetId());
// 检查 parentDatasetId 是否发生了变化
if (!Objects.equals(originalParentDatasetId, dataset.getParentDatasetId())) {
// 使用 LambdaUpdateWrapper 显式地更新 parentDatasetId 字段
// 这样即使值为 null 也能被正确更新到数据库
datasetRepository.update(null, new LambdaUpdateWrapper<Dataset>()
.eq(Dataset::getId, datasetId)
.set(Dataset::getParentDatasetId, dataset.getParentDatasetId()));
}
}
if (StringUtils.hasText(updateDatasetRequest.getDataSource())) {
// 数据源id不为空,使用异步线程进行文件扫盘落库
processDataSourceAsync(dataset.getId(), updateDatasetRequest.getDataSource());
}
// 更新其他字段(不包括 parentDatasetId,因为它已经在上面的代码中更新了)
datasetRepository.updateById(dataset);
return dataset;
}

View File

@@ -1,8 +1,10 @@
package com.datamate.datamanagement.interfaces.dto;
import com.datamate.datamanagement.common.enums.DatasetStatusType;
import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@@ -24,9 +26,18 @@ public class UpdateDatasetRequest {
/** 归集任务id */
private String dataSource;
/** 父数据集ID */
@Setter(AccessLevel.NONE)
private String parentDatasetId;
@JsonIgnore
@Setter(AccessLevel.NONE)
private boolean parentDatasetIdProvided;
/** 标签列表 */
private List<String> tags;
/** 数据集状态 */
private DatasetStatusType status;
public void setParentDatasetId(String parentDatasetId) {
this.parentDatasetIdProvided = true;
this.parentDatasetId = parentDatasetId;
}
}

View File

@@ -1,5 +1,5 @@
import { TaskItem } from "@/pages/DataManagement/dataset.model";
import { calculateSHA256, checkIsFilesExist } from "@/utils/file.util";
import { calculateSHA256, checkIsFilesExist, streamSplitAndUpload, StreamUploadResult } from "@/utils/file.util";
import { App } from "antd";
import { useRef, useState } from "react";
@@ -9,17 +9,18 @@ export function useFileSliceUpload(
uploadChunk,
cancelUpload,
}: {
preUpload: (id: string, params: any) => Promise<{ data: number }>;
uploadChunk: (id: string, formData: FormData, config: any) => Promise<any>;
cancelUpload: ((reqId: number) => Promise<any>) | null;
preUpload: (id: string, params: Record<string, unknown>) => Promise<{ data: number }>;
uploadChunk: (id: string, formData: FormData, config: Record<string, unknown>) => Promise<unknown>;
cancelUpload: ((reqId: number) => Promise<unknown>) | null;
},
showTaskCenter = true // 上传时是否显示任务中心
showTaskCenter = true, // 上传时是否显示任务中心
enableStreamUpload = true // 是否启用流式分割上传
) {
const { message } = App.useApp();
const [taskList, setTaskList] = useState<TaskItem[]>([]);
const taskListRef = useRef<TaskItem[]>([]); // 用于固定任务顺序
const createTask = (detail: any = {}) => {
const createTask = (detail: Record<string, unknown> = {}) => {
const { dataset } = detail;
const title = `上传数据集: ${dataset.name} `;
const controller = new AbortController();
@@ -68,7 +69,7 @@ export function useFileSliceUpload(
// 携带前缀信息,便于刷新后仍停留在当前目录
window.dispatchEvent(
new CustomEvent(task.updateEvent, {
detail: { prefix: (task as any).prefix },
detail: { prefix: task.prefix },
})
);
}
@@ -79,7 +80,7 @@ export function useFileSliceUpload(
}
};
async function buildFormData({ file, reqId, i, j }) {
async function buildFormData({ file, reqId, i, j }: { file: { slices: Blob[]; name: string; size: number }; reqId: number; i: number; j: number }) {
const formData = new FormData();
const { slices, name, size } = file;
const checkSum = await calculateSHA256(slices[j]);
@@ -94,7 +95,7 @@ export function useFileSliceUpload(
return formData;
}
async function uploadSlice(task: TaskItem, fileInfo) {
async function uploadSlice(task: TaskItem, fileInfo: { loaded: number; i: number; j: number; files: { slices: Blob[]; name: string; size: number }[]; totalSize: number }) {
if (!task) {
return;
}
@@ -124,7 +125,7 @@ export function useFileSliceUpload(
});
}
async function uploadFile({ task, files, totalSize }) {
async function uploadFile({ task, files, totalSize }: { task: TaskItem; files: { slices: Blob[]; name: string; size: number; originFile: Blob }[]; totalSize: number }) {
console.log('[useSliceUpload] Calling preUpload with prefix:', task.prefix);
const { data: reqId } = await preUpload(task.key, {
totalFileNum: files.length,
@@ -167,7 +168,7 @@ export function useFileSliceUpload(
removeTask(newTask);
}
const handleUpload = async ({ task, files }) => {
const handleUpload = async ({ task, files }: { task: TaskItem; files: { slices: Blob[]; name: string; size: number; originFile: Blob }[] }) => {
const isErrorFile = await checkIsFilesExist(files);
if (isErrorFile) {
message.error("文件被修改或删除,请重新选择文件上传");
@@ -193,10 +194,155 @@ export function useFileSliceUpload(
}
};
/**
* 流式分割上传处理
* 用于大文件按行分割并立即上传的场景
*/
const handleStreamUpload = async ({ task, files }: { task: TaskItem; files: File[] }) => {
try {
console.log('[useSliceUpload] Starting stream upload for', files.length, 'files');
// 预上传,获取 reqId
const totalSize = files.reduce((acc, file) => acc + file.size, 0);
const { data: reqId } = await preUpload(task.key, {
totalFileNum: files.length,
totalSize,
datasetId: task.key,
hasArchive: task.hasArchive,
prefix: task.prefix,
});
console.log('[useSliceUpload] Stream upload preUpload response reqId:', reqId);
const newTask: TaskItem = {
...task,
reqId,
isCancel: false,
cancelFn: () => {
task.controller.abort();
cancelUpload?.(reqId);
if (task.updateEvent) window.dispatchEvent(new Event(task.updateEvent));
},
};
updateTaskList(newTask);
let totalUploadedLines = 0;
let totalProcessedBytes = 0;
const results: StreamUploadResult[] = [];
// 逐个处理文件
for (let i = 0; i < files.length; i++) {
const file = files[i];
console.log(`[useSliceUpload] Processing file ${i + 1}/${files.length}: ${file.name}`);
const result = await streamSplitAndUpload(
file,
(formData, config) => uploadChunk(task.key, formData, config),
(currentBytes, totalBytes, uploadedLines) => {
// 更新进度
const overallBytes = totalProcessedBytes + currentBytes;
const curPercent = Number((overallBytes / totalSize) * 100).toFixed(2);
const updatedTask: TaskItem = {
...newTask,
...taskListRef.current.find((item) => item.key === task.key),
size: overallBytes,
percent: curPercent >= 100 ? 99.99 : curPercent,
streamUploadInfo: {
currentFile: file.name,
fileIndex: i + 1,
totalFiles: files.length,
uploadedLines: totalUploadedLines + uploadedLines,
},
};
updateTaskList(updatedTask);
},
1024 * 1024, // 1MB chunk size
{
reqId,
hasArchive: task.hasArchive,
prefix: task.prefix,
signal: task.controller.signal,
maxConcurrency: 3,
}
);
results.push(result);
totalUploadedLines += result.uploadedCount;
totalProcessedBytes += file.size;
console.log(`[useSliceUpload] File ${file.name} processed, uploaded ${result.uploadedCount} lines`);
}
console.log('[useSliceUpload] Stream upload completed, total lines:', totalUploadedLines);
removeTask(newTask);
message.success(`成功上传 ${totalUploadedLines} 个文件(按行分割)`);
} catch (err) {
console.error('[useSliceUpload] Stream upload error:', err);
if (err.message === "Upload cancelled") {
message.info("上传已取消");
} else {
message.error("文件上传失败,请稍后重试");
}
removeTask({
...task,
isCancel: true,
...taskListRef.current.find((item) => item.key === task.key),
});
}
};
/**
* 注册流式上传事件监听
* 返回注销函数
*/
const registerStreamUploadListener = () => {
if (!enableStreamUpload) return () => {};
const streamUploadHandler = async (e: Event) => {
const customEvent = e as CustomEvent;
const { dataset, files, updateEvent, hasArchive, prefix } = customEvent.detail;
const controller = new AbortController();
const task: TaskItem = {
key: dataset.id,
title: `上传数据集: ${dataset.name} (按行分割)`,
percent: 0,
reqId: -1,
controller,
size: 0,
updateEvent,
hasArchive,
prefix,
};
taskListRef.current = [task, ...taskListRef.current];
setTaskList(taskListRef.current);
// 显示任务中心
if (showTaskCenter) {
window.dispatchEvent(
new CustomEvent("show:task-popover", { detail: { show: true } })
);
}
await handleStreamUpload({ task, files });
};
window.addEventListener("upload:dataset-stream", streamUploadHandler);
return () => {
window.removeEventListener("upload:dataset-stream", streamUploadHandler);
};
};
return {
taskList,
createTask,
removeTask,
handleUpload,
handleStreamUpload,
registerStreamUploadListener,
};
}

View File

@@ -58,8 +58,6 @@ export default function EditDataset({
const params = {
...formValues,
files: undefined,
// 将空字符串转换为 undefined,以便后端正确识别为清除关联数据集
parentDatasetId: formValues.parentDatasetId || undefined,
};
try {
await updateDatasetByIdUsingPut(data?.id, params);

View File

@@ -5,7 +5,7 @@ import { Dataset, DatasetType, DataSource } from "../../dataset.model";
import { useCallback, useEffect, useMemo, useState } from "react";
import { queryTasksUsingGet } from "@/pages/DataCollection/collection.apis";
import { updateDatasetByIdUsingPut } from "../../dataset.api";
import { sliceFile } from "@/utils/file.util";
import { sliceFile, shouldStreamUpload } from "@/utils/file.util";
import Dragger from "antd/es/upload/Dragger";
const TEXT_FILE_MIME_PREFIX = "text/";
@@ -90,14 +90,16 @@ async function splitFileByLines(file: UploadFile): Promise<UploadFile[]> {
const lines = text.split(/\r?\n/).filter((line: string) => line.trim() !== "");
if (lines.length === 0) return [];
// 生成文件名:原文件名_序号.扩展名
// 生成文件名:原文件名_序号(不保留后缀)
const nameParts = file.name.split(".");
const ext = nameParts.length > 1 ? "." + nameParts.pop() : "";
if (nameParts.length > 1) {
nameParts.pop();
}
const baseName = nameParts.join(".");
const padLength = String(lines.length).length;
return lines.map((line: string, index: number) => {
const newFileName = `${baseName}_${String(index + 1).padStart(padLength, "0")}${ext}`;
const newFileName = `${baseName}_${String(index + 1).padStart(padLength, "0")}`;
const blob = new Blob([line], { type: "text/plain" });
const newFile = new File([blob], newFileName, { type: "text/plain" });
return {
@@ -164,17 +166,75 @@ export default function ImportConfiguration({
// 本地上传文件相关逻辑
const handleUpload = async (dataset: Dataset) => {
let filesToUpload =
const filesToUpload =
(form.getFieldValue("files") as UploadFile[] | undefined) || [];
// 如果启用分行分割,处理文件
// 如果启用分行分割,对大文件使用流式处理
if (importConfig.splitByLine && !hasNonTextFile) {
const splitResults = await Promise.all(
filesToUpload.map((file) => splitFileByLines(file))
);
filesToUpload = splitResults.flat();
// 检查是否有大文件需要流式分割上传
const filesForStreamUpload: File[] = [];
const filesForNormalUpload: UploadFile[] = [];
for (const file of filesToUpload) {
const originFile = file.originFileObj ?? file;
if (originFile instanceof File && shouldStreamUpload(originFile)) {
filesForStreamUpload.push(originFile);
} else {
filesForNormalUpload.push(file);
}
}
// 大文件使用流式分割上传
if (filesForStreamUpload.length > 0) {
window.dispatchEvent(
new CustomEvent("upload:dataset-stream", {
detail: {
dataset,
files: filesForStreamUpload,
updateEvent,
hasArchive: importConfig.hasArchive,
prefix: currentPrefix,
},
})
);
}
// 小文件使用传统分割方式
if (filesForNormalUpload.length > 0) {
const splitResults = await Promise.all(
filesForNormalUpload.map((file) => splitFileByLines(file))
);
const smallFilesToUpload = splitResults.flat();
// 计算分片列表
const sliceList = smallFilesToUpload.map((file) => {
const originFile = (file.originFileObj ?? file) as Blob;
const slices = sliceFile(originFile);
return {
originFile: originFile,
slices,
name: file.name,
size: originFile.size || 0,
};
});
console.log("[ImportConfiguration] Uploading small files with currentPrefix:", currentPrefix);
window.dispatchEvent(
new CustomEvent("upload:dataset", {
detail: {
dataset,
files: sliceList,
updateEvent,
hasArchive: importConfig.hasArchive,
prefix: currentPrefix,
},
})
);
}
return;
}
// 未启用分行分割,使用普通上传
// 计算分片列表
const sliceList = filesToUpload.map((file) => {
const originFile = (file.originFileObj ?? file) as Blob;

View File

@@ -102,6 +102,13 @@ export interface DatasetTask {
executionHistory?: { time: string; status: string }[];
}
export interface StreamUploadInfo {
currentFile: string;
fileIndex: number;
totalFiles: number;
uploadedLines: number;
}
export interface TaskItem {
key: string;
title: string;
@@ -113,4 +120,6 @@ export interface TaskItem {
updateEvent?: string;
size?: number;
hasArchive?: boolean;
prefix?: string;
streamUploadInfo?: StreamUploadInfo;
}

View File

@@ -1,69 +1,93 @@
import {
cancelUploadUsingPut,
preUploadUsingPost,
uploadFileChunkUsingPost,
} from "@/pages/DataManagement/dataset.api";
import { Button, Empty, Progress } from "antd";
import { DeleteOutlined } from "@ant-design/icons";
import { useEffect } from "react";
import { useFileSliceUpload } from "@/hooks/useSliceUpload";
export default function TaskUpload() {
const { createTask, taskList, removeTask, handleUpload } = useFileSliceUpload(
{
preUpload: preUploadUsingPost,
uploadChunk: uploadFileChunkUsingPost,
cancelUpload: cancelUploadUsingPut,
}
);
useEffect(() => {
const uploadHandler = (e: any) => {
console.log('[TaskUpload] Received upload event detail:', e.detail);
const { files } = e.detail;
const task = createTask(e.detail);
console.log('[TaskUpload] Created task with prefix:', task.prefix);
handleUpload({ task, files });
};
window.addEventListener("upload:dataset", uploadHandler);
return () => {
window.removeEventListener("upload:dataset", uploadHandler);
};
}, []);
return (
<div
className="w-90 max-w-90 max-h-96 overflow-y-auto p-2"
id="header-task-popover"
>
{taskList.length > 0 &&
taskList.map((task) => (
<div key={task.key} className="border-b border-gray-200 pb-2">
<div className="flex items-center justify-between">
<div>{task.title}</div>
<Button
type="text"
danger
disabled={!task?.cancelFn}
onClick={() =>
removeTask({
...task,
isCancel: true,
})
}
icon={<DeleteOutlined />}
></Button>
</div>
<Progress size="small" percent={task.percent} />
</div>
))}
{taskList.length === 0 && (
<Empty
image={Empty.PRESENTED_IMAGE_SIMPLE}
description="暂无上传任务"
/>
)}
</div>
);
}
import {
cancelUploadUsingPut,
preUploadUsingPost,
uploadFileChunkUsingPost,
} from "@/pages/DataManagement/dataset.api";
import { Button, Empty, Progress, Tag } from "antd";
import { DeleteOutlined, FileTextOutlined } from "@ant-design/icons";
import { useEffect } from "react";
import { useFileSliceUpload } from "@/hooks/useSliceUpload";
export default function TaskUpload() {
const { createTask, taskList, removeTask, handleUpload, registerStreamUploadListener } = useFileSliceUpload(
{
preUpload: preUploadUsingPost,
uploadChunk: uploadFileChunkUsingPost,
cancelUpload: cancelUploadUsingPut,
},
true, // showTaskCenter
true // enableStreamUpload
);
useEffect(() => {
const uploadHandler = (e: Event) => {
const customEvent = e as CustomEvent;
console.log('[TaskUpload] Received upload event detail:', customEvent.detail);
const { files } = customEvent.detail;
const task = createTask(customEvent.detail);
console.log('[TaskUpload] Created task with prefix:', task.prefix);
handleUpload({ task, files });
};
window.addEventListener("upload:dataset", uploadHandler);
return () => {
window.removeEventListener("upload:dataset", uploadHandler);
};
}, [createTask, handleUpload]);
// 注册流式上传监听器
useEffect(() => {
const unregister = registerStreamUploadListener();
return unregister;
}, [registerStreamUploadListener]);
return (
<div
className="w-90 max-w-90 max-h-96 overflow-y-auto p-2"
id="header-task-popover"
>
{taskList.length > 0 &&
taskList.map((task) => (
<div key={task.key} className="border-b border-gray-200 pb-2">
<div className="flex items-center justify-between">
<div>{task.title}</div>
<Button
type="text"
danger
disabled={!task?.cancelFn}
onClick={() =>
removeTask({
...task,
isCancel: true,
})
}
icon={<DeleteOutlined />}
></Button>
</div>
<Progress size="small" percent={Number(task.percent)} />
{task.streamUploadInfo && (
<div className="flex items-center gap-2 text-xs text-gray-500 mt-1">
<Tag icon={<FileTextOutlined />} size="small">
</Tag>
<span>
: {task.streamUploadInfo.uploadedLines}
</span>
{task.streamUploadInfo.totalFiles > 1 && (
<span>
({task.streamUploadInfo.fileIndex}/{task.streamUploadInfo.totalFiles} )
</span>
)}
</div>
)}
</div>
))}
{taskList.length === 0 && (
<Empty
image={Empty.PRESENTED_IMAGE_SIMPLE}
description="暂无上传任务"
/>
)}
</div>
);
}

View File

@@ -1,79 +1,568 @@
import { UploadFile } from "antd";
import jsSHA from "jssha";
const CHUNK_SIZE = 1024 * 1024 * 60;
// 默认分片大小:5MB(适合大多数网络环境)
export const DEFAULT_CHUNK_SIZE = 1024 * 1024 * 5;
// 大文件阈值:10MB
export const LARGE_FILE_THRESHOLD = 1024 * 1024 * 10;
// 最大并发上传数
export const MAX_CONCURRENT_UPLOADS = 3;
// 文本文件读取块大小:20MB(用于计算 SHA256)
const BUFFER_CHUNK_SIZE = 1024 * 1024 * 20;
export function sliceFile(file, chunkSize = CHUNK_SIZE): Blob[] {
/**
* 将文件分割为多个分片
* @param file 文件对象
* @param chunkSize 分片大小(字节),默认 5MB
* @returns 分片数组(Blob 列表)
*/
export function sliceFile(file: Blob, chunkSize = DEFAULT_CHUNK_SIZE): Blob[] {
const totalSize = file.size;
const chunks: Blob[] = [];
// 小文件不需要分片
if (totalSize <= chunkSize) {
return [file];
}
let start = 0;
let end = start + chunkSize;
const chunks = [];
while (start < totalSize) {
const end = Math.min(start + chunkSize, totalSize);
const blob = file.slice(start, end);
chunks.push(blob);
start = end;
end = start + chunkSize;
}
return chunks;
}
export function calculateSHA256(file: Blob): Promise<string> {
let count = 0;
const hash = new jsSHA("SHA-256", "ARRAYBUFFER", { encoding: "UTF8" });
/**
* 计算文件的 SHA256 哈希值
* @param file 文件 Blob
* @param onProgress 进度回调(可选)
* @returns SHA256 哈希字符串
*/
export function calculateSHA256(
file: Blob,
onProgress?: (percent: number) => void
): Promise<string> {
return new Promise((resolve, reject) => {
const hash = new jsSHA("SHA-256", "ARRAYBUFFER", { encoding: "UTF8" });
const reader = new FileReader();
let processedSize = 0;
function readChunk(start: number, end: number) {
const slice = file.slice(start, end);
reader.readAsArrayBuffer(slice);
}
const bufferChunkSize = 1024 * 1024 * 20;
function processChunk(offset: number) {
const start = offset;
const end = Math.min(start + bufferChunkSize, file.size);
count = end;
const end = Math.min(start + BUFFER_CHUNK_SIZE, file.size);
readChunk(start, end);
}
reader.onloadend = function () {
const arraybuffer = reader.result;
reader.onloadend = function (e) {
const arraybuffer = reader.result as ArrayBuffer;
if (!arraybuffer) {
reject(new Error("Failed to read file"));
return;
}
hash.update(arraybuffer);
if (count < file.size) {
processChunk(count);
processedSize += (e.target as FileReader).result?.byteLength || 0;
if (onProgress) {
const percent = Math.min(100, Math.round((processedSize / file.size) * 100));
onProgress(percent);
}
if (processedSize < file.size) {
processChunk(processedSize);
} else {
resolve(hash.getHash("HEX", { outputLen: 256 }));
}
};
reader.onerror = () => reject(new Error("File reading failed"));
processChunk(0);
});
}
/**
* 批量计算多个文件的 SHA256
* @param files 文件列表
* @param onFileProgress 单个文件进度回调(可选)
* @returns 哈希值数组
*/
export async function calculateSHA256Batch(
files: Blob[],
onFileProgress?: (index: number, percent: number) => void
): Promise<string[]> {
const results: string[] = [];
for (let i = 0; i < files.length; i++) {
const hash = await calculateSHA256(files[i], (percent) => {
onFileProgress?.(i, percent);
});
results.push(hash);
}
return results;
}
/**
* 检查文件是否存在(未被修改或删除)
* @param fileList 文件列表
* @returns 返回第一个不存在的文件,或 null(如果都存在)
*/
export function checkIsFilesExist(
fileList: UploadFile[]
): Promise<UploadFile | null> {
fileList: Array<{ originFile?: Blob }>
): Promise<{ originFile?: Blob } | null> {
return new Promise((resolve) => {
const loadEndFn = (file: UploadFile, reachEnd: boolean, e) => {
const fileNotExist = !e.target.result;
if (!fileList.length) {
resolve(null);
return;
}
let checkedCount = 0;
const totalCount = fileList.length;
const loadEndFn = (file: { originFile?: Blob }, e: ProgressEvent<FileReader>) => {
checkedCount++;
const fileNotExist = !e.target?.result;
if (fileNotExist) {
resolve(file);
return;
}
if (reachEnd) {
if (checkedCount >= totalCount) {
resolve(null);
}
};
for (let i = 0; i < fileList.length; i++) {
const { originFile: file } = fileList[i];
for (const file of fileList) {
const fileReader = new FileReader();
fileReader.readAsArrayBuffer(file);
fileReader.onloadend = (e) =>
loadEndFn(fileList[i], i === fileList.length - 1, e);
const actualFile = file.originFile;
if (!actualFile) {
checkedCount++;
if (checkedCount >= totalCount) {
resolve(null);
}
continue;
}
fileReader.readAsArrayBuffer(actualFile.slice(0, 1));
fileReader.onloadend = (e) => loadEndFn(file, e);
fileReader.onerror = () => {
checkedCount++;
resolve(file);
};
}
});
}
/**
* 判断文件是否为大文件
* @param size 文件大小(字节)
* @param threshold 阈值(字节),默认 10MB
*/
export function isLargeFile(size: number, threshold = LARGE_FILE_THRESHOLD): boolean {
return size > threshold;
}
/**
* 格式化文件大小为人类可读格式
* @param bytes 字节数
* @param decimals 小数位数
*/
export function formatFileSize(bytes: number, decimals = 2): string {
if (bytes === 0) return "0 B";
const k = 1024;
const sizes = ["B", "KB", "MB", "GB", "TB", "PB"];
const i = Math.floor(Math.log(bytes) / Math.log(k));
return `${parseFloat((bytes / Math.pow(k, i)).toFixed(decimals))} ${sizes[i]}`;
}
/**
* 并发执行异步任务
* @param tasks 任务函数数组
* @param maxConcurrency 最大并发数
* @param onTaskComplete 单个任务完成回调(可选)
*/
export async function runConcurrentTasks<T>(
tasks: (() => Promise<T>)[],
maxConcurrency: number,
onTaskComplete?: (index: number, result: T) => void
): Promise<T[]> {
const results: T[] = new Array(tasks.length);
let index = 0;
async function runNext(): Promise<void> {
const currentIndex = index++;
if (currentIndex >= tasks.length) return;
const result = await tasks[currentIndex]();
results[currentIndex] = result;
onTaskComplete?.(currentIndex, result);
await runNext();
}
const workers = Array(Math.min(maxConcurrency, tasks.length))
.fill(null)
.map(() => runNext());
await Promise.all(workers);
return results;
}
/**
* 按行分割文本文件内容
* @param text 文本内容
* @param skipEmptyLines 是否跳过空行,默认 true
* @returns 行数组
*/
export function splitTextByLines(text: string, skipEmptyLines = true): string[] {
const lines = text.split(/\r?\n/);
if (skipEmptyLines) {
return lines.filter((line) => line.trim() !== "");
}
return lines;
}
/**
* 创建分片信息对象
* @param file 原始文件
* @param chunkSize 分片大小
*/
export function createFileSliceInfo(
file: File | Blob,
chunkSize = DEFAULT_CHUNK_SIZE
): {
originFile: Blob;
slices: Blob[];
name: string;
size: number;
totalChunks: number;
} {
const slices = sliceFile(file, chunkSize);
return {
originFile: file,
slices,
name: (file as File).name || "unnamed",
size: file.size,
totalChunks: slices.length,
};
}
/**
* 支持的文本文件 MIME 类型前缀
*/
export const TEXT_FILE_MIME_PREFIX = "text/";
/**
* 支持的文本文件 MIME 类型集合
*/
export const TEXT_FILE_MIME_TYPES = new Set([
"application/json",
"application/xml",
"application/csv",
"application/ndjson",
"application/x-ndjson",
"application/x-yaml",
"application/yaml",
"application/javascript",
"application/x-javascript",
"application/sql",
"application/rtf",
"application/xhtml+xml",
"application/svg+xml",
]);
/**
* 支持的文本文件扩展名集合
*/
export const TEXT_FILE_EXTENSIONS = new Set([
".txt",
".md",
".markdown",
".csv",
".tsv",
".json",
".jsonl",
".ndjson",
".log",
".xml",
".yaml",
".yml",
".sql",
".js",
".ts",
".jsx",
".tsx",
".html",
".htm",
".css",
".scss",
".less",
".py",
".java",
".c",
".cpp",
".h",
".hpp",
".go",
".rs",
".rb",
".php",
".sh",
".bash",
".zsh",
".ps1",
".bat",
".cmd",
".svg",
".rtf",
]);
/**
* 判断文件是否为文本文件(支持 UploadFile 类型)
* @param file UploadFile 对象
*/
export function isTextUploadFile(file: UploadFile): boolean {
const mimeType = (file.type || "").toLowerCase();
if (mimeType) {
if (mimeType.startsWith(TEXT_FILE_MIME_PREFIX)) return true;
if (TEXT_FILE_MIME_TYPES.has(mimeType)) return true;
}
const fileName = file.name || "";
const dotIndex = fileName.lastIndexOf(".");
if (dotIndex < 0) return false;
const ext = fileName.slice(dotIndex).toLowerCase();
return TEXT_FILE_EXTENSIONS.has(ext);
}
/**
* 判断文件名是否为文本文件
* @param fileName 文件名
*/
export function isTextFileByName(fileName: string): boolean {
const lowerName = fileName.toLowerCase();
// 先检查 MIME 类型(如果有)
// 这里简化处理,主要通过扩展名判断
const dotIndex = lowerName.lastIndexOf(".");
if (dotIndex < 0) return false;
const ext = lowerName.slice(dotIndex);
return TEXT_FILE_EXTENSIONS.has(ext);
}
/**
* 获取文件扩展名
* @param fileName 文件名
*/
export function getFileExtension(fileName: string): string {
const dotIndex = fileName.lastIndexOf(".");
if (dotIndex < 0) return "";
return fileName.slice(dotIndex).toLowerCase();
}
/**
* 安全地读取文件为文本
* @param file 文件对象
* @param encoding 编码,默认 UTF-8
*/
export function readFileAsText(
file: File | Blob,
encoding = "UTF-8"
): Promise<string> {
return new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = (e) => resolve(e.target?.result as string);
reader.onerror = () => reject(new Error("Failed to read file"));
reader.readAsText(file, encoding);
});
}
/**
* 流式分割文件并逐行上传
* 使用 Blob.slice 逐块读取,避免一次性加载大文件到内存
* @param file 文件对象
* @param datasetId 数据集ID
* @param uploadFn 上传函数,接收 FormData 和配置,返回 Promise
* @param onProgress 进度回调 (currentBytes, totalBytes, uploadedLines)
* @param chunkSize 每次读取的块大小,默认 1MB
* @param options 其他选项
* @returns 上传结果统计
*/
export interface StreamUploadOptions {
reqId: number;
fileNamePrefix?: string;
hasArchive?: boolean;
prefix?: string;
signal?: AbortSignal;
maxConcurrency?: number;
}
export interface StreamUploadResult {
uploadedCount: number;
totalBytes: number;
skippedEmptyCount: number;
}
export async function streamSplitAndUpload(
file: File,
uploadFn: (formData: FormData, config?: { onUploadProgress?: (e: { loaded: number; total: number }) => void }) => Promise<unknown>,
onProgress?: (currentBytes: number, totalBytes: number, uploadedLines: number) => void,
chunkSize: number = 1024 * 1024, // 1MB
options: StreamUploadOptions
): Promise<StreamUploadResult> {
const { reqId, fileNamePrefix, prefix, signal, maxConcurrency = 3 } = options;
const fileSize = file.size;
let offset = 0;
let buffer = "";
let uploadedCount = 0;
let skippedEmptyCount = 0;
let currentBytes = 0;
// 获取文件名基础部分
const baseName = fileNamePrefix || file.name.replace(/\.[^/.]+$/, "");
// 用于并发控制的队列
const uploadQueue: Promise<void>[] = [];
let uploadIndex = 0;
/**
* 上传单行内容
*/
async function uploadLine(line: string, index: number): Promise<void> {
if (!line.trim()) {
skippedEmptyCount++;
return;
}
const newFileName = `${baseName}_${String(index + 1).padStart(6, "0")}`;
const blob = new Blob([line], { type: "text/plain" });
const lineFile = new File([blob], newFileName, { type: "text/plain" });
// 计算分片(小文件通常只需要一个分片)
const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE);
const checkSum = await calculateSHA256(slices[0]);
const formData = new FormData();
formData.append("file", slices[0]);
formData.append("reqId", reqId.toString());
formData.append("fileNo", (index + 1).toString());
formData.append("chunkNo", "1");
formData.append("fileName", newFileName);
formData.append("fileSize", lineFile.size.toString());
formData.append("totalChunkNum", "1");
formData.append("checkSumHex", checkSum);
if (prefix !== undefined) {
formData.append("prefix", prefix);
}
await uploadFn(formData, {
onUploadProgress: () => {
// 单行文件很小,进度主要用于追踪上传状态
},
});
}
/**
* 处理并发上传
*/
async function processUploadQueue(): Promise<void> {
let currentIndex = 0;
async function runNext(): Promise<void> {
if (currentIndex >= uploadQueue.length) return;
const task = uploadQueue[currentIndex++];
await task;
await runNext();
}
const workers = Array(Math.min(maxConcurrency, uploadQueue.length))
.fill(null)
.map(() => runNext());
await Promise.all(workers);
}
// 逐块读取文件
while (offset < fileSize) {
// 检查是否已取消
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
const end = Math.min(offset + chunkSize, fileSize);
const chunk = file.slice(offset, end);
const text = await readFileAsText(chunk);
// 将新读取的内容追加到 buffer
const combined = buffer + text;
// 按换行符分割(支持 \n 和 \r\n)
const lines = combined.split(/\r?\n/);
// 保留最后一行(可能不完整)
buffer = lines.pop() || "";
// 将完整行加入上传队列
for (const line of lines) {
if (signal?.aborted) {
throw new Error("Upload cancelled");
}
const currentLineIndex = uploadIndex++;
uploadQueue.push(
uploadLine(line, currentLineIndex).then(() => {
uploadedCount++;
onProgress?.(currentBytes, fileSize, uploadedCount);
})
);
}
currentBytes = end;
offset = end;
// 每处理完一个 chunk,更新进度
onProgress?.(currentBytes, fileSize, uploadedCount);
}
// 处理最后剩余的 buffer(如果文件不以换行符结尾)
if (buffer.trim()) {
const currentLineIndex = uploadIndex++;
uploadQueue.push(
uploadLine(buffer, currentLineIndex).then(() => {
uploadedCount++;
onProgress?.(fileSize, fileSize, uploadedCount);
})
);
}
// 并发执行所有上传任务
await processUploadQueue();
return {
uploadedCount,
totalBytes: fileSize,
skippedEmptyCount,
};
}
/**
* 判断文件是否需要流式分割上传
* @param file 文件对象
* @param threshold 阈值,默认 5MB
*/
export function shouldStreamUpload(file: File, threshold: number = 5 * 1024 * 1024): boolean {
return file.size > threshold;
}

View File

@@ -150,6 +150,18 @@ async def create_mapping(
labeling_project, snapshot_file_ids
)
# 如果启用了分段且为文本数据集,预生成切片结构
if dataset_type == TEXT_DATASET_TYPE and request.segmentation_enabled:
try:
from ..service.editor import AnnotationEditorService
editor_service = AnnotationEditorService(db)
# 异步预计算切片(不阻塞创建响应)
segmentation_result = await editor_service.precompute_segmentation_for_project(labeling_project.id)
logger.info(f"Precomputed segmentation for project {labeling_project.id}: {segmentation_result}")
except Exception as e:
logger.warning(f"Failed to precompute segmentation for project {labeling_project.id}: {e}")
# 不影响项目创建,只记录警告
response_data = DatasetMappingCreateResponse(
id=mapping.id,
labeling_project_id=str(mapping.labeling_project_id),

View File

@@ -1185,3 +1185,195 @@ class AnnotationEditorService:
except Exception as exc:
logger.warning("标注同步知识管理失败:%s", exc)
async def precompute_segmentation_for_project(
self,
project_id: str,
max_retries: int = 3
) -> Dict[str, Any]:
"""
为指定项目的所有文本文件预计算切片结构并持久化到数据库
Args:
project_id: 标注项目ID
max_retries: 失败重试次数
Returns:
统计信息:{total_files, succeeded, failed}
"""
project = await self._get_project_or_404(project_id)
dataset_type = self._normalize_dataset_type(await self._get_dataset_type(project.dataset_id))
# 只处理文本数据集
if dataset_type != DATASET_TYPE_TEXT:
logger.info(f"项目 {project_id} 不是文本数据集,跳过切片预生成")
return {"total_files": 0, "succeeded": 0, "failed": 0}
# 检查是否启用分段
if not self._resolve_segmentation_enabled(project):
logger.info(f"项目 {project_id} 未启用分段,跳过切片预生成")
return {"total_files": 0, "succeeded": 0, "failed": 0}
# 获取项目的所有文本文件(排除源文档)
files_result = await self.db.execute(
select(DatasetFiles)
.join(LabelingProjectFile, LabelingProjectFile.file_id == DatasetFiles.id)
.where(
LabelingProjectFile.project_id == project_id,
DatasetFiles.dataset_id == project.dataset_id,
)
)
file_records = files_result.scalars().all()
if not file_records:
logger.info(f"项目 {project_id} 没有文件,跳过切片预生成")
return {"total_files": 0, "succeeded": 0, "failed": 0}
# 过滤源文档文件
valid_files = []
for file_record in file_records:
file_type = str(getattr(file_record, "file_type", "") or "").lower()
file_name = str(getattr(file_record, "file_name", "")).lower()
is_source_document = (
file_type in SOURCE_DOCUMENT_TYPES or
any(file_name.endswith(ext) for ext in SOURCE_DOCUMENT_EXTENSIONS)
)
if not is_source_document:
valid_files.append(file_record)
total_files = len(valid_files)
succeeded = 0
failed = 0
label_config = await self._resolve_project_label_config(project)
primary_text_key = self._resolve_primary_text_key(label_config)
for file_record in valid_files:
file_id = str(file_record.id) # type: ignore
file_name = str(getattr(file_record, "file_name", ""))
for retry in range(max_retries):
try:
# 读取文本内容
text_content = await self._fetch_text_content_via_download_api(project.dataset_id, file_id)
if not isinstance(text_content, str):
logger.warning(f"文件 {file_id} 内容不是字符串,跳过切片")
failed += 1
break
# 解析文本记录
records: List[Tuple[Optional[Dict[str, Any]], str]] = []
if file_name.lower().endswith(JSONL_EXTENSION):
records = self._parse_jsonl_records(text_content)
else:
parsed_payload = self._try_parse_json_payload(text_content)
if parsed_payload:
records = [(parsed_payload, text_content)]
if not records:
records = [(None, text_content)]
record_texts = [
self._resolve_primary_text_value(payload, raw_text, primary_text_key)
for payload, raw_text in records
]
if not record_texts:
record_texts = [text_content]
# 判断是否需要分段
needs_segmentation = len(records) > 1 or any(
len(text or "") > self.SEGMENT_THRESHOLD for text in record_texts
)
if not needs_segmentation:
# 不需要分段的文件,跳过
succeeded += 1
break
# 执行切片
splitter = AnnotationTextSplitter(max_chars=self.SEGMENT_THRESHOLD)
segment_cursor = 0
segments = {}
for record_index, ((payload, raw_text), record_text) in enumerate(zip(records, record_texts)):
normalized_text = record_text or ""
if len(normalized_text) > self.SEGMENT_THRESHOLD:
raw_segments = splitter.split(normalized_text)
for chunk_index, seg in enumerate(raw_segments):
segments[str(segment_cursor)] = {
SEGMENT_RESULT_KEY: [],
SEGMENT_CREATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
SEGMENT_UPDATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
}
segment_cursor += 1
else:
segments[str(segment_cursor)] = {
SEGMENT_RESULT_KEY: [],
SEGMENT_CREATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
SEGMENT_UPDATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
}
segment_cursor += 1
if not segments:
succeeded += 1
break
# 构造分段标注结构
final_payload = {
SEGMENTED_KEY: True,
"version": 1,
SEGMENTS_KEY: segments,
SEGMENT_TOTAL_KEY: segment_cursor,
}
# 检查是否已存在标注
existing_result = await self.db.execute(
select(AnnotationResult).where(
AnnotationResult.project_id == project_id,
AnnotationResult.file_id == file_id,
)
)
existing = existing_result.scalar_one_or_none()
now = datetime.utcnow()
if existing:
# 更新现有标注
existing.annotation = final_payload # type: ignore[assignment]
existing.annotation_status = ANNOTATION_STATUS_IN_PROGRESS # type: ignore[assignment]
existing.updated_at = now # type: ignore[assignment]
else:
# 创建新标注记录
record = AnnotationResult(
id=str(uuid.uuid4()),
project_id=project_id,
file_id=file_id,
annotation=final_payload,
annotation_status=ANNOTATION_STATUS_IN_PROGRESS,
created_at=now,
updated_at=now,
)
self.db.add(record)
await self.db.commit()
succeeded += 1
logger.info(f"成功为文件 {file_id} 预生成 {segment_cursor} 个切片")
break
except Exception as e:
logger.warning(
f"为文件 {file_id} 预生成切片失败 (重试 {retry + 1}/{max_retries}): {e}"
)
if retry == max_retries - 1:
failed += 1
await self.db.rollback()
logger.info(
f"项目 {project_id} 切片预生成完成: 总计 {total_files}, 成功 {succeeded}, 失败 {failed}"
)
return {
"total_files": total_files,
"succeeded": succeeded,
"failed": failed,
}