Compare commits — 05e6842fc8...lsf (22 commits)

473f4e717f · 6b0042cb66 · fa9e9d9f68 · 707e65b017 · cda22a720c · 394e2bda18 · 4220284f5a · 8415166949 · 078f303f57 · 50f2da5503 · 3af1daf8b6 · 7c7729434b · 17a62cd3c2 · f381d641ab · c8611d29ff · 147beb1ec7 · 699031dae7 · 88b1383653 · cc6415c4d9 · 3d036c4cd6 · 2445235fd2 · 893e0a1580
```diff
@@ -447,12 +447,12 @@ paths:
           schema:
             type: string
 
   /data-management/datasets/{datasetId}/files/upload/chunk:
     post:
       tags: [ DatasetFile ]
       operationId: chunkUpload
       summary: 切片上传
       description: 使用预上传返回的请求ID进行分片上传
       parameters:
         - name: datasetId
           in: path
@@ -466,15 +466,32 @@ paths:
           multipart/form-data:
             schema:
               $ref: '#/components/schemas/UploadFileRequest'
       responses:
         '200':
           description: 上传成功
 
+  /data-management/datasets/upload/cancel-upload/{reqId}:
+    put:
+      tags: [ DatasetFile ]
+      operationId: cancelUpload
+      summary: 取消上传
+      description: 取消预上传请求并清理临时分片
+      parameters:
+        - name: reqId
+          in: path
+          required: true
+          schema:
+            type: string
+          description: 预上传请求ID
+      responses:
+        '200':
+          description: 取消成功
+
   /data-management/dataset-types:
     get:
       operationId: getDatasetTypes
       tags: [DatasetType]
       summary: 获取数据集类型列表
+      description: 获取所有支持的数据集类型
       responses:
         '200':
           description: 上传成功
```
```diff
@@ -1,5 +1,6 @@
 package com.datamate.datamanagement.application;
 
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.datamate.common.domain.utils.ChunksSaver;
@@ -101,6 +102,7 @@ public class DatasetApplicationService {
     public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatasetRequest) {
         Dataset dataset = datasetRepository.getById(datasetId);
         BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);
+
         if (StringUtils.hasText(updateDatasetRequest.getName())) {
             dataset.setName(updateDatasetRequest.getName());
         }
@@ -113,13 +115,31 @@ public class DatasetApplicationService {
         if (Objects.nonNull(updateDatasetRequest.getStatus())) {
             dataset.setStatus(updateDatasetRequest.getStatus());
         }
-        if (updateDatasetRequest.getParentDatasetId() != null) {
+        if (updateDatasetRequest.isParentDatasetIdProvided()) {
+            // Remember the original parentDatasetId so we can tell whether it changed
+            String originalParentDatasetId = dataset.getParentDatasetId();
+
+            // Handle the parent-dataset change only when the request explicitly contains parentDatasetId.
+            // handleParentChange normalizes both empty strings and null to null via normalizeParentId,
+            // which supports setting a new parent dataset as well as clearing the association.
             handleParentChange(dataset, updateDatasetRequest.getParentDatasetId());
+
+            // Check whether parentDatasetId actually changed
+            if (!Objects.equals(originalParentDatasetId, dataset.getParentDatasetId())) {
+                // Update the parentDatasetId column explicitly with a LambdaUpdateWrapper
+                // so that even a null value is written correctly to the database
+                datasetRepository.update(null, new LambdaUpdateWrapper<Dataset>()
+                        .eq(Dataset::getId, datasetId)
+                        .set(Dataset::getParentDatasetId, dataset.getParentDatasetId()));
+            }
         }
 
         if (StringUtils.hasText(updateDatasetRequest.getDataSource())) {
             // The data source id is present: scan the files and persist them on an async thread
             processDataSourceAsync(dataset.getId(), updateDatasetRequest.getDataSource());
         }
 
+        // Update the remaining fields (parentDatasetId excluded, it was already updated above)
         datasetRepository.updateById(dataset);
         return dataset;
     }
```
```diff
@@ -499,11 +499,19 @@ public class DatasetFileApplicationService {
      *
      * @param uploadFileRequest the upload request
      */
     @Transactional
     public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
         FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
         saveFileInfoToDb(uploadResult, datasetId);
     }
 
+    /**
+     * Cancel an upload
+     */
+    @Transactional
+    public void cancelUpload(String reqId) {
+        fileService.cancelUpload(reqId);
+    }
 
     private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) {
         if (Objects.isNull(fileUploadResult.getSavedFile())) {
```
```diff
@@ -1,8 +1,10 @@
 package com.datamate.datamanagement.interfaces.dto;
 
 import com.datamate.datamanagement.common.enums.DatasetStatusType;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import jakarta.validation.constraints.NotBlank;
 import jakarta.validation.constraints.Size;
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -24,9 +26,18 @@ public class UpdateDatasetRequest {
     /** Collection task id */
     private String dataSource;
     /** Parent dataset ID */
+    @Setter(AccessLevel.NONE)
     private String parentDatasetId;
+    @JsonIgnore
+    @Setter(AccessLevel.NONE)
+    private boolean parentDatasetIdProvided;
     /** Tag list */
     private List<String> tags;
     /** Dataset status */
     private DatasetStatusType status;
+
+    public void setParentDatasetId(String parentDatasetId) {
+        this.parentDatasetIdProvided = true;
+        this.parentDatasetId = parentDatasetId;
+    }
 }
```
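The combination of `@Setter(AccessLevel.NONE)`, the `@JsonIgnore`d flag, and the hand-written setter lets `updateDataset` distinguish "field omitted" from "field explicitly null": Jackson only invokes `setParentDatasetId` when the JSON body actually contains the key, so `parentDatasetIdProvided` records presence rather than value. A minimal client-side sketch of the three request shapes — it assumes `updateDatasetByIdUsingPut` (imported elsewhere in this diff from `dataset.api`) issues the dataset-update PUT, and the id values are illustrative:

```ts
import { updateDatasetByIdUsingPut } from "@/pages/DataManagement/dataset.api"; // path assumed

const datasetId = "ds-001"; // illustrative id

// 1. Key omitted: the setter never runs, parentDatasetIdProvided stays false,
//    and the service leaves the parent association untouched.
await updateDatasetByIdUsingPut(datasetId, { name: "renamed" });

// 2. Key present with null: the setter runs, the flag flips to true, and
//    handleParentChange clears the association; the null is persisted via the
//    LambdaUpdateWrapper (updateById alone would skip a null field).
await updateDatasetByIdUsingPut(datasetId, { parentDatasetId: null });

// 3. Key present with a value: the flag flips to true and the parent is re-linked.
await updateDatasetByIdUsingPut(datasetId, { parentDatasetId: "ds-parent-42" });
```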
```diff
@@ -0,0 +1,33 @@
+package com.datamate.datamanagement.interfaces.rest;
+
+import com.datamate.datamanagement.application.DatasetFileApplicationService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Dataset upload controller
+ */
+@Slf4j
+@RestController
+@RequiredArgsConstructor
+@RequestMapping("/data-management/datasets/upload")
+public class DatasetUploadController {
+
+    private final DatasetFileApplicationService datasetFileApplicationService;
+
+    /**
+     * Cancel an upload
+     *
+     * @param reqId the pre-upload request ID
+     */
+    @PutMapping("/cancel-upload/{reqId}")
+    public ResponseEntity<Void> cancelUpload(@PathVariable("reqId") String reqId) {
+        datasetFileApplicationService.cancelUpload(reqId);
+        return ResponseEntity.ok().build();
+    }
+}
```
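For completeness, a hedged sketch of calling the new endpoint from the browser; the path comes straight from the controller above, while the helper name and error handling are illustrative:

```ts
// Cancel a pending chunked upload. The backend drops the temporary chunk
// directory and deletes the pre-upload record (see FileService.cancelUpload below).
async function cancelUploadRequest(reqId: string): Promise<void> {
  const resp = await fetch(
    `/data-management/datasets/upload/cancel-upload/${encodeURIComponent(reqId)}`,
    { method: "PUT" }
  );
  if (!resp.ok) {
    throw new Error(`cancel-upload failed: HTTP ${resp.status}`);
  }
}
```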
```diff
@@ -74,6 +74,26 @@ public class FileService {
                 .build();
     }
 
+    /**
+     * Cancel an upload
+     */
+    @Transactional
+    public void cancelUpload(String reqId) {
+        if (reqId == null || reqId.isBlank()) {
+            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
+        }
+        ChunkUploadPreRequest preRequest = chunkUploadRequestMapper.findById(reqId);
+        if (preRequest == null) {
+            return;
+        }
+        String uploadPath = preRequest.getUploadPath();
+        if (uploadPath != null && !uploadPath.isBlank()) {
+            File tempDir = new File(uploadPath, String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, preRequest.getId()));
+            ChunksSaver.deleteFolder(tempDir.getPath());
+        }
+        chunkUploadRequestMapper.deleteById(reqId);
+    }
 
     private File uploadFile(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preRequest) {
         File savedFile = ChunksSaver.saveFile(fileUploadRequest, preRequest);
         preRequest.setTimeout(LocalDateTime.now().plusSeconds(DEFAULT_TIMEOUT));
```
@@ -1,198 +1,383 @@ (whole-file rewrite of the `useFileSliceUpload` hook — old version first, new version after it)

Before:

```tsx
import { TaskItem } from "@/pages/DataManagement/dataset.model";
import { calculateSHA256, checkIsFilesExist } from "@/utils/file.util";
import { App } from "antd";
import { useRef, useState } from "react";

export function useFileSliceUpload(
  {
    preUpload,
    uploadChunk,
    cancelUpload,
  }: {
    preUpload: (id: string, params: any) => Promise<{ data: number }>;
    uploadChunk: (id: string, formData: FormData, config: any) => Promise<any>;
    cancelUpload: ((reqId: number) => Promise<any>) | null;
  },
  showTaskCenter = true // whether to show the task center while uploading
) {
  const { message } = App.useApp();
  const [taskList, setTaskList] = useState<TaskItem[]>([]);
  const taskListRef = useRef<TaskItem[]>([]); // keeps the task order stable

  const createTask = (detail: any = {}) => {
    const { dataset } = detail;
    const title = `上传数据集: ${dataset.name} `;
    const controller = new AbortController();
    const task: TaskItem = {
      key: dataset.id,
      title,
      percent: 0,
      reqId: -1,
      controller,
      size: 0,
      updateEvent: detail.updateEvent,
      hasArchive: detail.hasArchive,
      prefix: detail.prefix,
    };
    taskListRef.current = [task, ...taskListRef.current];

    setTaskList(taskListRef.current);
    return task;
  };

  const updateTaskList = (task: TaskItem) => {
    taskListRef.current = taskListRef.current.map((item) =>
      item.key === task.key ? task : item
    );
    setTaskList(taskListRef.current);
  };

  const removeTask = (task: TaskItem) => {
    const { key } = task;
    taskListRef.current = taskListRef.current.filter(
      (item) => item.key !== key
    );
    setTaskList(taskListRef.current);
    if (task.isCancel && task.cancelFn) {
      task.cancelFn();
    }
    if (task.updateEvent) {
      // carry the prefix so the view stays in the current directory after refresh
      window.dispatchEvent(
        new CustomEvent(task.updateEvent, {
          detail: { prefix: (task as any).prefix },
        })
      );
    }
    if (showTaskCenter) {
      window.dispatchEvent(
        new CustomEvent("show:task-popover", { detail: { show: false } })
      );
    }
  };

  async function buildFormData({ file, reqId, i, j }) {
    const formData = new FormData();
    const { slices, name, size } = file;
    const checkSum = await calculateSHA256(slices[j]);
    formData.append("file", slices[j]);
    formData.append("reqId", reqId.toString());
    formData.append("fileNo", (i + 1).toString());
    formData.append("chunkNo", (j + 1).toString());
    formData.append("fileName", name);
    formData.append("fileSize", size.toString());
    formData.append("totalChunkNum", slices.length.toString());
    formData.append("checkSumHex", checkSum);
    return formData;
  }

  async function uploadSlice(task: TaskItem, fileInfo) {
    if (!task) {
      return;
    }
    const { reqId, key } = task;
    const { loaded, i, j, files, totalSize } = fileInfo;
    const formData = await buildFormData({
      file: files[i],
      i,
      j,
      reqId,
    });

    let newTask = { ...task };
    await uploadChunk(key, formData, {
      onUploadProgress: (e) => {
        const loadedSize = loaded + e.loaded;
        const curPercent = Number((loadedSize / totalSize) * 100).toFixed(2);

        newTask = {
          ...newTask,
          ...taskListRef.current.find((item) => item.key === key),
          size: loadedSize,
          percent: curPercent >= 100 ? 99.99 : curPercent,
        };
        updateTaskList(newTask);
      },
    });
  }

  async function uploadFile({ task, files, totalSize }) {
    console.log('[useSliceUpload] Calling preUpload with prefix:', task.prefix);
    const { data: reqId } = await preUpload(task.key, {
      totalFileNum: files.length,
      totalSize,
      datasetId: task.key,
      hasArchive: task.hasArchive,
      prefix: task.prefix,
    });
    console.log('[useSliceUpload] PreUpload response reqId:', reqId);

    const newTask: TaskItem = {
      ...task,
      reqId,
      isCancel: false,
      cancelFn: () => {
        task.controller.abort();
        cancelUpload?.(reqId);
        if (task.updateEvent) window.dispatchEvent(new Event(task.updateEvent));
      },
    };
    updateTaskList(newTask);
    if (showTaskCenter) {
      window.dispatchEvent(
        new CustomEvent("show:task-popover", { detail: { show: true } })
      );
    }
    // // refresh the data state
    if (task.updateEvent) window.dispatchEvent(new Event(task.updateEvent));

    let loaded = 0;
    for (let i = 0; i < files.length; i++) {
      const { slices } = files[i];
      for (let j = 0; j < slices.length; j++) {
        await uploadSlice(newTask, {
          loaded,
          i,
          j,
          files,
          totalSize,
        });
        loaded += slices[j].size;
      }
    }
    removeTask(newTask);
  }

  const handleUpload = async ({ task, files }) => {
    const isErrorFile = await checkIsFilesExist(files);
    if (isErrorFile) {
      message.error("文件被修改或删除,请重新选择文件上传");
      removeTask({
        ...task,
        isCancel: false,
        ...taskListRef.current.find((item) => item.key === task.key),
      });
      return;
    }

    try {
      const totalSize = files.reduce((acc, file) => acc + file.size, 0);
      await uploadFile({ task, files, totalSize });
    } catch (err) {
      console.error(err);
      message.error("文件上传失败,请稍后重试");
      removeTask({
        ...task,
        isCancel: true,
        ...taskListRef.current.find((item) => item.key === task.key),
      });
    }
  };

  return {
    taskList,
    createTask,
    removeTask,
    handleUpload,
  };
}
```

After:

```tsx
import { TaskItem } from "@/pages/DataManagement/dataset.model";
import { calculateSHA256, checkIsFilesExist, streamSplitAndUpload, StreamUploadResult } from "@/utils/file.util";
import { App } from "antd";
import { useRef, useState } from "react";

export function useFileSliceUpload(
  {
    preUpload,
    uploadChunk,
    cancelUpload,
  }: {
    preUpload: (id: string, params: Record<string, unknown>) => Promise<{ data: number }>;
    uploadChunk: (id: string, formData: FormData, config: Record<string, unknown>) => Promise<unknown>;
    cancelUpload: ((reqId: number) => Promise<unknown>) | null;
  },
  showTaskCenter = true, // whether to show the task center while uploading
  enableStreamUpload = true // whether to enable stream split-and-upload
) {
  const { message } = App.useApp();
  const [taskList, setTaskList] = useState<TaskItem[]>([]);
  const taskListRef = useRef<TaskItem[]>([]); // keeps the task order stable

  const createTask = (detail: Record<string, unknown> = {}) => {
    const { dataset } = detail;
    const title = `上传数据集: ${dataset.name} `;
    const controller = new AbortController();
    const task: TaskItem = {
      key: dataset.id,
      title,
      percent: 0,
      reqId: -1,
      controller,
      size: 0,
      updateEvent: detail.updateEvent,
      hasArchive: detail.hasArchive,
      prefix: detail.prefix,
    };
    taskListRef.current = [task, ...taskListRef.current];

    setTaskList(taskListRef.current);

    // show the task center immediately so the user can see the upload has started
    if (showTaskCenter) {
      window.dispatchEvent(
        new CustomEvent("show:task-popover", { detail: { show: true } })
      );
    }

    return task;
  };

  const updateTaskList = (task: TaskItem) => {
    taskListRef.current = taskListRef.current.map((item) =>
      item.key === task.key ? task : item
    );
    setTaskList(taskListRef.current);
  };

  const removeTask = (task: TaskItem) => {
    const { key } = task;
    taskListRef.current = taskListRef.current.filter(
      (item) => item.key !== key
    );
    setTaskList(taskListRef.current);
    if (task.isCancel && task.cancelFn) {
      task.cancelFn();
    }
    if (task.updateEvent) {
      // carry the prefix so the view stays in the current directory after refresh
      window.dispatchEvent(
        new CustomEvent(task.updateEvent, {
          detail: { prefix: task.prefix },
        })
      );
    }
    if (showTaskCenter) {
      window.dispatchEvent(
        new CustomEvent("show:task-popover", { detail: { show: false } })
      );
    }
  };

  async function buildFormData({ file, reqId, i, j }: { file: { slices: Blob[]; name: string; size: number }; reqId: number; i: number; j: number }) {
    const formData = new FormData();
    const { slices, name, size } = file;
    const checkSum = await calculateSHA256(slices[j]);
    formData.append("file", slices[j]);
    formData.append("reqId", reqId.toString());
    formData.append("fileNo", (i + 1).toString());
    formData.append("chunkNo", (j + 1).toString());
    formData.append("fileName", name);
    formData.append("fileSize", size.toString());
    formData.append("totalChunkNum", slices.length.toString());
    formData.append("checkSumHex", checkSum);
    return formData;
  }

  async function uploadSlice(task: TaskItem, fileInfo: { loaded: number; i: number; j: number; files: { slices: Blob[]; name: string; size: number }[]; totalSize: number }) {
    if (!task) {
      return;
    }
    const { reqId, key, controller } = task;
    const { loaded, i, j, files, totalSize } = fileInfo;

    // bail out if already cancelled
    if (controller.signal.aborted) {
      throw new Error("Upload cancelled");
    }

    const formData = await buildFormData({
      file: files[i],
      i,
      j,
      reqId,
    });

    let newTask = { ...task };
    await uploadChunk(key, formData, {
      signal: controller.signal,
      onUploadProgress: (e) => {
        const loadedSize = loaded + e.loaded;
        const curPercent = Number((loadedSize / totalSize) * 100).toFixed(2);

        newTask = {
          ...newTask,
          ...taskListRef.current.find((item) => item.key === key),
          size: loadedSize,
          percent: curPercent >= 100 ? 99.99 : curPercent,
        };
        updateTaskList(newTask);
      },
    });
  }

  async function uploadFile({ task, files, totalSize }: { task: TaskItem; files: { slices: Blob[]; name: string; size: number; originFile: Blob }[]; totalSize: number }) {
    console.log('[useSliceUpload] Calling preUpload with prefix:', task.prefix);
    const { data: reqId } = await preUpload(task.key, {
      totalFileNum: files.length,
      totalSize,
      datasetId: task.key,
      hasArchive: task.hasArchive,
      prefix: task.prefix,
    });
    console.log('[useSliceUpload] PreUpload response reqId:', reqId);

    const newTask: TaskItem = {
      ...task,
      reqId,
      isCancel: false,
      cancelFn: () => {
        // use newTask's controller for consistency
        newTask.controller.abort();
        cancelUpload?.(reqId);
        if (newTask.updateEvent) window.dispatchEvent(new Event(newTask.updateEvent));
      },
    };
    updateTaskList(newTask);
    // Note: show:task-popover was already dispatched in createTask, so it is not repeated here
    // // refresh the data state
    if (task.updateEvent) window.dispatchEvent(new Event(task.updateEvent));

    let loaded = 0;
    for (let i = 0; i < files.length; i++) {
      // bail out if already cancelled
      if (newTask.controller.signal.aborted) {
        throw new Error("Upload cancelled");
      }
      const { slices } = files[i];
      for (let j = 0; j < slices.length; j++) {
        // bail out if already cancelled
        if (newTask.controller.signal.aborted) {
          throw new Error("Upload cancelled");
        }
        await uploadSlice(newTask, {
          loaded,
          i,
          j,
          files,
          totalSize,
        });
        loaded += slices[j].size;
      }
    }
    removeTask(newTask);
  }

  const handleUpload = async ({ task, files }: { task: TaskItem; files: { slices: Blob[]; name: string; size: number; originFile: Blob }[] }) => {
    const isErrorFile = await checkIsFilesExist(files);
    if (isErrorFile) {
      message.error("文件被修改或删除,请重新选择文件上传");
      removeTask({
        ...task,
        isCancel: false,
        ...taskListRef.current.find((item) => item.key === task.key),
      });
      return;
    }

    try {
      const totalSize = files.reduce((acc, file) => acc + file.size, 0);
      await uploadFile({ task, files, totalSize });
    } catch (err) {
      console.error(err);
      message.error("文件上传失败,请稍后重试");
      removeTask({
        ...task,
        isCancel: true,
        ...taskListRef.current.find((item) => item.key === task.key),
      });
    }
  };

  /**
   * Stream split-and-upload handler.
   * For large files that are split by line and uploaded immediately.
   */
  const handleStreamUpload = async ({ task, files }: { task: TaskItem; files: File[] }) => {
    try {
      console.log('[useSliceUpload] Starting stream upload for', files.length, 'files');

      const totalSize = files.reduce((acc, file) => acc + file.size, 0);

      // keep every file's reqId so the upload can be cancelled
      const reqIds: number[] = [];

      const newTask: TaskItem = {
        ...task,
        reqId: -1,
        isCancel: false,
        cancelFn: () => {
          // use newTask's controller for consistency
          newTask.controller.abort();
          // cancel the pre-upload request of every file
          reqIds.forEach(id => cancelUpload?.(id));
          if (newTask.updateEvent) window.dispatchEvent(new Event(newTask.updateEvent));
        },
      };
      updateTaskList(newTask);

      let totalUploadedLines = 0;
      let totalProcessedBytes = 0;
      const results: StreamUploadResult[] = [];

      // process files one by one; each file gets its own preUpload call
      for (let i = 0; i < files.length; i++) {
        // bail out if already cancelled
        if (newTask.controller.signal.aborted) {
          throw new Error("Upload cancelled");
        }

        const file = files[i];
        console.log(`[useSliceUpload] Processing file ${i + 1}/${files.length}: ${file.name}`);

        const result = await streamSplitAndUpload(
          file,
          (formData, config) => uploadChunk(task.key, formData, {
            ...config,
            signal: newTask.controller.signal,
          }),
          (currentBytes, totalBytes, uploadedLines) => {
            // skip progress updates once cancelled
            if (newTask.controller.signal.aborted) {
              return;
            }

            // update progress
            const overallBytes = totalProcessedBytes + currentBytes;
            const curPercent = Number((overallBytes / totalSize) * 100).toFixed(2);

            const updatedTask: TaskItem = {
              ...newTask,
              ...taskListRef.current.find((item) => item.key === task.key),
              size: overallBytes,
              percent: curPercent >= 100 ? 99.99 : curPercent,
              streamUploadInfo: {
                currentFile: file.name,
                fileIndex: i + 1,
                totalFiles: files.length,
                uploadedLines: totalUploadedLines + uploadedLines,
              },
            };
            updateTaskList(updatedTask);
          },
          1024 * 1024, // 1MB chunk size
          {
            resolveReqId: async ({ totalFileNum, totalSize }) => {
              const { data: reqId } = await preUpload(task.key, {
                totalFileNum,
                totalSize,
                datasetId: task.key,
                hasArchive: task.hasArchive,
                prefix: task.prefix,
              });
              console.log(`[useSliceUpload] File ${file.name} preUpload response reqId:`, reqId);
              reqIds.push(reqId);
              return reqId;
            },
            hasArchive: newTask.hasArchive,
            prefix: newTask.prefix,
            signal: newTask.controller.signal,
            maxConcurrency: 3,
          }
        );

        results.push(result);
        totalUploadedLines += result.uploadedCount;
        totalProcessedBytes += file.size;

        console.log(`[useSliceUpload] File ${file.name} processed, uploaded ${result.uploadedCount} lines`);
      }

      console.log('[useSliceUpload] Stream upload completed, total lines:', totalUploadedLines);
      removeTask(newTask);

      message.success(`成功上传 ${totalUploadedLines} 个文件(按行分割)`);
    } catch (err) {
      console.error('[useSliceUpload] Stream upload error:', err);
      if (err.message === "Upload cancelled") {
        message.info("上传已取消");
      } else {
        message.error("文件上传失败,请稍后重试");
      }
      removeTask({
        ...task,
        isCancel: true,
        ...taskListRef.current.find((item) => item.key === task.key),
      });
    }
  };

  /**
   * Register the stream-upload event listener.
   * Returns an unregister function.
   */
  const registerStreamUploadListener = () => {
    if (!enableStreamUpload) return () => {};

    const streamUploadHandler = async (e: Event) => {
      const customEvent = e as CustomEvent;
      const { dataset, files, updateEvent, hasArchive, prefix } = customEvent.detail;

      const controller = new AbortController();
      const task: TaskItem = {
        key: dataset.id,
        title: `上传数据集: ${dataset.name} (按行分割)`,
        percent: 0,
        reqId: -1,
        controller,
        size: 0,
        updateEvent,
        hasArchive,
        prefix,
      };

      taskListRef.current = [task, ...taskListRef.current];
      setTaskList(taskListRef.current);

      // show the task center
      if (showTaskCenter) {
        window.dispatchEvent(
          new CustomEvent("show:task-popover", { detail: { show: true } })
        );
      }

      await handleStreamUpload({ task, files });
    };

    window.addEventListener("upload:dataset-stream", streamUploadHandler);

    return () => {
      window.removeEventListener("upload:dataset-stream", streamUploadHandler);
    };
  };

  return {
    taskList,
    createTask,
    removeTask,
    handleUpload,
    handleStreamUpload,
    registerStreamUploadListener,
  };
}
```
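A minimal consumer sketch for the reworked hook. The hook API, the `upload:dataset-stream` event name, and the event `detail` shape come from the code above; the import paths and the `preUploadUsingPost`/`uploadChunkUsingPost`/`cancelUploadUsingPut` helpers are assumptions for illustration — any functions matching the hook's `preUpload`/`uploadChunk`/`cancelUpload` signatures will do:

```tsx
import { useEffect } from "react";
// Assumed paths/helpers; substitute the project's real request wrappers.
import { useFileSliceUpload } from "@/hooks/useFileSliceUpload";
import {
  preUploadUsingPost,
  uploadChunkUsingPost,
  cancelUploadUsingPut,
} from "@/pages/DataManagement/dataset.api";

export function DatasetUploadHost() {
  const { registerStreamUploadListener } = useFileSliceUpload({
    preUpload: preUploadUsingPost,
    uploadChunk: uploadChunkUsingPost,
    cancelUpload: cancelUploadUsingPut,
  });

  // Register the "upload:dataset-stream" listener once on mount; the hook
  // returns an unregister function that doubles as the effect cleanup.
  useEffect(() => registerStreamUploadListener(), []);

  return null;
}

// Elsewhere: kick off a line-split (stream) upload by dispatching the event.
export function startStreamUpload(dataset: { id: string; name: string }, files: File[]) {
  window.dispatchEvent(
    new CustomEvent("upload:dataset-stream", {
      detail: { dataset, files, updateEvent: "update:dataset", hasArchive: false, prefix: "" },
    })
  );
}
```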
```diff
@@ -1,6 +1,6 @@
 import { useCallback, useEffect, useMemo, useRef, useState } from "react";
-import { App, Button, Card, List, Spin, Typography, Tag, Switch, Tree, Empty } from "antd";
-import { LeftOutlined, ReloadOutlined, SaveOutlined, MenuFoldOutlined, MenuUnfoldOutlined, CheckOutlined } from "@ant-design/icons";
+import { App, Button, Card, List, Spin, Typography, Tag, Empty } from "antd";
+import { LeftOutlined, ReloadOutlined, SaveOutlined, MenuFoldOutlined, MenuUnfoldOutlined } from "@ant-design/icons";
 import { useNavigate, useParams } from "react-router";
 
 import {
@@ -28,7 +28,6 @@ type EditorTaskListItem = {
   hasAnnotation: boolean;
   annotationUpdatedAt?: string | null;
   annotationStatus?: AnnotationResultStatus | null;
-  segmentStats?: SegmentStats;
 };
 
 type LsfMessage = {
@@ -36,21 +35,6 @@ type LsfMessage = {
   payload?: unknown;
 };
 
-type SegmentInfo = {
-  idx: number;
-  text: string;
-  start: number;
-  end: number;
-  hasAnnotation: boolean;
-  lineIndex: number;
-  chunkIndex: number;
-};
-
-type SegmentStats = {
-  done: number;
-  total: number;
-};
-
 type ApiResponse<T> = {
   code?: number;
   message?: string;
@@ -66,10 +50,11 @@ type EditorTaskPayload = {
 type EditorTaskResponse = {
   task?: EditorTaskPayload;
   segmented?: boolean;
-  segments?: SegmentInfo[];
   totalSegments?: number;
   currentSegmentIndex?: number;
 };
 
+
 type EditorTaskListResponse = {
   content?: EditorTaskListItem[];
   totalElements?: number;
@@ -91,8 +76,6 @@ type ExportPayload = {
   requestId?: string | null;
 };
 
-type SwitchDecision = "save" | "discard" | "cancel";
-
 const LSF_IFRAME_SRC = "/lsf/lsf.html";
 const TASK_PAGE_START = 0;
 const TASK_PAGE_SIZE = 200;
@@ -154,16 +137,6 @@ const isAnnotationResultEmpty = (annotation?: Record<string, unknown>) => {
 };
 
 const resolveTaskStatusMeta = (item: EditorTaskListItem) => {
-  const segmentSummary = resolveSegmentSummary(item);
-  if (segmentSummary) {
-    if (segmentSummary.done >= segmentSummary.total) {
-      return { text: "已标注", type: "success" as const };
-    }
-    if (segmentSummary.done > 0) {
-      return { text: "标注中", type: "warning" as const };
-    }
-    return { text: "未标注", type: "secondary" as const };
-  }
   if (!item.hasAnnotation) {
     return { text: "未标注", type: "secondary" as const };
   }
@@ -216,25 +189,6 @@ const buildAnnotationSnapshot = (annotation?: Record<string, unknown>) => {
 const buildSnapshotKey = (fileId: string, segmentIndex?: number) =>
   `${fileId}::${segmentIndex ?? "full"}`;
 
-const buildSegmentStats = (segmentList?: SegmentInfo[] | null): SegmentStats | null => {
-  if (!Array.isArray(segmentList) || segmentList.length === 0) return null;
-  const total = segmentList.length;
-  const done = segmentList.reduce((count, seg) => count + (seg.hasAnnotation ? 1 : 0), 0);
-  return { done, total };
-};
-
-const normalizeSegmentStats = (stats?: SegmentStats | null): SegmentStats | null => {
-  if (!stats) return null;
-  const total = Number(stats.total);
-  const done = Number(stats.done);
-  if (!Number.isFinite(total) || total <= 0) return null;
-  const safeDone = Math.min(Math.max(done, 0), total);
-  return { done: safeDone, total };
-};
-
-const resolveSegmentSummary = (item: EditorTaskListItem) =>
-  normalizeSegmentStats(item.segmentStats);
-
 const mergeTaskItems = (base: EditorTaskListItem[], next: EditorTaskListItem[]) => {
   if (next.length === 0) return base;
   const seen = new Set(base.map((item) => item.fileId));
@@ -282,18 +236,13 @@ export default function LabelStudioTextEditor() {
     resolve: (payload?: ExportPayload) => void;
     timer?: number;
   } | null>(null);
   const exportCheckSeqRef = useRef(0);
   const savedSnapshotsRef = useRef<Record<string, string>>({});
   const pendingAutoAdvanceRef = useRef(false);
-  const segmentStatsCacheRef = useRef<Record<string, SegmentStats>>({});
-  const segmentStatsSeqRef = useRef(0);
-  const segmentStatsLoadingRef = useRef<Set<string>>(new Set());
 
   const [loadingProject, setLoadingProject] = useState(true);
   const [loadingTasks, setLoadingTasks] = useState(false);
   const [loadingTaskDetail, setLoadingTaskDetail] = useState(false);
   const [saving, setSaving] = useState(false);
-  const [segmentSwitching, setSegmentSwitching] = useState(false);
 
   const [iframeReady, setIframeReady] = useState(false);
   const [lsReady, setLsReady] = useState(false);
@@ -306,16 +255,19 @@ export default function LabelStudioTextEditor() {
   const [prefetching, setPrefetching] = useState(false);
   const [selectedFileId, setSelectedFileId] = useState<string>("");
   const [sidebarCollapsed, setSidebarCollapsed] = useState(false);
-  const [autoSaveOnSwitch, setAutoSaveOnSwitch] = useState(false);
 
   // segmentation state
   const [segmented, setSegmented] = useState(false);
-  const [segments, setSegments] = useState<SegmentInfo[]>([]);
   const [currentSegmentIndex, setCurrentSegmentIndex] = useState(0);
+  const [segmentTotal, setSegmentTotal] = useState(0);
   const isTextProject = useMemo(
     () => (project?.datasetType || "").toUpperCase() === "TEXT",
     [project?.datasetType],
   );
+  const segmentIndices = useMemo(() => {
+    if (segmentTotal <= 0) return [] as number[];
+    return Array.from({ length: segmentTotal }, (_, index) => index);
+  }, [segmentTotal]);
 
   const focusIframe = useCallback(() => {
     const iframe = iframeRef.current;
@@ -330,70 +282,6 @@ export default function LabelStudioTextEditor() {
     win.postMessage({ type, payload }, origin);
   }, [origin]);
 
-  const applySegmentStats = useCallback((fileId: string, stats: SegmentStats | null) => {
-    if (!fileId) return;
-    const normalized = normalizeSegmentStats(stats);
-    setTasks((prev) =>
-      prev.map((item) =>
-        item.fileId === fileId
-          ? { ...item, segmentStats: normalized || undefined }
-          : item
-      )
-    );
-  }, []);
-
-  const updateSegmentStatsCache = useCallback((fileId: string, stats: SegmentStats | null) => {
-    if (!fileId) return;
-    const normalized = normalizeSegmentStats(stats);
-    if (normalized) {
-      segmentStatsCacheRef.current[fileId] = normalized;
-    } else {
-      delete segmentStatsCacheRef.current[fileId];
-    }
-    applySegmentStats(fileId, normalized);
-  }, [applySegmentStats]);
-
-  const fetchSegmentStatsForFile = useCallback(async (fileId: string, seq: number) => {
-    if (!projectId || !fileId) return;
-    if (segmentStatsCacheRef.current[fileId] || segmentStatsLoadingRef.current.has(fileId)) return;
-    segmentStatsLoadingRef.current.add(fileId);
-    try {
-      const resp = (await getEditorTaskUsingGet(projectId, fileId, {
-        segmentIndex: 0,
-      })) as ApiResponse<EditorTaskResponse>;
-      if (segmentStatsSeqRef.current !== seq) return;
-      const data = resp?.data;
-      if (!data?.segmented) return;
-      const stats = buildSegmentStats(data.segments);
-      if (!stats) return;
-      segmentStatsCacheRef.current[fileId] = stats;
-      applySegmentStats(fileId, stats);
-    } catch (e) {
-      console.error(e);
-    } finally {
-      segmentStatsLoadingRef.current.delete(fileId);
-    }
-  }, [applySegmentStats, projectId]);
-
-  const prefetchSegmentStats = useCallback((items: EditorTaskListItem[]) => {
-    if (!projectId) return;
-    const fileIds = items
-      .map((item) => item.fileId)
-      .filter((fileId) => fileId && !segmentStatsCacheRef.current[fileId]);
-    if (fileIds.length === 0) return;
-    const seq = segmentStatsSeqRef.current;
-    let cursor = 0;
-    const workerCount = Math.min(3, fileIds.length);
-    const runWorker = async () => {
-      while (cursor < fileIds.length && segmentStatsSeqRef.current === seq) {
-        const fileId = fileIds[cursor];
-        cursor += 1;
-        await fetchSegmentStatsForFile(fileId, seq);
-      }
-    };
-    void Promise.all(Array.from({ length: workerCount }, () => runWorker()));
-  }, [fetchSegmentStatsForFile, projectId]);
-
   const confirmEmptyAnnotationStatus = useCallback(() => {
     return new Promise<AnnotationResultStatus | null>((resolve) => {
       let resolved = false;
@@ -446,8 +334,6 @@ export default function LabelStudioTextEditor() {
 
   const updateTaskSelection = useCallback((items: EditorTaskListItem[]) => {
     const isCompleted = (item: EditorTaskListItem) => {
-      const summary = resolveSegmentSummary(item);
-      if (summary) return summary.done >= summary.total;
       return item.hasAnnotation;
     };
     const defaultFileId =
@@ -508,9 +394,6 @@ export default function LabelStudioTextEditor() {
     if (mode === "reset") {
       prefetchSeqRef.current += 1;
       setPrefetching(false);
-      segmentStatsSeqRef.current += 1;
-      segmentStatsCacheRef.current = {};
-      segmentStatsLoadingRef.current = new Set();
     }
     if (mode === "append") {
       setLoadingMore(true);
@@ -591,20 +474,19 @@ export default function LabelStudioTextEditor() {
     if (seq !== initSeqRef.current) return;
 
     // update the segmentation state
-    const segmentIndex = data?.segmented
+    const isSegmented = !!data?.segmented;
+    const segmentIndex = isSegmented
       ? resolveSegmentIndex(data.currentSegmentIndex) ?? 0
       : undefined;
-    if (data?.segmented) {
-      const stats = buildSegmentStats(data.segments);
+    if (isSegmented) {
       setSegmented(true);
-      setSegments(data.segments || []);
       setCurrentSegmentIndex(segmentIndex ?? 0);
-      updateSegmentStatsCache(fileId, stats);
+      const totalSegments = Number(data?.totalSegments ?? 0);
+      setSegmentTotal(Number.isFinite(totalSegments) && totalSegments > 0 ? totalSegments : 0);
     } else {
       setSegmented(false);
-      setSegments([]);
      setCurrentSegmentIndex(0);
-      updateSegmentStatsCache(fileId, null);
+      setSegmentTotal(0);
     }
 
     const taskData = {
@@ -664,19 +546,14 @@ export default function LabelStudioTextEditor() {
     } finally {
       if (seq === initSeqRef.current) setLoadingTaskDetail(false);
     }
-  }, [iframeReady, message, postToIframe, project, projectId, updateSegmentStatsCache]);
+  }, [iframeReady, message, postToIframe, project, projectId]);
 
   const advanceAfterSave = useCallback(async (fileId: string, segmentIndex?: number) => {
     if (!fileId) return;
-    if (segmented && segments.length > 0) {
-      const sortedSegmentIndices = segments
-        .map((seg) => seg.idx)
-        .sort((a, b) => a - b);
-      const baseIndex = segmentIndex ?? currentSegmentIndex;
-      const currentPos = sortedSegmentIndices.indexOf(baseIndex);
-      const nextSegmentIndex =
-        currentPos >= 0 ? sortedSegmentIndices[currentPos + 1] : sortedSegmentIndices[0];
-      if (nextSegmentIndex !== undefined) {
+    if (segmented && segmentTotal > 0) {
+      const baseIndex = Math.max(segmentIndex ?? currentSegmentIndex, 0);
+      const nextSegmentIndex = baseIndex + 1;
+      if (nextSegmentIndex < segmentTotal) {
         await initEditorForFile(fileId, nextSegmentIndex);
         return;
       }
@@ -698,7 +575,7 @@ export default function LabelStudioTextEditor() {
     initEditorForFile,
     message,
     segmented,
-    segments,
+    segmentTotal,
     tasks,
   ]);
 
@@ -772,16 +649,6 @@ export default function LabelStudioTextEditor() {
     const snapshot = buildAnnotationSnapshot(isRecord(annotation) ? annotation : undefined);
     savedSnapshotsRef.current[snapshotKey] = snapshot;
 
-    // in segmented mode, mark the current segment as annotated
-    if (segmented && segmentIndex !== undefined) {
-      const nextSegments = segments.map((seg) =>
-        seg.idx === segmentIndex
-          ? { ...seg, hasAnnotation: true }
-          : seg
-      );
-      setSegments(nextSegments);
-      updateSegmentStatsCache(String(fileId), buildSegmentStats(nextSegments));
-    }
     if (options?.autoAdvance) {
       await advanceAfterSave(String(fileId), segmentIndex);
     }
@@ -800,69 +667,10 @@ export default function LabelStudioTextEditor() {
     message,
     projectId,
     segmented,
-    segments,
     selectedFileId,
     tasks,
-    updateSegmentStatsCache,
   ]);
 
-  const requestExportForCheck = useCallback(() => {
-    if (!iframeReady || !lsReady) return Promise.resolve(undefined);
-    if (exportCheckRef.current) {
-      if (exportCheckRef.current.timer) {
-        window.clearTimeout(exportCheckRef.current.timer);
-      }
-      exportCheckRef.current.resolve(undefined);
-      exportCheckRef.current = null;
-    }
-    const requestId = `check_${Date.now()}_${++exportCheckSeqRef.current}`;
-    return new Promise<ExportPayload | undefined>((resolve) => {
-      const timer = window.setTimeout(() => {
-        if (exportCheckRef.current?.requestId === requestId) {
-          exportCheckRef.current = null;
-        }
-        resolve(undefined);
-      }, 3000);
-      exportCheckRef.current = {
-        requestId,
-        resolve,
-        timer,
-      };
-      postToIframe("LS_EXPORT_CHECK", { requestId });
-    });
-  }, [iframeReady, lsReady, postToIframe]);
-
-  const confirmSaveBeforeSwitch = useCallback(() => {
-    return new Promise<SwitchDecision>((resolve) => {
-      let resolved = false;
-      let modalInstance: { destroy: () => void } | null = null;
-      const settle = (decision: SwitchDecision) => {
-        if (resolved) return;
-        resolved = true;
-        resolve(decision);
-      };
-      const handleDiscard = () => {
-        if (modalInstance) modalInstance.destroy();
-        settle("discard");
-      };
-      modalInstance = modal.confirm({
-        title: "当前段落有未保存标注",
-        content: (
-          <div className="flex flex-col gap-2">
-            <Typography.Text>切换段落前请先保存当前标注。</Typography.Text>
-            <Button type="link" danger style={{ padding: 0, height: "auto" }} onClick={handleDiscard}>
-              放弃未保存并切换
-            </Button>
-          </div>
-        ),
-        okText: "保存并切换",
-        cancelText: "取消",
-        onOk: () => settle("save"),
-        onCancel: () => settle("cancel"),
-      });
-    });
-  }, [modal]);
-
   const requestExport = useCallback((autoAdvance: boolean) => {
     if (!selectedFileId) {
       message.warning("请先选择文件");
@@ -875,7 +683,7 @@ export default function LabelStudioTextEditor() {
   useEffect(() => {
     const handleSaveShortcut = (event: KeyboardEvent) => {
       if (!isSaveShortcut(event) || event.repeat) return;
-      if (saving || loadingTaskDetail || segmentSwitching) return;
+      if (saving || loadingTaskDetail) return;
       if (!iframeReady || !lsReady) return;
       event.preventDefault();
       event.stopPropagation();
@@ -883,83 +691,7 @@ export default function LabelStudioTextEditor() {
     };
     window.addEventListener("keydown", handleSaveShortcut);
     return () => window.removeEventListener("keydown", handleSaveShortcut);
-  }, [iframeReady, loadingTaskDetail, lsReady, requestExport, saving, segmentSwitching]);
-
-  // segment switching handler
-  const handleSegmentChange = useCallback(async (newIndex: number) => {
-    if (newIndex === currentSegmentIndex) return;
-    if (segmentSwitching || saving || loadingTaskDetail) return;
-    if (!iframeReady || !lsReady) {
-      message.warning("编辑器未就绪,无法切换段落");
-      return;
-    }
-
-    setSegmentSwitching(true);
-    try {
-      const payload = await requestExportForCheck();
-      if (!payload) {
-        message.warning("无法读取当前标注,已取消切换");
-        return;
-      }
-
-      const payloadTaskId = payload.taskId;
-      if (expectedTaskIdRef.current && payloadTaskId) {
-        if (Number(payloadTaskId) !== expectedTaskIdRef.current) {
-          message.warning("已忽略过期的标注数据");
-          return;
-        }
-      }
-
-      const payloadFileId = payload.fileId || selectedFileId;
-      const payloadSegmentIndex = resolveSegmentIndex(payload.segmentIndex);
-      const resolvedSegmentIndex =
-        payloadSegmentIndex !== undefined
-          ? payloadSegmentIndex
-          : segmented
-            ? currentSegmentIndex
-            : undefined;
-      const annotation = isRecord(payload.annotation) ? payload.annotation : undefined;
-      const snapshotKey = payloadFileId
-        ? buildSnapshotKey(String(payloadFileId), resolvedSegmentIndex)
-        : undefined;
-      const latestSnapshot = buildAnnotationSnapshot(annotation);
-      const lastSnapshot = snapshotKey ? savedSnapshotsRef.current[snapshotKey] : undefined;
-      const hasUnsavedChange = snapshotKey !== undefined && lastSnapshot !== undefined && latestSnapshot !== lastSnapshot;
-
-      if (hasUnsavedChange) {
-        if (autoSaveOnSwitch) {
-          const saved = await saveFromExport(payload);
-          if (!saved) return;
-        } else {
-          const decision = await confirmSaveBeforeSwitch();
-          if (decision === "cancel") return;
-          if (decision === "save") {
-            const saved = await saveFromExport(payload);
-            if (!saved) return;
-          }
-        }
-      }
-
-      await initEditorForFile(selectedFileId, newIndex);
-    } finally {
-      setSegmentSwitching(false);
-    }
-  }, [
-    autoSaveOnSwitch,
-    confirmSaveBeforeSwitch,
-    currentSegmentIndex,
-    iframeReady,
-    initEditorForFile,
-    loadingTaskDetail,
-    lsReady,
-    message,
-    requestExportForCheck,
-    saveFromExport,
-    segmented,
-    selectedFileId,
-    segmentSwitching,
-    saving,
-  ]);
+  }, [iframeReady, loadingTaskDetail, lsReady, requestExport, saving]);
 
   useEffect(() => {
     setIframeReady(false);
@@ -977,12 +709,9 @@ export default function LabelStudioTextEditor() {
     expectedTaskIdRef.current = null;
     // reset the segmentation state
     setSegmented(false);
-    setSegments([]);
     setCurrentSegmentIndex(0);
+    setSegmentTotal(0);
     savedSnapshotsRef.current = {};
-    segmentStatsSeqRef.current += 1;
-    segmentStatsCacheRef.current = {};
-    segmentStatsLoadingRef.current = new Set();
     if (exportCheckRef.current?.timer) {
       window.clearTimeout(exportCheckRef.current.timer);
     }
@@ -996,12 +725,6 @@ export default function LabelStudioTextEditor() {
     loadTasks({ mode: "reset" });
   }, [project?.supported, loadTasks]);
 
-  useEffect(() => {
-    if (!segmented) return;
-    if (tasks.length === 0) return;
-    prefetchSegmentStats(tasks);
-  }, [prefetchSegmentStats, segmented, tasks]);
-
   useEffect(() => {
     if (!selectedFileId) return;
     initEditorForFile(selectedFileId);
@@ -1026,60 +749,6 @@ export default function LabelStudioTextEditor() {
     return () => window.removeEventListener("focus", handleWindowFocus);
   }, [focusIframe, lsReady]);
 
-  const segmentTreeData = useMemo(() => {
-    if (!segmented || segments.length === 0) return [];
-    const lineMap = new Map<number, SegmentInfo[]>();
-    segments.forEach((seg) => {
-      const list = lineMap.get(seg.lineIndex) || [];
-      list.push(seg);
-      lineMap.set(seg.lineIndex, list);
-    });
-    return Array.from(lineMap.entries())
-      .sort((a, b) => a[0] - b[0])
-      .map(([lineIndex, lineSegments]) => ({
-        key: `line-${lineIndex}`,
-        title: `第${lineIndex + 1}行`,
-        selectable: false,
-        children: lineSegments
-          .sort((a, b) => a.chunkIndex - b.chunkIndex)
-          .map((seg) => ({
-            key: `seg-${seg.idx}`,
-            title: (
-              <span className="flex items-center gap-1">
-                <span>{`片${seg.chunkIndex + 1}`}</span>
-                {seg.hasAnnotation && (
-                  <CheckOutlined style={{ fontSize: 10, color: "#52c41a" }} />
-                )}
-              </span>
-            ),
-          })),
-      }));
-  }, [segmented, segments]);
-
-  const segmentLineKeys = useMemo(
-    () => segmentTreeData.map((item) => String(item.key)),
-    [segmentTreeData]
-  );
-
-  const inProgressSegmentedCount = useMemo(() => {
-    if (tasks.length === 0) return 0;
-    return tasks.reduce((count, item) => {
-      const summary = resolveSegmentSummary(item);
-      if (!summary) return count;
-      return summary.done < summary.total ? count + 1 : count;
-    }, 0);
-  }, [tasks]);
-
-  const handleSegmentSelect = useCallback((keys: Array<string | number>) => {
-    const [first] = keys;
-    if (first === undefined || first === null) return;
-    const key = String(first);
-    if (!key.startsWith("seg-")) return;
-    const nextIndex = Number(key.replace("seg-", ""));
-    if (!Number.isFinite(nextIndex)) return;
-    handleSegmentChange(nextIndex);
-  }, [handleSegmentChange]);
-
   useEffect(() => {
     const handler = (event: MessageEvent<LsfMessage>) => {
       if (event.origin !== origin) return;
@@ -1148,7 +817,7 @@ export default function LabelStudioTextEditor() {
 
   const canLoadMore = taskTotalPages > 0 && taskPage + 1 < taskTotalPages;
   const saveDisabled =
-    !iframeReady || !selectedFileId || saving || segmentSwitching || loadingTaskDetail;
+    !iframeReady || !selectedFileId || saving || loadingTaskDetail;
   const loadMoreNode = canLoadMore ? (
     <div className="p-2 text-center">
       <Button
@@ -1265,11 +934,6 @@ export default function LabelStudioTextEditor() {
         >
           <div className="px-3 py-2 border-b border-gray-200 bg-white font-medium text-sm flex items-center justify-between gap-2">
             <span>文件列表</span>
-            {segmented && (
-              <Tag color="orange" style={{ margin: 0 }}>
-                标注中 {inProgressSegmentedCount}
-              </Tag>
-            )}
           </div>
           <div className="flex-1 min-h-0 overflow-auto">
             <List
@@ -1278,7 +942,6 @@ export default function LabelStudioTextEditor() {
               dataSource={tasks}
               loadMore={loadMoreNode}
               renderItem={(item) => {
-                const segmentSummary = resolveSegmentSummary(item);
                 const statusMeta = resolveTaskStatusMeta(item);
                 return (
                   <List.Item
@@ -1300,11 +963,6 @@ export default function LabelStudioTextEditor() {
                       <Typography.Text type={statusMeta.type} style={{ fontSize: 11 }}>
                         {statusMeta.text}
                       </Typography.Text>
-                      {segmentSummary && (
-                        <Typography.Text type="secondary" style={{ fontSize: 10 }}>
-                          已标注 {segmentSummary.done}/{segmentSummary.total}
-                        </Typography.Text>
-                      )}
                     </div>
                     {item.annotationUpdatedAt && (
                       <Typography.Text type="secondary" style={{ fontSize: 10 }}>
@@ -1323,21 +981,28 @@ export default function LabelStudioTextEditor() {
           <div className="px-3 py-2 border-b border-gray-200 bg-gray-50 font-medium text-sm flex items-center justify-between">
            <span>段落/分段</span>
             <Tag color="blue" style={{ margin: 0 }}>
-              {currentSegmentIndex + 1} / {segments.length}
+              {segmentTotal > 0 ? currentSegmentIndex + 1 : 0} / {segmentTotal}
             </Tag>
           </div>
           <div className="flex-1 min-h-0 overflow-auto px-2 py-2">
-            {segments.length > 0 ? (
-              <Tree
-                showLine
-                blockNode
-                selectedKeys={
-                  segmented ? [`seg-${currentSegmentIndex}`] : []
-                }
-                expandedKeys={segmentLineKeys}
-                onSelect={handleSegmentSelect}
-                treeData={segmentTreeData}
-              />
+            {segmentTotal > 0 ? (
+              <div className="grid grid-cols-[repeat(auto-fill,minmax(44px,1fr))] gap-1">
+                {segmentIndices.map((segmentIndex) => {
+                  const isCurrent = segmentIndex === currentSegmentIndex;
+                  return (
+                    <div
+                      key={segmentIndex}
+                      className={
+                        isCurrent
+                          ? "h-7 leading-7 rounded bg-blue-500 text-white text-center text-xs font-medium"
+                          : "h-7 leading-7 rounded bg-gray-100 text-gray-700 text-center text-xs"
+                      }
+                    >
+                      {segmentIndex + 1}
+                    </div>
+                  );
+                })}
+              </div>
             ) : (
               <div className="py-6">
                 <Empty
@@ -1347,17 +1012,6 @@ export default function LabelStudioTextEditor() {
               </div>
             )}
           </div>
-          <div className="px-3 py-2 border-t border-gray-200 flex items-center justify-between">
-            <Typography.Text style={{ fontSize: 12 }}>
-              切段自动保存
-            </Typography.Text>
-            <Switch
-              size="small"
-              checked={autoSaveOnSwitch}
-              onChange={(checked) => setAutoSaveOnSwitch(checked)}
-              disabled={segmentSwitching || saving || loadingTaskDetail || !lsReady}
-            />
-          </div>
         </div>
       )}
     </div>
```
@@ -1,20 +1,23 @@
|
||||
import { get, post, put, del, download } from "@/utils/request";
|
||||
|
||||
// 导出格式类型
|
||||
export type ExportFormat = "json" | "jsonl" | "csv" | "coco" | "yolo";
|
||||
|
||||
// 标注任务管理相关接口
|
||||
export function queryAnnotationTasksUsingGet(params?: any) {
|
||||
return get("/api/annotation/project", params);
|
||||
}
|
||||
|
||||
export function createAnnotationTaskUsingPost(data: any) {
|
||||
return post("/api/annotation/project", data);
|
||||
}
|
||||
|
||||
export function syncAnnotationTaskUsingPost(data: any) {
|
||||
return post(`/api/annotation/task/sync`, data);
|
||||
}
|
||||
// 导出格式类型
|
||||
export type ExportFormat = "json" | "jsonl" | "csv" | "coco" | "yolo";
|
||||
|
||||
type RequestParams = Record<string, unknown>;
|
||||
type RequestPayload = Record<string, unknown>;

// 标注任务管理相关接口
export function queryAnnotationTasksUsingGet(params?: RequestParams) {
  return get("/api/annotation/project", params);
}

export function createAnnotationTaskUsingPost(data: RequestPayload) {
  return post("/api/annotation/project", data);
}

export function syncAnnotationTaskUsingPost(data: RequestPayload) {
  return post(`/api/annotation/task/sync`, data);
}

export function deleteAnnotationTaskByIdUsingDelete(mappingId: string) {
  // Backend expects mapping UUID as path parameter
@@ -25,9 +28,9 @@ export function getAnnotationTaskByIdUsingGet(taskId: string) {
  return get(`/api/annotation/project/${taskId}`);
}

export function updateAnnotationTaskByIdUsingPut(taskId: string, data: any) {
  return put(`/api/annotation/project/${taskId}`, data);
}
export function updateAnnotationTaskByIdUsingPut(taskId: string, data: RequestPayload) {
  return put(`/api/annotation/project/${taskId}`, data);
}

// 标签配置管理
export function getTagConfigUsingGet() {
@@ -35,20 +38,20 @@ export function getTagConfigUsingGet() {
}

// 标注模板管理
export function queryAnnotationTemplatesUsingGet(params?: any) {
  return get("/api/annotation/template", params);
}
export function queryAnnotationTemplatesUsingGet(params?: RequestParams) {
  return get("/api/annotation/template", params);
}

export function createAnnotationTemplateUsingPost(data: RequestPayload) {
  return post("/api/annotation/template", data);
}

export function createAnnotationTemplateUsingPost(data: any) {
  return post("/api/annotation/template", data);
}

export function updateAnnotationTemplateByIdUsingPut(
  templateId: string | number,
  data: any
) {
  return put(`/api/annotation/template/${templateId}`, data);
}
export function updateAnnotationTemplateByIdUsingPut(
  templateId: string | number,
  data: RequestPayload
) {
  return put(`/api/annotation/template/${templateId}`, data);
}

export function deleteAnnotationTemplateByIdUsingDelete(
  templateId: string | number
@@ -65,27 +68,35 @@ export function getEditorProjectInfoUsingGet(projectId: string) {
  return get(`/api/annotation/editor/projects/${projectId}`);
}

export function listEditorTasksUsingGet(projectId: string, params?: any) {
  return get(`/api/annotation/editor/projects/${projectId}/tasks`, params);
}
export function listEditorTasksUsingGet(projectId: string, params?: RequestParams) {
  return get(`/api/annotation/editor/projects/${projectId}/tasks`, params);
}

export function getEditorTaskUsingGet(
  projectId: string,
  fileId: string,
  params?: { segmentIndex?: number }
) {
  return get(`/api/annotation/editor/projects/${projectId}/tasks/${fileId}`, params);
}

export function getEditorTaskSegmentUsingGet(
  projectId: string,
  fileId: string,
  params: { segmentIndex: number }
) {
  return get(`/api/annotation/editor/projects/${projectId}/tasks/${fileId}/segments`, params);
}

export function upsertEditorAnnotationUsingPut(
  projectId: string,
  fileId: string,
  data: {
    annotation: any;
    expectedUpdatedAt?: string;
    segmentIndex?: number;
  }
) {
  projectId: string,
  fileId: string,
  data: {
    annotation: Record<string, unknown>;
    expectedUpdatedAt?: string;
    segmentIndex?: number;
  }
) {
  return put(`/api/annotation/editor/projects/${projectId}/tasks/${fileId}/annotation`, data);
}

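Note on the typing change above: `RequestPayload` (and the `RequestParams` map type it pairs with) replace `any`, so only object-shaped bodies compile. A minimal call-site sketch, for illustration only:

// Object literals satisfy Record<string, unknown>.
const body: RequestPayload = { name: "demo-project", datasetId: "ds-001" };
createAnnotationTaskUsingPost(body);       // OK
// createAnnotationTaskUsingPost("oops");  // no longer compiles once `any` is gone
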
@@ -1,13 +1,13 @@
import { Select, Input, Form, Radio, Modal, Button, UploadFile, Switch, Tooltip } from "antd";
import { InboxOutlined, QuestionCircleOutlined } from "@ant-design/icons";
import { dataSourceOptions } from "../../dataset.const";
import { Dataset, DatasetType, DataSource } from "../../dataset.model";
import { useCallback, useEffect, useMemo, useState } from "react";
import { queryTasksUsingGet } from "@/pages/DataCollection/collection.apis";
import { updateDatasetByIdUsingPut } from "../../dataset.api";
import { sliceFile } from "@/utils/file.util";
import Dragger from "antd/es/upload/Dragger";

import { queryTasksUsingGet } from "@/pages/DataCollection/collection.apis";
import { updateDatasetByIdUsingPut } from "../../dataset.api";
import { sliceFile, shouldStreamUpload } from "@/utils/file.util";
import Dragger from "antd/es/upload/Dragger";

const TEXT_FILE_MIME_PREFIX = "text/";
const TEXT_FILE_MIME_TYPES = new Set([
  "application/json",
@@ -90,14 +90,16 @@ async function splitFileByLines(file: UploadFile): Promise<UploadFile[]> {
  const lines = text.split(/\r?\n/).filter((line: string) => line.trim() !== "");
  if (lines.length === 0) return [];

  // 生成文件名:原文件名_序号.扩展名
  // 生成文件名:原文件名_序号(不保留后缀)
  const nameParts = file.name.split(".");
  const ext = nameParts.length > 1 ? "." + nameParts.pop() : "";
  if (nameParts.length > 1) {
    nameParts.pop();
  }
  const baseName = nameParts.join(".");
  const padLength = String(lines.length).length;

  return lines.map((line: string, index: number) => {
    const newFileName = `${baseName}_${String(index + 1).padStart(padLength, "0")}${ext}`;
    const newFileName = `${baseName}_${String(index + 1).padStart(padLength, "0")}`;
    const blob = new Blob([line], { type: "text/plain" });
    const newFile = new File([blob], newFileName, { type: "text/plain" });
    return {
@@ -131,18 +133,18 @@ type ImportConfig = {
};

export default function ImportConfiguration({
  data,
  open,
  onClose,
  updateEvent = "update:dataset",
  prefix,
}: {
  data: Dataset | null;
  open: boolean;
  onClose: () => void;
  updateEvent?: string;
  prefix?: string;
}) {
  const [form] = Form.useForm();
  const [collectionOptions, setCollectionOptions] = useState<SelectOption[]>([]);
  const availableSourceOptions = dataSourceOptions.filter(
@@ -160,23 +162,81 @@ export default function ImportConfiguration({
    return files.some((file) => !isTextUploadFile(file));
  }, [importConfig.files]);
  const isTextDataset = data?.datasetType === DatasetType.TEXT;

  // 本地上传文件相关逻辑

  const handleUpload = async (dataset: Dataset) => {
    let filesToUpload =

  // 本地上传文件相关逻辑

  const handleUpload = async (dataset: Dataset) => {
    const filesToUpload =
      (form.getFieldValue("files") as UploadFile[] | undefined) || [];

    // 如果启用分行分割,处理文件

    // 如果启用分行分割,对大文件使用流式处理
    if (importConfig.splitByLine && !hasNonTextFile) {
      const splitResults = await Promise.all(
        filesToUpload.map((file) => splitFileByLines(file))
      );
      filesToUpload = splitResults.flat();
      // 检查是否有大文件需要流式分割上传
      const filesForStreamUpload: File[] = [];
      const filesForNormalUpload: UploadFile[] = [];

      for (const file of filesToUpload) {
        const originFile = file.originFileObj ?? file;
        if (originFile instanceof File && shouldStreamUpload(originFile)) {
          filesForStreamUpload.push(originFile);
        } else {
          filesForNormalUpload.push(file);
        }
      }

      // 大文件使用流式分割上传
      if (filesForStreamUpload.length > 0) {
        window.dispatchEvent(
          new CustomEvent("upload:dataset-stream", {
            detail: {
              dataset,
              files: filesForStreamUpload,
              updateEvent,
              hasArchive: importConfig.hasArchive,
              prefix: currentPrefix,
            },
          })
        );
      }

      // 小文件使用传统分割方式
      if (filesForNormalUpload.length > 0) {
        const splitResults = await Promise.all(
          filesForNormalUpload.map((file) => splitFileByLines(file))
        );
        const smallFilesToUpload = splitResults.flat();

        // 计算分片列表
        const sliceList = smallFilesToUpload.map((file) => {
          const originFile = (file.originFileObj ?? file) as Blob;
          const slices = sliceFile(originFile);
          return {
            originFile: originFile,
            slices,
            name: file.name,
            size: originFile.size || 0,
          };
        });

        console.log("[ImportConfiguration] Uploading small files with currentPrefix:", currentPrefix);
        window.dispatchEvent(
          new CustomEvent("upload:dataset", {
            detail: {
              dataset,
              files: sliceList,
              updateEvent,
              hasArchive: importConfig.hasArchive,
              prefix: currentPrefix,
            },
          })
        );
      }
      return;
    }

    // 计算分片列表
    const sliceList = filesToUpload.map((file) => {

    // 未启用分行分割,使用普通上传
    // 计算分片列表
    const sliceList = filesToUpload.map((file) => {
      const originFile = (file.originFileObj ?? file) as Blob;
      const slices = sliceFile(originFile);
      return {
@@ -185,22 +245,22 @@ export default function ImportConfiguration({
        name: file.name,
        size: originFile.size || 0,
      };
    });

    console.log("[ImportConfiguration] Uploading with currentPrefix:", currentPrefix);
    window.dispatchEvent(
      new CustomEvent("upload:dataset", {
        detail: {
          dataset,
          files: sliceList,
          updateEvent,
          hasArchive: importConfig.hasArchive,
          prefix: currentPrefix,
        },
      })
    );
  };

  const fetchCollectionTasks = useCallback(async () => {
    if (importConfig.source !== DataSource.COLLECTION) return;
    try {
@@ -212,7 +272,7 @@ export default function ImportConfiguration({
        label: task.name,
        value: task.id,
      }));
      setCollectionOptions(options);
    } catch (error) {
      console.error("Error fetching collection tasks:", error);
    }
@@ -229,27 +289,31 @@ export default function ImportConfiguration({
    });
    console.log('[ImportConfiguration] resetState done, currentPrefix still:', currentPrefix);
  }, [currentPrefix, form]);

  const handleImportData = async () => {
    if (!data) return;
    console.log('[ImportConfiguration] handleImportData called, currentPrefix:', currentPrefix);
    if (importConfig.source === DataSource.UPLOAD) {
      await handleUpload(data);
    } else if (importConfig.source === DataSource.COLLECTION) {
      await updateDatasetByIdUsingPut(data.id, {
        ...importConfig,
      });
    }
    onClose();
  };

  const handleImportData = async () => {
    if (!data) return;
    console.log('[ImportConfiguration] handleImportData called, currentPrefix:', currentPrefix);
    if (importConfig.source === DataSource.UPLOAD) {
      // 立即显示任务中心,让用户感知上传已开始(在文件分割等耗时操作之前)
      window.dispatchEvent(
        new CustomEvent("show:task-popover", { detail: { show: true } })
      );
      await handleUpload(data);
    } else if (importConfig.source === DataSource.COLLECTION) {
      await updateDatasetByIdUsingPut(data.id, {
        ...importConfig,
      });
    }
    onClose();
  };

  useEffect(() => {
    if (open) {
      setCurrentPrefix(prefix || "");
      console.log('[ImportConfiguration] Modal opened with prefix:', prefix);
      resetState();
      fetchCollectionTasks();
    }
  }, [fetchCollectionTasks, open, prefix, resetState]);

  useEffect(() => {
@@ -259,111 +323,111 @@ export default function ImportConfiguration({
    form.setFieldsValue({ splitByLine: false });
    setImportConfig((prev) => ({ ...prev, splitByLine: false }));
  }, [form, hasNonTextFile, importConfig.files, importConfig.splitByLine]);

  // Separate effect for fetching collection tasks when source changes
  useEffect(() => {
    if (open && importConfig.source === DataSource.COLLECTION) {
      fetchCollectionTasks();
    }
  }, [fetchCollectionTasks, importConfig.source, open]);

  return (
    <Modal
      title="导入数据"
      open={open}
      width={600}
      onCancel={() => {
        onClose();
        resetState();
      }}
      maskClosable={false}
      footer={
        <>
          <Button onClick={onClose}>取消</Button>
          <Button
            type="primary"
            disabled={!importConfig?.files?.length && !importConfig.dataSource}
            onClick={handleImportData}
          >
            确定
          </Button>
        </>
      }
    >
      <Form
        form={form}
        layout="vertical"
        initialValues={importConfig || {}}
        onValuesChange={(_, allValues) => setImportConfig(allValues)}
      >
        <Form.Item
          label="数据源"
          name="source"
          rules={[{ required: true, message: "请选择数据源" }]}
        >
          <Radio.Group
            buttonStyle="solid"
            options={availableSourceOptions}
            optionType="button"
          />
        </Form.Item>
        {importConfig?.source === DataSource.COLLECTION && (
          <Form.Item name="dataSource" label="归集任务" required>
            <Select placeholder="请选择归集任务" options={collectionOptions} />
          </Form.Item>
        )}

        {/* obs import */}
        {importConfig?.source === DataSource.OBS && (
          <div className="grid grid-cols-2 gap-3 p-4 bg-blue-50 rounded-lg">
            <Form.Item
              name="endpoint"
              rules={[{ required: true }]}
              label="Endpoint"
            >
              <Input
                className="h-8 text-xs"
                placeholder="obs.cn-north-4.myhuaweicloud.com"
              />
            </Form.Item>
            <Form.Item
              name="bucket"
              rules={[{ required: true }]}
              label="Bucket"
            >
              <Input className="h-8 text-xs" placeholder="my-bucket" />
            </Form.Item>
            <Form.Item
              name="accessKey"
              rules={[{ required: true }]}
              label="Access Key"
            >
              <Input className="h-8 text-xs" placeholder="Access Key" />
            </Form.Item>
            <Form.Item
              name="secretKey"
              rules={[{ required: true }]}
              label="Secret Key"
            >
              <Input
                type="password"
                className="h-8 text-xs"
                placeholder="Secret Key"
              />
            </Form.Item>
          </div>
        )}

        {/* Local Upload Component */}
        {importConfig?.source === DataSource.UPLOAD && (
          <>
            <Form.Item
              label="自动解压上传的压缩包"
              name="hasArchive"
              valuePropName="checked"
            >
              <Switch />
            </Form.Item>
            {isTextDataset && (
              <Form.Item
                label={
@@ -386,10 +450,10 @@ export default function ImportConfiguration({
                <Switch disabled={hasNonTextFile} />
              </Form.Item>
            )}
            <Form.Item
              label="上传文件"
              name="files"
              valuePropName="fileList"
              getValueFromEvent={(
                event: { fileList?: UploadFile[] } | UploadFile[]
              ) => {
@@ -398,69 +462,69 @@ export default function ImportConfiguration({
                }
                return event?.fileList;
              }}
              rules={[
                {
                  required: true,
                  message: "请上传文件",
                },
              ]}
            >
              <Dragger
                className="w-full"
                beforeUpload={() => false}
                multiple
              >
                <p className="ant-upload-drag-icon">
                  <InboxOutlined />
                </p>
                <p className="ant-upload-text">本地文件上传</p>
                <p className="ant-upload-hint">拖拽文件到此处或点击选择文件</p>
              </Dragger>
            </Form.Item>
          </>
        )}

        {/* Target Configuration */}
        {importConfig?.target && importConfig?.target !== DataSource.UPLOAD && (
          <div className="space-y-3 p-4 bg-blue-50 rounded-lg">
            {importConfig?.target === DataSource.DATABASE && (
              <div className="grid grid-cols-2 gap-3">
                <Form.Item
                  name="databaseType"
                  rules={[{ required: true }]}
                  label="数据库类型"
                >
                  <Select
                    className="w-full"
                    options={[
                      { label: "MySQL", value: "mysql" },
                      { label: "PostgreSQL", value: "postgresql" },
                      { label: "MongoDB", value: "mongodb" },
                    ]}
                  ></Select>
                </Form.Item>
                <Form.Item
                  name="tableName"
                  rules={[{ required: true }]}
                  label="表名"
                >
                  <Input className="h-8 text-xs" placeholder="dataset_table" />
                </Form.Item>
                <Form.Item
                  name="connectionString"
                  rules={[{ required: true }]}
                  label="连接字符串"
                >
                  <Input
                    className="h-8 text-xs col-span-2"
                    placeholder="数据库连接字符串"
                  />
                </Form.Item>
              </div>
            )}
          </div>
        )}
      </Form>
    </Modal>
  );
}

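For reference, the `upload:dataset-stream` event dispatched above carries raw `File` objects rather than pre-computed slices. A minimal sketch of a producer outside this modal, assuming the same detail shape (`dataset`, `files`, `updateEvent`, `hasArchive`, `prefix`):

// Hypothetical producer, mirroring the detail shape used by ImportConfiguration.
function requestStreamUpload(dataset: Dataset, files: File[], prefix = "") {
  window.dispatchEvent(
    new CustomEvent("upload:dataset-stream", {
      detail: { dataset, files, updateEvent: "update:dataset", hasArchive: false, prefix },
    })
  );
}
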
@@ -102,6 +102,13 @@ export interface DatasetTask {
  executionHistory?: { time: string; status: string }[];
}

export interface StreamUploadInfo {
  currentFile: string;
  fileIndex: number;
  totalFiles: number;
  uploadedLines: number;
}

export interface TaskItem {
  key: string;
  title: string;
@@ -113,4 +120,6 @@ export interface TaskItem {
  updateEvent?: string;
  size?: number;
  hasArchive?: boolean;
  prefix?: string;
  streamUploadInfo?: StreamUploadInfo;
}

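A stream-upload entry in the task center would then carry progress like the sketch below; values are illustrative only, and this assumes the fields elided by the hunk are optional:

// Illustrative only: a TaskItem as the task center might hold it mid-upload.
const exampleTask: TaskItem = {
  key: "upload-001",
  title: "corpus.jsonl",
  streamUploadInfo: {
    currentFile: "corpus.jsonl",
    fileIndex: 1,
    totalFiles: 3,
    uploadedLines: 2048,
  },
};
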
@@ -1,69 +1,93 @@
import {
  cancelUploadUsingPut,
  preUploadUsingPost,
  uploadFileChunkUsingPost,
} from "@/pages/DataManagement/dataset.api";
import { Button, Empty, Progress } from "antd";
import { DeleteOutlined } from "@ant-design/icons";
import { useEffect } from "react";
import { useFileSliceUpload } from "@/hooks/useSliceUpload";

export default function TaskUpload() {
  const { createTask, taskList, removeTask, handleUpload } = useFileSliceUpload(
    {
      preUpload: preUploadUsingPost,
      uploadChunk: uploadFileChunkUsingPost,
      cancelUpload: cancelUploadUsingPut,
    }
  );

  useEffect(() => {
    const uploadHandler = (e: any) => {
      console.log('[TaskUpload] Received upload event detail:', e.detail);
      const { files } = e.detail;
      const task = createTask(e.detail);
      console.log('[TaskUpload] Created task with prefix:', task.prefix);
      handleUpload({ task, files });
    };
    window.addEventListener("upload:dataset", uploadHandler);
    return () => {
      window.removeEventListener("upload:dataset", uploadHandler);
    };
  }, []);

  return (
    <div
      className="w-90 max-w-90 max-h-96 overflow-y-auto p-2"
      id="header-task-popover"
    >
      {taskList.length > 0 &&
        taskList.map((task) => (
          <div key={task.key} className="border-b border-gray-200 pb-2">
            <div className="flex items-center justify-between">
              <div>{task.title}</div>
              <Button
                type="text"
                danger
                disabled={!task?.cancelFn}
                onClick={() =>
                  removeTask({
                    ...task,
                    isCancel: true,
                  })
                }
                icon={<DeleteOutlined />}
              ></Button>
            </div>

            <Progress size="small" percent={task.percent} />
          </div>
        ))}
      {taskList.length === 0 && (
        <Empty
          image={Empty.PRESENTED_IMAGE_SIMPLE}
          description="暂无上传任务"
        />
      )}
    </div>
  );
}
import {
  cancelUploadUsingPut,
  preUploadUsingPost,
  uploadFileChunkUsingPost,
} from "@/pages/DataManagement/dataset.api";
import { Button, Empty, Progress, Tag } from "antd";
import { DeleteOutlined, FileTextOutlined } from "@ant-design/icons";
import { useEffect } from "react";
import { useFileSliceUpload } from "@/hooks/useSliceUpload";

export default function TaskUpload() {
  const { createTask, taskList, removeTask, handleUpload, registerStreamUploadListener } = useFileSliceUpload(
    {
      preUpload: preUploadUsingPost,
      uploadChunk: uploadFileChunkUsingPost,
      cancelUpload: cancelUploadUsingPut,
    },
    true, // showTaskCenter
    true // enableStreamUpload
  );

  useEffect(() => {
    const uploadHandler = (e: Event) => {
      const customEvent = e as CustomEvent;
      console.log('[TaskUpload] Received upload event detail:', customEvent.detail);
      const { files } = customEvent.detail;
      const task = createTask(customEvent.detail);
      console.log('[TaskUpload] Created task with prefix:', task.prefix);
      handleUpload({ task, files });
    };
    window.addEventListener("upload:dataset", uploadHandler);
    return () => {
      window.removeEventListener("upload:dataset", uploadHandler);
    };
  }, [createTask, handleUpload]);

  // 注册流式上传监听器
  useEffect(() => {
    const unregister = registerStreamUploadListener();
    return unregister;
  }, [registerStreamUploadListener]);

  return (
    <div
      className="w-90 max-w-90 max-h-96 overflow-y-auto p-2"
      id="header-task-popover"
    >
      {taskList.length > 0 &&
        taskList.map((task) => (
          <div key={task.key} className="border-b border-gray-200 pb-2">
            <div className="flex items-center justify-between">
              <div>{task.title}</div>
              <Button
                type="text"
                danger
                disabled={!task?.cancelFn}
                onClick={() =>
                  removeTask({
                    ...task,
                    isCancel: true,
                  })
                }
                icon={<DeleteOutlined />}
              ></Button>
            </div>

            <Progress size="small" percent={Number(task.percent)} />
            {task.streamUploadInfo && (
              <div className="flex items-center gap-2 text-xs text-gray-500 mt-1">
                <Tag icon={<FileTextOutlined />} size="small">
                  按行分割
                </Tag>
                <span>
                  已上传: {task.streamUploadInfo.uploadedLines} 行
                </span>
                {task.streamUploadInfo.totalFiles > 1 && (
                  <span>
                    ({task.streamUploadInfo.fileIndex}/{task.streamUploadInfo.totalFiles} 文件)
                  </span>
                )}
              </div>
            )}
          </div>
        ))}
      {taskList.length === 0 && (
        <Empty
          image={Empty.PRESENTED_IMAGE_SIMPLE}
          description="暂无上传任务"
        />
      )}
    </div>
  );
}

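The `e as CustomEvent` cast in the new listener is needed because DOM listeners receive a plain `Event`. A small helper that centralizes the cast could look like this; it is a sketch, not part of the repo, and the detail type is an assumption:

// Hypothetical helper: subscribe to a window CustomEvent with a typed detail.
function onWindowEvent<T>(name: string, handler: (detail: T) => void): () => void {
  const listener = (e: Event) => handler((e as CustomEvent<T>).detail);
  window.addEventListener(name, listener);
  return () => window.removeEventListener(name, listener);
}
// Usage: const off = onWindowEvent<{ files: File[] }>("upload:dataset-stream", ({ files }) => handleFiles(files));
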
@@ -1,79 +1,657 @@
import { UploadFile } from "antd";
import jsSHA from "jssha";

const CHUNK_SIZE = 1024 * 1024 * 60;
// 默认分片大小:5MB(适合大多数网络环境)
export const DEFAULT_CHUNK_SIZE = 1024 * 1024 * 5;
// 大文件阈值:10MB
export const LARGE_FILE_THRESHOLD = 1024 * 1024 * 10;
// 最大并发上传数
export const MAX_CONCURRENT_UPLOADS = 3;
// 文本文件读取块大小:20MB(用于计算 SHA256)
const BUFFER_CHUNK_SIZE = 1024 * 1024 * 20;

export function sliceFile(file, chunkSize = CHUNK_SIZE): Blob[] {
/**
 * 将文件分割为多个分片
 * @param file 文件对象
 * @param chunkSize 分片大小(字节),默认 5MB
 * @returns 分片数组(Blob 列表)
 */
export function sliceFile(file: Blob, chunkSize = DEFAULT_CHUNK_SIZE): Blob[] {
  const totalSize = file.size;
  const chunks: Blob[] = [];

  // 小文件不需要分片
  if (totalSize <= chunkSize) {
    return [file];
  }

  let start = 0;
  let end = start + chunkSize;
  const chunks = [];
  while (start < totalSize) {
    const end = Math.min(start + chunkSize, totalSize);
    const blob = file.slice(start, end);
    chunks.push(blob);

    start = end;
    end = start + chunkSize;
  }

  return chunks;
}

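A quick sanity check of the new slicing behavior (sizes here are illustrative):

// With the 5MB default, a 12MB blob yields three slices: 5MB, 5MB, 2MB.
const twelveMb = new Blob([new Uint8Array(12 * 1024 * 1024)]);
const slices = sliceFile(twelveMb);
console.log(slices.length, slices.map((s) => s.size));
// -> 3 [5242880, 5242880, 2097152]
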
export function calculateSHA256(file: Blob): Promise<string> {
  let count = 0;
  const hash = new jsSHA("SHA-256", "ARRAYBUFFER", { encoding: "UTF8" });
/**
 * 计算文件的 SHA256 哈希值
 * @param file 文件 Blob
 * @param onProgress 进度回调(可选)
 * @returns SHA256 哈希字符串
 */
export function calculateSHA256(
  file: Blob,
  onProgress?: (percent: number) => void
): Promise<string> {
  return new Promise((resolve, reject) => {
    const hash = new jsSHA("SHA-256", "ARRAYBUFFER", { encoding: "UTF8" });
    const reader = new FileReader();
    let processedSize = 0;

    function readChunk(start: number, end: number) {
      const slice = file.slice(start, end);
      reader.readAsArrayBuffer(slice);
    }

    const bufferChunkSize = 1024 * 1024 * 20;

    function processChunk(offset: number) {
      const start = offset;
      const end = Math.min(start + bufferChunkSize, file.size);
      count = end;

      const end = Math.min(start + BUFFER_CHUNK_SIZE, file.size);
      readChunk(start, end);
    }

    reader.onloadend = function () {
      const arraybuffer = reader.result;
    reader.onloadend = function (e) {
      const arraybuffer = reader.result as ArrayBuffer;
      if (!arraybuffer) {
        reject(new Error("Failed to read file"));
        return;
      }

      hash.update(arraybuffer);
      if (count < file.size) {
        processChunk(count);
      processedSize += (e.target as FileReader).result?.byteLength || 0;

      if (onProgress) {
        const percent = Math.min(100, Math.round((processedSize / file.size) * 100));
        onProgress(percent);
      }

      if (processedSize < file.size) {
        processChunk(processedSize);
      } else {
        resolve(hash.getHash("HEX", { outputLen: 256 }));
      }
    };

    reader.onerror = () => reject(new Error("File reading failed"));
    processChunk(0);
  });
}

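A usage sketch for the reworked hash function, combining it with sliceFile above:

// Hash the first slice of a file and log coarse progress.
async function checksumFirstSlice(file: Blob): Promise<string> {
  const [first] = sliceFile(file);
  return calculateSHA256(first, (percent) => console.log(`hashing: ${percent}%`));
}
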
/**
 * 批量计算多个文件的 SHA256
 * @param files 文件列表
 * @param onFileProgress 单个文件进度回调(可选)
 * @returns 哈希值数组
 */
export async function calculateSHA256Batch(
  files: Blob[],
  onFileProgress?: (index: number, percent: number) => void
): Promise<string[]> {
  const results: string[] = [];

  for (let i = 0; i < files.length; i++) {
    const hash = await calculateSHA256(files[i], (percent) => {
      onFileProgress?.(i, percent);
    });
    results.push(hash);
  }

  return results;
}

/**
 * 检查文件是否存在(未被修改或删除)
 * @param fileList 文件列表
 * @returns 返回第一个不存在的文件,或 null(如果都存在)
 */
export function checkIsFilesExist(
  fileList: UploadFile[]
): Promise<UploadFile | null> {
  fileList: Array<{ originFile?: Blob }>
): Promise<{ originFile?: Blob } | null> {
  return new Promise((resolve) => {
    const loadEndFn = (file: UploadFile, reachEnd: boolean, e) => {
      const fileNotExist = !e.target.result;
    if (!fileList.length) {
      resolve(null);
      return;
    }

    let checkedCount = 0;
    const totalCount = fileList.length;

    const loadEndFn = (file: { originFile?: Blob }, e: ProgressEvent<FileReader>) => {
      checkedCount++;
      const fileNotExist = !e.target?.result;
      if (fileNotExist) {
        resolve(file);
        return;
      }
      if (reachEnd) {
      if (checkedCount >= totalCount) {
        resolve(null);
      }
    };

    for (let i = 0; i < fileList.length; i++) {
      const { originFile: file } = fileList[i];
    for (const file of fileList) {
      const fileReader = new FileReader();
      fileReader.readAsArrayBuffer(file);
      fileReader.onloadend = (e) =>
        loadEndFn(fileList[i], i === fileList.length - 1, e);
      const actualFile = file.originFile;

      if (!actualFile) {
        checkedCount++;
        if (checkedCount >= totalCount) {
          resolve(null);
        }
        continue;
      }

      fileReader.readAsArrayBuffer(actualFile.slice(0, 1));
      fileReader.onloadend = (e) => loadEndFn(file, e);
      fileReader.onerror = () => {
        checkedCount++;
        resolve(file);
      };
    }
  });
}

/**
 * 判断文件是否为大文件
 * @param size 文件大小(字节)
 * @param threshold 阈值(字节),默认 10MB
 */
export function isLargeFile(size: number, threshold = LARGE_FILE_THRESHOLD): boolean {
  return size > threshold;
}

/**
 * 格式化文件大小为人类可读格式
 * @param bytes 字节数
 * @param decimals 小数位数
 */
export function formatFileSize(bytes: number, decimals = 2): string {
  if (bytes === 0) return "0 B";

  const k = 1024;
  const sizes = ["B", "KB", "MB", "GB", "TB", "PB"];
  const i = Math.floor(Math.log(bytes) / Math.log(k));

  return `${parseFloat((bytes / Math.pow(k, i)).toFixed(decimals))} ${sizes[i]}`;
}

/**
 * 并发执行异步任务
 * @param tasks 任务函数数组
 * @param maxConcurrency 最大并发数
 * @param onTaskComplete 单个任务完成回调(可选)
 */
export async function runConcurrentTasks<T>(
  tasks: (() => Promise<T>)[],
  maxConcurrency: number,
  onTaskComplete?: (index: number, result: T) => void
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let index = 0;

  async function runNext(): Promise<void> {
    const currentIndex = index++;
    if (currentIndex >= tasks.length) return;

    const result = await tasks[currentIndex]();
    results[currentIndex] = result;
    onTaskComplete?.(currentIndex, result);

    await runNext();
  }

  const workers = Array(Math.min(maxConcurrency, tasks.length))
    .fill(null)
    .map(() => runNext());

  await Promise.all(workers);
  return results;
}

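A worker-pool usage sketch for the concurrency helper above:

// Run five tasks with at most three in flight; results keep the task order.
const uploadTasks = [1, 2, 3, 4, 5].map(
  (n) => () => new Promise<number>((res) => setTimeout(() => res(n), 10))
);
runConcurrentTasks(uploadTasks, MAX_CONCURRENT_UPLOADS, (i) =>
  console.log(`task ${i} done`)
).then((results) => console.log(results)); // -> [1, 2, 3, 4, 5]
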
/**
 * 按行分割文本文件内容
 * @param text 文本内容
 * @param skipEmptyLines 是否跳过空行,默认 true
 * @returns 行数组
 */
export function splitTextByLines(text: string, skipEmptyLines = true): string[] {
  const lines = text.split(/\r?\n/);
  if (skipEmptyLines) {
    return lines.filter((line) => line.trim() !== "");
  }
  return lines;
}

/**
 * 创建分片信息对象
 * @param file 原始文件
 * @param chunkSize 分片大小
 */
export function createFileSliceInfo(
  file: File | Blob,
  chunkSize = DEFAULT_CHUNK_SIZE
): {
  originFile: Blob;
  slices: Blob[];
  name: string;
  size: number;
  totalChunks: number;
} {
  const slices = sliceFile(file, chunkSize);
  return {
    originFile: file,
    slices,
    name: (file as File).name || "unnamed",
    size: file.size,
    totalChunks: slices.length,
  };
}

/**
 * 支持的文本文件 MIME 类型前缀
 */
export const TEXT_FILE_MIME_PREFIX = "text/";

/**
 * 支持的文本文件 MIME 类型集合
 */
export const TEXT_FILE_MIME_TYPES = new Set([
  "application/json",
  "application/xml",
  "application/csv",
  "application/ndjson",
  "application/x-ndjson",
  "application/x-yaml",
  "application/yaml",
  "application/javascript",
  "application/x-javascript",
  "application/sql",
  "application/rtf",
  "application/xhtml+xml",
  "application/svg+xml",
]);

/**
 * 支持的文本文件扩展名集合
 */
export const TEXT_FILE_EXTENSIONS = new Set([
  ".txt",
  ".md",
  ".markdown",
  ".csv",
  ".tsv",
  ".json",
  ".jsonl",
  ".ndjson",
  ".log",
  ".xml",
  ".yaml",
  ".yml",
  ".sql",
  ".js",
  ".ts",
  ".jsx",
  ".tsx",
  ".html",
  ".htm",
  ".css",
  ".scss",
  ".less",
  ".py",
  ".java",
  ".c",
  ".cpp",
  ".h",
  ".hpp",
  ".go",
  ".rs",
  ".rb",
  ".php",
  ".sh",
  ".bash",
  ".zsh",
  ".ps1",
  ".bat",
  ".cmd",
  ".svg",
  ".rtf",
]);

/**
 * 判断文件是否为文本文件(支持 UploadFile 类型)
 * @param file UploadFile 对象
 */
export function isTextUploadFile(file: UploadFile): boolean {
  const mimeType = (file.type || "").toLowerCase();
  if (mimeType) {
    if (mimeType.startsWith(TEXT_FILE_MIME_PREFIX)) return true;
    if (TEXT_FILE_MIME_TYPES.has(mimeType)) return true;
  }

  const fileName = file.name || "";
  const dotIndex = fileName.lastIndexOf(".");
  if (dotIndex < 0) return false;
  const ext = fileName.slice(dotIndex).toLowerCase();
  return TEXT_FILE_EXTENSIONS.has(ext);
}

/**
 * 判断文件名是否为文本文件
 * @param fileName 文件名
 */
export function isTextFileByName(fileName: string): boolean {
  const lowerName = fileName.toLowerCase();

  // 先检查 MIME 类型(如果有)
  // 这里简化处理,主要通过扩展名判断

  const dotIndex = lowerName.lastIndexOf(".");
  if (dotIndex < 0) return false;
  const ext = lowerName.slice(dotIndex);
  return TEXT_FILE_EXTENSIONS.has(ext);
}

/**
 * 获取文件扩展名
 * @param fileName 文件名
 */
export function getFileExtension(fileName: string): string {
  const dotIndex = fileName.lastIndexOf(".");
  if (dotIndex < 0) return "";
  return fileName.slice(dotIndex).toLowerCase();
}

/**
 * 安全地读取文件为文本
 * @param file 文件对象
 * @param encoding 编码,默认 UTF-8
 */
export function readFileAsText(
  file: File | Blob,
  encoding = "UTF-8"
): Promise<string> {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = (e) => resolve(e.target?.result as string);
    reader.onerror = () => reject(new Error("Failed to read file"));
    reader.readAsText(file, encoding);
  });
}

/**
 * 流式分割文件并逐行上传
 * 使用 Blob.slice 逐块读取,避免一次性加载大文件到内存
 * @param file 文件对象
 * @param uploadFn 上传函数,接收 FormData 和配置,返回 Promise
 * @param onProgress 进度回调 (currentBytes, totalBytes, uploadedLines)
 * @param chunkSize 每次读取的块大小,默认 1MB
 * @param options 其他选项
 * @returns 上传结果统计
 */
export interface StreamUploadOptions {
  reqId?: number;
  resolveReqId?: (params: { totalFileNum: number; totalSize: number }) => Promise<number>;
  onReqIdResolved?: (reqId: number) => void;
  fileNamePrefix?: string;
  hasArchive?: boolean;
  prefix?: string;
  signal?: AbortSignal;
  maxConcurrency?: number;
}

export interface StreamUploadResult {
  uploadedCount: number;
  totalBytes: number;
  skippedEmptyCount: number;
}

async function processFileLines(
  file: File,
  chunkSize: number,
  signal: AbortSignal | undefined,
  onLine?: (line: string, index: number) => Promise<void> | void,
  onProgress?: (currentBytes: number, totalBytes: number, processedLines: number) => void
): Promise<{ lineCount: number; skippedEmptyCount: number }> {
  const fileSize = file.size;
  let offset = 0;
  let buffer = "";
  let skippedEmptyCount = 0;
  let lineIndex = 0;

  while (offset < fileSize) {
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }

    const end = Math.min(offset + chunkSize, fileSize);
    const chunk = file.slice(offset, end);
    const text = await readFileAsText(chunk);
    const combined = buffer + text;
    const lines = combined.split(/\r?\n/);
    buffer = lines.pop() || "";

    for (const line of lines) {
      if (signal?.aborted) {
        throw new Error("Upload cancelled");
      }
      if (!line.trim()) {
        skippedEmptyCount++;
        continue;
      }
      const currentIndex = lineIndex;
      lineIndex += 1;
      if (onLine) {
        await onLine(line, currentIndex);
      }
    }

    offset = end;
    onProgress?.(offset, fileSize, lineIndex);
  }

  if (buffer.trim()) {
    const currentIndex = lineIndex;
    lineIndex += 1;
    if (onLine) {
      await onLine(buffer, currentIndex);
    }
  } else if (buffer.length > 0) {
    skippedEmptyCount++;
  }

  return { lineCount: lineIndex, skippedEmptyCount };
}

export async function streamSplitAndUpload(
  file: File,
  uploadFn: (formData: FormData, config?: { onUploadProgress?: (e: { loaded: number; total: number }) => void }) => Promise<unknown>,
  onProgress?: (currentBytes: number, totalBytes: number, uploadedLines: number) => void,
  chunkSize: number = 1024 * 1024, // 1MB
  options: StreamUploadOptions
): Promise<StreamUploadResult> {
  const {
    reqId: initialReqId,
    resolveReqId,
    onReqIdResolved,
    fileNamePrefix,
    prefix,
    signal,
    maxConcurrency = 3,
  } = options;

  const fileSize = file.size;
  let uploadedCount = 0;
  let skippedEmptyCount = 0;

  // 获取文件名基础部分和扩展名
  const originalFileName = fileNamePrefix || file.name;
  const lastDotIndex = originalFileName.lastIndexOf(".");
  const baseName = lastDotIndex > 0 ? originalFileName.slice(0, lastDotIndex) : originalFileName;
  const fileExtension = lastDotIndex > 0 ? originalFileName.slice(lastDotIndex) : "";

  let resolvedReqId = initialReqId;
  if (!resolvedReqId) {
    const scanResult = await processFileLines(file, chunkSize, signal);
    const totalFileNum = scanResult.lineCount;
    skippedEmptyCount = scanResult.skippedEmptyCount;
    if (totalFileNum === 0) {
      return {
        uploadedCount: 0,
        totalBytes: fileSize,
        skippedEmptyCount,
      };
    }
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }
    if (!resolveReqId) {
      throw new Error("Missing pre-upload request id");
    }
    resolvedReqId = await resolveReqId({ totalFileNum, totalSize: fileSize });
    if (!resolvedReqId) {
      throw new Error("Failed to resolve pre-upload request id");
    }
    onReqIdResolved?.(resolvedReqId);
  }
  if (!resolvedReqId) {
    throw new Error("Missing pre-upload request id");
  }

  /**
   * 上传单行内容
   * 每行作为独立文件上传,fileNo 对应行序号,chunkNo 固定为 1
   */
  async function uploadLine(line: string, index: number): Promise<void> {
    // 检查是否已取消
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }

    if (!line.trim()) {
      skippedEmptyCount++;
      return;
    }

    // 保留原始文件扩展名
    const fileIndex = index + 1;
    const newFileName = `${baseName}_${String(fileIndex).padStart(6, "0")}${fileExtension}`;
    const blob = new Blob([line], { type: "text/plain" });
    const lineFile = new File([blob], newFileName, { type: "text/plain" });

    // 计算分片(小文件通常只需要一个分片)
    const slices = sliceFile(lineFile, DEFAULT_CHUNK_SIZE);
    const checkSum = await calculateSHA256(slices[0]);

    // 检查是否已取消(计算哈希后)
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }

    const formData = new FormData();
    formData.append("file", slices[0]);
    formData.append("reqId", resolvedReqId.toString());
    // 每行作为独立文件上传
    formData.append("fileNo", fileIndex.toString());
    formData.append("chunkNo", "1");
    formData.append("fileName", newFileName);
    formData.append("fileSize", lineFile.size.toString());
    formData.append("totalChunkNum", "1");
    formData.append("checkSumHex", checkSum);
    if (prefix !== undefined) {
      formData.append("prefix", prefix);
    }

    await uploadFn(formData, {
      onUploadProgress: () => {
        // 单行文件很小,进度主要用于追踪上传状态
      },
    });
  }

  const inFlight = new Set<Promise<void>>();
  let uploadError: unknown = null;
  const enqueueUpload = async (line: string, index: number) => {
    if (signal?.aborted) {
      throw new Error("Upload cancelled");
    }
    if (uploadError) {
      throw uploadError;
    }
    const uploadPromise = uploadLine(line, index)
      .then(() => {
        uploadedCount++;
      })
      .catch((err) => {
        uploadError = err;
      });
    inFlight.add(uploadPromise);
    uploadPromise.finally(() => inFlight.delete(uploadPromise));
    if (inFlight.size >= maxConcurrency) {
      await Promise.race(inFlight);
      if (uploadError) {
        throw uploadError;
      }
    }
  };

  let uploadResult: { lineCount: number; skippedEmptyCount: number } | null = null;
  try {
    uploadResult = await processFileLines(
      file,
      chunkSize,
      signal,
      enqueueUpload,
      (currentBytes, totalBytes) => {
        onProgress?.(currentBytes, totalBytes, uploadedCount);
      }
    );
    if (uploadError) {
      throw uploadError;
    }
  } finally {
    if (inFlight.size > 0) {
      await Promise.allSettled(inFlight);
    }
  }

  if (!uploadResult || (initialReqId && uploadResult.lineCount === 0)) {
    return {
      uploadedCount: 0,
      totalBytes: fileSize,
      skippedEmptyCount: uploadResult?.skippedEmptyCount ?? 0,
    };
  }

  if (!initialReqId) {
    skippedEmptyCount = skippedEmptyCount || uploadResult.skippedEmptyCount;
  } else {
    skippedEmptyCount = uploadResult.skippedEmptyCount;
  }

  return {
    uploadedCount,
    totalBytes: fileSize,
    skippedEmptyCount,
  };
}

/**
 * 判断文件是否需要流式分割上传
 * @param file 文件对象
 * @param threshold 阈值,默认 5MB
 */
export function shouldStreamUpload(file: File, threshold: number = 5 * 1024 * 1024): boolean {
  return file.size > threshold;
}

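A hedged end-to-end sketch of driving streamSplitAndUpload; the pre-upload call that yields the request id is represented by hypothetical `preUpload` and `uploadChunk` stand-ins, since their exact shapes live in dataset.api:

// Sketch only: `preUpload` and `uploadChunk` stand in for the real dataset.api calls.
declare function preUpload(req: { totalFileNum: number; totalSize: number }): Promise<number>;
declare function uploadChunk(form: FormData): Promise<unknown>;

async function uploadLargeTextFile(file: File) {
  const controller = new AbortController();
  const result = await streamSplitAndUpload(
    file,
    (form) => uploadChunk(form),
    (done, total, lines) => console.log(`${done}/${total} bytes, ${lines} lines`),
    1024 * 1024,
    {
      resolveReqId: (params) => preUpload(params), // first pass counts lines, then requests a reqId
      signal: controller.signal,
      maxConcurrency: MAX_CONCURRENT_UPLOADS,
    }
  );
  console.log(`uploaded ${result.uploadedCount} lines, skipped ${result.skippedEmptyCount} empty`);
}
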
@@ -92,6 +92,14 @@ class Request {
      });
    }

    // 监听 AbortSignal 来中止请求
    if (config.signal) {
      config.signal.addEventListener("abort", () => {
        xhr.abort();
        reject(new Error("上传已取消"));
      });
    }

    // 监听上传进度
    xhr.upload.addEventListener("progress", function (event) {
      if (event.lengthComputable) {

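On the caller side, cancellation then only needs an AbortController; the config shape here mirrors the hunk above but the call itself is an assumption:

// Assumed caller-side pattern for the Request class change above.
const controller = new AbortController();
// request.post("/upload", formData, { signal: controller.signal }); // hypothetical call
controller.abort(); // triggers xhr.abort() and rejects with "上传已取消"
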
@@ -19,6 +19,7 @@ from app.db.session import get_db
from app.module.annotation.schema.editor import (
    EditorProjectInfo,
    EditorTaskListResponse,
    EditorTaskSegmentResponse,
    EditorTaskResponse,
    UpsertAnnotationRequest,
    UpsertAnnotationResponse,
@@ -87,6 +88,21 @@ async def get_editor_task(
    return StandardResponse(code=200, message="success", data=task)


@router.get(
    "/projects/{project_id}/tasks/{file_id}/segments",
    response_model=StandardResponse[EditorTaskSegmentResponse],
)
async def get_editor_task_segment(
    project_id: str = Path(..., description="标注项目ID(t_dm_labeling_projects.id)"),
    file_id: str = Path(..., description="文件ID(t_dm_dataset_files.id)"),
    segment_index: int = Query(..., ge=0, alias="segmentIndex", description="段落索引(从0开始)"),
    db: AsyncSession = Depends(get_db),
):
    service = AnnotationEditorService(db)
    result = await service.get_task_segment(project_id, file_id, segment_index)
    return StandardResponse(code=200, message="success", data=result)


@router.put(
    "/projects/{project_id}/tasks/{file_id}/annotation",
    response_model=StandardResponse[UpsertAnnotationResponse],

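From the frontend, this endpoint is already wrapped by getEditorTaskSegmentUsingGet from the API hunk earlier in this diff; fetching one segment looks like:

// Uses the client wrapper defined earlier in this diff.
async function loadFirstSegment(projectId: string, fileId: string) {
  return getEditorTaskSegmentUsingGet(projectId, fileId, { segmentIndex: 0 });
}
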
@@ -150,6 +150,18 @@ async def create_mapping(
        labeling_project, snapshot_file_ids
    )

    # 如果启用了分段且为文本数据集,预生成切片结构
    if dataset_type == TEXT_DATASET_TYPE and request.segmentation_enabled:
        try:
            from ..service.editor import AnnotationEditorService
            editor_service = AnnotationEditorService(db)
            # 异步预计算切片(不阻塞创建响应)
            segmentation_result = await editor_service.precompute_segmentation_for_project(labeling_project.id)
            logger.info(f"Precomputed segmentation for project {labeling_project.id}: {segmentation_result}")
        except Exception as e:
            logger.warning(f"Failed to precompute segmentation for project {labeling_project.id}: {e}")
            # 不影响项目创建,只记录警告

    response_data = DatasetMappingCreateResponse(
        id=mapping.id,
        labeling_project_id=str(mapping.labeling_project_id),

@@ -79,12 +79,9 @@ class EditorTaskListResponse(BaseModel):


class SegmentInfo(BaseModel):
    """段落信息(用于文本分段标注)"""
    """段落摘要(用于文本分段标注)"""

    idx: int = Field(..., description="段落索引")
    text: str = Field(..., description="段落文本")
    start: int = Field(..., description="在原文中的起始位置")
    end: int = Field(..., description="在原文中的结束位置")
    has_annotation: bool = Field(False, alias="hasAnnotation", description="该段落是否已有标注")
    line_index: int = Field(0, alias="lineIndex", description="JSONL 行索引(从0开始)")
    chunk_index: int = Field(0, alias="chunkIndex", description="行内分片索引(从0开始)")
@@ -100,7 +97,29 @@ class EditorTaskResponse(BaseModel):

    # 分段相关字段
    segmented: bool = Field(False, description="是否启用分段模式")
    segments: Optional[List[SegmentInfo]] = Field(None, description="段落列表")
    total_segments: int = Field(0, alias="totalSegments", description="总段落数")
    current_segment_index: int = Field(0, alias="currentSegmentIndex", description="当前段落索引")

    model_config = ConfigDict(populate_by_name=True)


class SegmentDetail(BaseModel):
    """段落内容"""

    idx: int = Field(..., description="段落索引")
    text: str = Field(..., description="段落文本")
    has_annotation: bool = Field(False, alias="hasAnnotation", description="该段落是否已有标注")
    line_index: int = Field(0, alias="lineIndex", description="JSONL 行索引(从0开始)")
    chunk_index: int = Field(0, alias="chunkIndex", description="行内分片索引(从0开始)")

    model_config = ConfigDict(populate_by_name=True)


class EditorTaskSegmentResponse(BaseModel):
    """编辑器单段内容响应"""

    segmented: bool = Field(False, description="是否启用分段模式")
    segment: Optional[SegmentDetail] = Field(None, description="段落内容")
    total_segments: int = Field(0, alias="totalSegments", description="总段落数")
    current_segment_index: int = Field(0, alias="currentSegmentIndex", description="当前段落索引")

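Because the schema uses camelCase aliases with populate_by_name, the JSON the frontend receives should look like the sketch below (a client-side mirror written as an assumption, not repo code):

// Assumed client-side mirror of EditorTaskSegmentResponse, camelCase per the aliases.
interface EditorTaskSegmentPayload {
  segmented: boolean;
  segment: {
    idx: number;
    text: string;
    hasAnnotation: boolean;
    lineIndex: number;
    chunkIndex: number;
  } | null;
  totalSegments: number;
  currentSegmentIndex: number;
}
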
@@ -36,7 +36,9 @@ from app.module.annotation.schema.editor import (
    EditorProjectInfo,
    EditorTaskListItem,
    EditorTaskListResponse,
    EditorTaskSegmentResponse,
    EditorTaskResponse,
    SegmentDetail,
    SegmentInfo,
    UpsertAnnotationRequest,
    UpsertAnnotationResponse,
@@ -538,6 +540,50 @@ class AnnotationEditorService:
            return value
        return raw_text

    def _build_segment_contexts(
        self,
        records: List[Tuple[Optional[Dict[str, Any]], str]],
        record_texts: List[str],
        segment_annotation_keys: set[str],
    ) -> Tuple[List[SegmentInfo], List[Tuple[Optional[Dict[str, Any]], str, str, int, int]]]:
        splitter = AnnotationTextSplitter(max_chars=self.SEGMENT_THRESHOLD)
        segments: List[SegmentInfo] = []
        segment_contexts: List[Tuple[Optional[Dict[str, Any]], str, str, int, int]] = []
        segment_cursor = 0

        for record_index, ((payload, raw_text), record_text) in enumerate(zip(records, record_texts)):
            normalized_text = record_text or ""
            if len(normalized_text) > self.SEGMENT_THRESHOLD:
                raw_segments = splitter.split(normalized_text)
                for chunk_index, seg in enumerate(raw_segments):
                    segments.append(
                        SegmentInfo(
                            idx=segment_cursor,
                            hasAnnotation=str(segment_cursor) in segment_annotation_keys,
                            lineIndex=record_index,
                            chunkIndex=chunk_index,
                        )
                    )
                    segment_contexts.append((payload, raw_text, seg["text"], record_index, chunk_index))
                    segment_cursor += 1
            else:
                segments.append(
                    SegmentInfo(
                        idx=segment_cursor,
                        hasAnnotation=str(segment_cursor) in segment_annotation_keys,
                        lineIndex=record_index,
                        chunkIndex=0,
                    )
                )
                segment_contexts.append((payload, raw_text, normalized_text, record_index, 0))
                segment_cursor += 1

        if not segments:
            segments = [SegmentInfo(idx=0, hasAnnotation=False, lineIndex=0, chunkIndex=0)]
            segment_contexts = [(None, "", "", 0, 0)]

        return segments, segment_contexts

    async def get_project_info(self, project_id: str) -> EditorProjectInfo:
        project = await self._get_project_or_404(project_id)

@@ -668,6 +714,124 @@ class AnnotationEditorService:

        return await self._build_text_task(project, file_record, file_id, segment_index)

    async def get_task_segment(
        self,
        project_id: str,
        file_id: str,
        segment_index: int,
    ) -> EditorTaskSegmentResponse:
        project = await self._get_project_or_404(project_id)

        dataset_type = self._normalize_dataset_type(await self._get_dataset_type(project.dataset_id))
        if dataset_type != DATASET_TYPE_TEXT:
            raise HTTPException(
                status_code=400,
                detail="当前仅支持 TEXT 项目的段落内容",
            )

        file_result = await self.db.execute(
            select(DatasetFiles).where(
                DatasetFiles.id == file_id,
                DatasetFiles.dataset_id == project.dataset_id,
            )
        )
        file_record = file_result.scalar_one_or_none()
        if not file_record:
            raise HTTPException(status_code=404, detail=f"文件不存在或不属于该项目: {file_id}")

        if not self._resolve_segmentation_enabled(project):
            return EditorTaskSegmentResponse(
                segmented=False,
                segment=None,
                totalSegments=0,
                currentSegmentIndex=0,
            )

        text_content = await self._fetch_text_content_via_download_api(project.dataset_id, file_id)
        assert isinstance(text_content, str)
        label_config = await self._resolve_project_label_config(project)
        primary_text_key = self._resolve_primary_text_key(label_config)
        file_name = str(getattr(file_record, "file_name", "")).lower()

        records: List[Tuple[Optional[Dict[str, Any]], str]] = []
        if file_name.endswith(JSONL_EXTENSION):
            records = self._parse_jsonl_records(text_content)
        else:
            parsed_payload = self._try_parse_json_payload(text_content)
            if parsed_payload:
                records = [(parsed_payload, text_content)]

        if not records:
            records = [(None, text_content)]

        record_texts = [
            self._resolve_primary_text_value(payload, raw_text, primary_text_key)
            for payload, raw_text in records
        ]
        if not record_texts:
            record_texts = [text_content]

        needs_segmentation = len(records) > 1 or any(
            len(text or "") > self.SEGMENT_THRESHOLD for text in record_texts
        )
        if not needs_segmentation:
            return EditorTaskSegmentResponse(
                segmented=False,
                segment=None,
                totalSegments=0,
                currentSegmentIndex=0,
            )

        ann_result = await self.db.execute(
            select(AnnotationResult).where(
                AnnotationResult.project_id == project.id,
                AnnotationResult.file_id == file_id,
            )
        )
        ann = ann_result.scalar_one_or_none()
        segment_annotations: Dict[str, Dict[str, Any]] = {}
        if ann and isinstance(ann.annotation, dict):
            segment_annotations = self._extract_segment_annotations(ann.annotation)
        segment_annotation_keys = set(segment_annotations.keys())

        segments, segment_contexts = self._build_segment_contexts(
            records,
            record_texts,
            segment_annotation_keys,
        )

        total_segments = len(segment_contexts)
        if total_segments == 0:
            return EditorTaskSegmentResponse(
                segmented=False,
                segment=None,
                totalSegments=0,
                currentSegmentIndex=0,
            )

        if segment_index < 0 or segment_index >= total_segments:
            raise HTTPException(
                status_code=400,
                detail=f"segmentIndex 超出范围: {segment_index}",
            )

        segment_info = segments[segment_index]
        _, _, segment_text, line_index, chunk_index = segment_contexts[segment_index]
        segment_detail = SegmentDetail(
            idx=segment_info.idx,
            text=segment_text,
            hasAnnotation=segment_info.has_annotation,
            lineIndex=line_index,
            chunkIndex=chunk_index,
        )

        return EditorTaskSegmentResponse(
            segmented=True,
            segment=segment_detail,
            totalSegments=total_segments,
            currentSegmentIndex=segment_index,
        )

    async def _build_text_task(
        self,
        project: LabelingProject,
@@ -723,7 +887,8 @@ class AnnotationEditorService:
        needs_segmentation = segmentation_enabled and (
            len(records) > 1 or any(len(text or "") > self.SEGMENT_THRESHOLD for text in record_texts)
        )
        segments: Optional[List[SegmentInfo]] = None
        segments: List[SegmentInfo] = []
        segment_contexts: List[Tuple[Optional[Dict[str, Any]], str, str, int, int]] = []
        current_segment_index = 0
        display_text = record_texts[0] if record_texts else text_content
        selected_payload = records[0][0] if records else None
@@ -732,46 +897,13 @@ class AnnotationEditorService:
        display_text = "\n".join(record_texts) if record_texts else text_content

        if needs_segmentation:
            splitter = AnnotationTextSplitter(max_chars=self.SEGMENT_THRESHOLD)
            segment_contexts: List[Tuple[Optional[Dict[str, Any]], str, str, int, int]] = []
            segments = []
            segment_cursor = 0

            for record_index, ((payload, raw_text), record_text) in enumerate(zip(records, record_texts)):
                normalized_text = record_text or ""
                if len(normalized_text) > self.SEGMENT_THRESHOLD:
                    raw_segments = splitter.split(normalized_text)
                    for chunk_index, seg in enumerate(raw_segments):
                        segments.append(SegmentInfo(
                            idx=segment_cursor,
                            text=seg["text"],
                            start=seg["start"],
                            end=seg["end"],
                            hasAnnotation=str(segment_cursor) in segment_annotation_keys,
                            lineIndex=record_index,
                            chunkIndex=chunk_index,
                        ))
                        segment_contexts.append((payload, raw_text, seg["text"], record_index, chunk_index))
                        segment_cursor += 1
                else:
                    segments.append(SegmentInfo(
                        idx=segment_cursor,
                        text=normalized_text,
                        start=0,
                        end=len(normalized_text),
                        hasAnnotation=str(segment_cursor) in segment_annotation_keys,
                        lineIndex=record_index,
                        chunkIndex=0,
                    ))
                    segment_contexts.append((payload, raw_text, normalized_text, record_index, 0))
                    segment_cursor += 1

            if not segments:
                segments = [SegmentInfo(idx=0, text="", start=0, end=0, hasAnnotation=False, lineIndex=0, chunkIndex=0)]
                segment_contexts = [(None, "", "", 0, 0)]

            _, segment_contexts = self._build_segment_contexts(
                records,
                record_texts,
                segment_annotation_keys,
            )
            current_segment_index = segment_index if segment_index is not None else 0
            if current_segment_index < 0 or current_segment_index >= len(segments):
            if current_segment_index < 0 or current_segment_index >= len(segment_contexts):
                current_segment_index = 0

            selected_payload, _, display_text, _, _ = segment_contexts[current_segment_index]
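The splitter's contract, as used above: `AnnotationTextSplitter(max_chars=...)` exposes a `split` method returning dicts with "text", "start" and "end" keys. Its implementation is not part of this diff; a naive fixed-window stand-in that honours the same contract might look like this (real splitters often prefer sentence or paragraph boundaries):

    class NaiveTextSplitter:
        # Stand-in for AnnotationTextSplitter: fixed-size character windows.
        def __init__(self, max_chars: int):
            self.max_chars = max_chars

        def split(self, text: str):
            # Each chunk records its character offsets in the source text.
            return [
                {"text": text[i:i + self.max_chars], "start": i, "end": min(i + self.max_chars, len(text))}
                for i in range(0, len(text), self.max_chars)
            ]
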
@@ -849,8 +981,7 @@ class AnnotationEditorService:
            task=task,
            annotationUpdatedAt=annotation_updated_at,
            segmented=needs_segmentation,
            segments=segments,
            totalSegments=len(segments) if segments else 1,
            totalSegments=len(segment_contexts) if needs_segmentation else 1,
            currentSegmentIndex=current_segment_index,
        )

@@ -1185,3 +1316,195 @@ class AnnotationEditorService:
        except Exception as exc:
            logger.warning("Failed to sync annotation to knowledge management: %s", exc)

    async def precompute_segmentation_for_project(
        self,
        project_id: str,
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """
        Precompute the segment structure for every text file in the given
        project and persist it to the database.

        Args:
            project_id: labeling project ID
            max_retries: number of retries on failure

        Returns:
            Statistics: {total_files, succeeded, failed}
        """
        project = await self._get_project_or_404(project_id)
        dataset_type = self._normalize_dataset_type(await self._get_dataset_type(project.dataset_id))

        # Only text datasets are processed
        if dataset_type != DATASET_TYPE_TEXT:
            logger.info(f"Project {project_id} is not a text dataset, skipping segment precomputation")
            return {"total_files": 0, "succeeded": 0, "failed": 0}

        # Check whether segmentation is enabled
        if not self._resolve_segmentation_enabled(project):
            logger.info(f"Segmentation is disabled for project {project_id}, skipping segment precomputation")
            return {"total_files": 0, "succeeded": 0, "failed": 0}

        # Fetch all text files of the project (source documents are filtered out below)
        files_result = await self.db.execute(
            select(DatasetFiles)
            .join(LabelingProjectFile, LabelingProjectFile.file_id == DatasetFiles.id)
            .where(
                LabelingProjectFile.project_id == project_id,
                DatasetFiles.dataset_id == project.dataset_id,
            )
        )
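        # Roughly equivalent SQL, for orientation (table and column names are
        # assumptions derived from the ORM model names, not confirmed by this diff):
        #   SELECT df.* FROM dataset_files df
        #   JOIN labeling_project_file lpf ON lpf.file_id = df.id
        #   WHERE lpf.project_id = :project_id AND df.dataset_id = :dataset_id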
        file_records = files_result.scalars().all()

        if not file_records:
            logger.info(f"Project {project_id} has no files, skipping segment precomputation")
            return {"total_files": 0, "succeeded": 0, "failed": 0}

        # Filter out source-document files
        valid_files = []
        for file_record in file_records:
            file_type = str(getattr(file_record, "file_type", "") or "").lower()
            file_name = str(getattr(file_record, "file_name", "")).lower()
            is_source_document = (
                file_type in SOURCE_DOCUMENT_TYPES or
                any(file_name.endswith(ext) for ext in SOURCE_DOCUMENT_EXTENSIONS)
            )
            if not is_source_document:
                valid_files.append(file_record)

        total_files = len(valid_files)
        succeeded = 0
        failed = 0

        label_config = await self._resolve_project_label_config(project)
        primary_text_key = self._resolve_primary_text_key(label_config)

        for file_record in valid_files:
            file_id = str(file_record.id)  # type: ignore
            file_name = str(getattr(file_record, "file_name", ""))

            for retry in range(max_retries):
                try:
                    # Read the text content
                    text_content = await self._fetch_text_content_via_download_api(project.dataset_id, file_id)
                    if not isinstance(text_content, str):
                        logger.warning(f"Content of file {file_id} is not a string, skipping segmentation")
                        failed += 1
                        break

                    # Parse the text records
                    records: List[Tuple[Optional[Dict[str, Any]], str]] = []
                    if file_name.lower().endswith(JSONL_EXTENSION):
                        records = self._parse_jsonl_records(text_content)
                    else:
                        parsed_payload = self._try_parse_json_payload(text_content)
                        if parsed_payload:
                            records = [(parsed_payload, text_content)]

                    if not records:
                        records = [(None, text_content)]

                    record_texts = [
                        self._resolve_primary_text_value(payload, raw_text, primary_text_key)
                        for payload, raw_text in records
                    ]
                    if not record_texts:
                        record_texts = [text_content]

                    # Decide whether segmentation is needed
                    needs_segmentation = len(records) > 1 or any(
                        len(text or "") > self.SEGMENT_THRESHOLD for text in record_texts
                    )

                    if not needs_segmentation:
                        # File does not need segmentation, skip it
                        succeeded += 1
                        break

                    # Perform the split
                    splitter = AnnotationTextSplitter(max_chars=self.SEGMENT_THRESHOLD)
                    segment_cursor = 0
                    segments = {}

                    for record_index, ((payload, raw_text), record_text) in enumerate(zip(records, record_texts)):
                        normalized_text = record_text or ""

                        if len(normalized_text) > self.SEGMENT_THRESHOLD:
                            raw_segments = splitter.split(normalized_text)
                            for chunk_index, seg in enumerate(raw_segments):
                                segments[str(segment_cursor)] = {
                                    SEGMENT_RESULT_KEY: [],
                                    SEGMENT_CREATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
                                    SEGMENT_UPDATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
                                }
                                segment_cursor += 1
                        else:
                            segments[str(segment_cursor)] = {
                                SEGMENT_RESULT_KEY: [],
                                SEGMENT_CREATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
                                SEGMENT_UPDATED_AT_KEY: datetime.utcnow().isoformat() + "Z",
                            }
                            segment_cursor += 1

                    if not segments:
                        succeeded += 1
                        break

                    # Build the segmented annotation structure
                    final_payload = {
                        SEGMENTED_KEY: True,
                        "version": 1,
                        SEGMENTS_KEY: segments,
                        SEGMENT_TOTAL_KEY: segment_cursor,
                    }
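                    # Example of the persisted payload (illustrative; the *_KEY constants
                    # are not shown in this diff, so the literal key names below are
                    # assumptions about what they resolve to):
                    #   {"segmented": True, "version": 1,
                    #    "segments": {"0": {"result": [], "created_at": "...Z", "updated_at": "...Z"}, ...},
                    #    "segment_total": <segment_cursor>}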

                    # Check whether an annotation already exists
                    existing_result = await self.db.execute(
                        select(AnnotationResult).where(
                            AnnotationResult.project_id == project_id,
                            AnnotationResult.file_id == file_id,
                        )
                    )
                    existing = existing_result.scalar_one_or_none()

                    now = datetime.utcnow()

                    if existing:
                        # Update the existing annotation
                        existing.annotation = final_payload  # type: ignore[assignment]
                        existing.annotation_status = ANNOTATION_STATUS_IN_PROGRESS  # type: ignore[assignment]
                        existing.updated_at = now  # type: ignore[assignment]
                    else:
                        # Create a new annotation record
                        record = AnnotationResult(
                            id=str(uuid.uuid4()),
                            project_id=project_id,
                            file_id=file_id,
                            annotation=final_payload,
                            annotation_status=ANNOTATION_STATUS_IN_PROGRESS,
                            created_at=now,
                            updated_at=now,
                        )
                        self.db.add(record)

                    await self.db.commit()
                    succeeded += 1
                    logger.info(f"Precomputed {segment_cursor} segments for file {file_id}")
                    break

                except Exception as e:
                    logger.warning(
                        f"Failed to precompute segments for file {file_id} (attempt {retry + 1}/{max_retries}): {e}"
                    )
                    if retry == max_retries - 1:
                        failed += 1
                    await self.db.rollback()

        logger.info(
            f"Segment precomputation for project {project_id} finished: total {total_files}, succeeded {succeeded}, failed {failed}"
        )
        return {
            "total_files": total_files,
            "succeeded": succeeded,
            "failed": failed,
        }
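
A minimal usage sketch for the new method (assuming the service is constructed with an async SQLAlchemy session, as the `self.db` calls above suggest; the constructor signature and surrounding names are illustrative, not confirmed by this diff):

    async def precompute(db_session, project_id: str) -> None:
        service = AnnotationEditorService(db_session)  # constructor signature assumed
        stats = await service.precompute_segmentation_for_project(project_id, max_retries=3)
        # stats matches the return shape documented above
        print(f"total={stats['total_files']} ok={stats['succeeded']} failed={stats['failed']}")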