Merge branch 'main' of github.com:ModelEngine-Group/DataMate

This commit is contained in:
o0Shark0o
2025-12-10 09:56:57 +08:00
17 changed files with 362 additions and 51 deletions

View File

@@ -37,6 +37,7 @@
<lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
<poi.version>5.4.0</poi.version>
<log4j2.version>2.21.1</log4j2.version>
<commons-compress.version>1.26.1</commons-compress.version>
</properties>
<modules>
@@ -151,6 +152,12 @@
<version>2.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -5,8 +5,10 @@ import com.datamate.common.domain.model.ChunkUploadPreRequest;
import com.datamate.common.domain.model.FileUploadResult;
import com.datamate.common.domain.service.FileService;
import com.datamate.common.domain.utils.AnalyzerUtils;
import com.datamate.common.domain.utils.ArchiveAnalyzer;
import com.datamate.common.infrastructure.exception.BusinessAssert;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.CommonErrorCode;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.common.interfaces.PagedResponse;
import com.datamate.common.interfaces.PagingQuery;
@@ -213,6 +215,9 @@ public class DatasetFileApplicationService {
*/
@Transactional
public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
if (Objects.isNull(datasetRepository.getById(datasetId))) {
throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
}
ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
request.setUploadPath(datasetBasePath + File.separator + datasetId);
request.setTotalFileNum(chunkUploadRequest.getTotalFileNum());
@@ -225,7 +230,7 @@ public class DatasetFileApplicationService {
String checkInfoJson = objectMapper.writeValueAsString(checkInfo);
request.setCheckInfo(checkInfoJson);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Failed to serialize checkInfo to JSON", e);
log.warn("Failed to serialize checkInfo to JSON", e);
}
return fileService.preUpload(request);
}
@@ -238,31 +243,54 @@ public class DatasetFileApplicationService {
@Transactional
public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId);
saveFileInfoToDb(uploadResult, datasetId);
}
private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) {
private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) {
if (Objects.isNull(fileUploadResult.getSavedFile())) {
// 文件切片上传没有完成
return;
}
DatasetFileUploadCheckInfo checkInfo;
try {
ObjectMapper objectMapper = new ObjectMapper();
checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class);
if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) {
throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
}
} catch (IllegalArgumentException | JsonProcessingException e) {
log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e);
throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
}
List<FileUploadResult> files;
if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) {
files = ArchiveAnalyzer.process(fileUploadResult);
} else {
files = Collections.singletonList(fileUploadResult);
}
addFileToDataset(datasetId, files);
}
private void addFileToDataset(String datasetId, List<FileUploadResult> unpacked) {
Dataset dataset = datasetRepository.getById(datasetId);
File savedFile = fileUploadResult.getSavedFile();
LocalDateTime currentTime = LocalDateTime.now();
DatasetFile datasetFile = DatasetFile.builder()
dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
for (FileUploadResult file : unpacked) {
File savedFile = file.getSavedFile();
LocalDateTime currentTime = LocalDateTime.now();
DatasetFile datasetFile = DatasetFile.builder()
.id(UUID.randomUUID().toString())
.datasetId(datasetId)
.fileSize(savedFile.length())
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.fileName(uploadFile.getFileName())
.fileName(file.getFileName())
.filePath(savedFile.getPath())
.fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
.fileType(AnalyzerUtils.getExtension(file.getFileName()))
.build();
dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
setDatasetFileId(datasetFile, dataset);
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
setDatasetFileId(datasetFile, dataset);
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
}
dataset.active();
datasetRepository.updateById(dataset);
}

View File

@@ -1,7 +1,8 @@
package com.datamate.datamanagement.domain.model.dataset;
import com.datamate.common.domain.model.UploadCheckInfo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
@@ -9,7 +10,9 @@ import lombok.Setter;
*/
@Getter
@Setter
public class DatasetFileUploadCheckInfo extends UploadCheckInfo {
@NoArgsConstructor
@AllArgsConstructor
public class DatasetFileUploadCheckInfo {
/** 数据集id */
private String datasetId;

View File

@@ -48,7 +48,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.1</version>
</dependency>
<!-- 核心服务依赖 -->

View File

@@ -42,5 +42,9 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,7 +0,0 @@
package com.datamate.common.domain.model;
/**
 * Base class for upload check information (上传检查信息基类).
 */
public abstract class UploadCheckInfo {
}

View File

@@ -4,6 +4,8 @@ import com.datamate.common.domain.model.ChunkUploadPreRequest;
import com.datamate.common.domain.model.ChunkUploadRequest;
import com.datamate.common.domain.model.FileUploadResult;
import com.datamate.common.domain.utils.ChunksSaver;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.CommonErrorCode;
import com.datamate.common.infrastructure.mapper.ChunkUploadRequestMapper;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -46,7 +48,7 @@ public class FileService {
uploadFileRequest.setFileSize(uploadFileRequest.getFile().getSize());
ChunkUploadPreRequest preRequest = chunkUploadRequestMapper.findById(uploadFileRequest.getReqId());
if (preRequest == null || preRequest.isUploadComplete() || preRequest.isRequestTimeout()) {
throw new IllegalArgumentException("预上传请求不存在");
throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
}
File savedFile;
if (uploadFileRequest.getTotalChunkNum() > 1) {
@@ -55,7 +57,7 @@ public class FileService {
savedFile = uploadFile(uploadFileRequest, preRequest);
}
if (chunkUploadRequestMapper.update(preRequest) == 0) {
throw new IllegalArgumentException("预上传请求不存在");
throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
}
boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum());
if (isFinish) {

View File

@@ -37,4 +37,15 @@ public class AnalyzerUtils {
}
return filename.substring(firstDotIndex + 1).toLowerCase(Locale.ROOT);
}
/**
 * Checks whether the given file path refers to a supported archive
 * (zip or tar.gz), based solely on its file extension.
 *
 * @param filePath 文件路径 (file path to inspect)
 * @return {@code true} when the extension matches a supported archive type
 */
public static boolean isPackage(String filePath) {
    String ext = getExtension(filePath).toLowerCase(Locale.ROOT);
    return TYPE_ZIP.equals(ext) || TYPE_TAR_GZ.equals(ext);
}
}

View File

@@ -0,0 +1,192 @@
package com.datamate.common.domain.utils;
import com.datamate.common.domain.model.FileUploadResult;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
 * Responsible for validating and unpacking archive files (zip and tar.gz).
 *
 * <p>Entries are extracted into the directory containing the archive; the archive
 * file itself is deleted after a successful extraction. Symbolic links are skipped,
 * entries larger than {@link #MAX_ENTRY_SIZE} are discarded, and entries whose
 * names would resolve outside the extraction directory (Zip Slip path traversal)
 * are rejected.</p>
 *
 * @since 2023-11-17
 */
@Slf4j
public class ArchiveAnalyzer {
    private static final int DEFAULT_BUFFER_SIZE = 4096;
    /** Maximum size of a single extracted entry — provisionally 1 GiB. */
    private static final long MAX_ENTRY_SIZE = 1024L * 1024 * 1024;
    private ArchiveAnalyzer() {
        // static utility class - no instances
    }
    /**
     * Unpacks the archive referenced by the given upload result.
     *
     * @param fileDto The uploaded file DTO
     * @return the list of extracted files
     * @throws BusinessException if the path cannot be resolved, the format is
     *         unrecognized, unpacking fails, or the archive cannot be deleted
     */
    public static List<FileUploadResult> process(FileUploadResult fileDto) {
        log.info("Start unpacking [{}]", fileDto.getFileName());
        File file = fileDto.getSavedFile();
        Path archivePath;
        try {
            archivePath = Paths.get(file.getCanonicalPath());
        } catch (IOException e) {
            log.error("Failed to get the archive file path.", e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to get the archive file path.");
        }
        List<FileUploadResult> unpacked;
        String extension = AnalyzerUtils.getExtension(fileDto.getFileName());
        if (AnalyzerUtils.TYPE_ZIP.equalsIgnoreCase(extension)) {
            log.info("ZIP unpacking [{}]", fileDto.getFileName());
            unpacked = processZip(archivePath);
            log.info("ZIP unpacking FINISHED [{}]", fileDto.getFileName());
        } else if (AnalyzerUtils.TYPE_TAR_GZ.equalsIgnoreCase(extension)) {
            unpacked = processTarGz(archivePath);
        } else {
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Unrecognized archive format.");
        }
        // the archive is no longer needed once its contents are on disk
        if (!archivePath.toFile().delete()) {
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR,
                "Unable to delete the archive file " + archivePath.toAbsolutePath());
        }
        log.info("Finished unpacking [{}]", fileDto.getFileName());
        return unpacked;
    }
    /** Unpacks a ZIP archive located at {@code archivePath}. */
    private static List<FileUploadResult> processZip(Path archivePath) {
        try (ArchiveInputStream<ZipArchiveEntry> inputStream = new ZipArchiveInputStream(
            new BufferedInputStream(Files.newInputStream(archivePath)))) {
            return unpackArchive(inputStream, archivePath);
        } catch (IOException e) {
            log.error("Failed to unpack zip archive:", e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to unpack zip archive.");
        }
    }
    /** Unpacks a tar.gz archive located at {@code archivePath}, reading entry names as UTF-8. */
    private static List<FileUploadResult> processTarGz(Path archivePath) {
        try (ArchiveInputStream<TarArchiveEntry> inputStream = new TarArchiveInputStream(
            new GzipCompressorInputStream(new BufferedInputStream(Files.newInputStream(archivePath))),
            StandardCharsets.UTF_8.toString())) {
            return unpackArchive(inputStream, archivePath);
        } catch (IOException e) {
            log.error("Failed to unpack tar.gz archive:", e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to unpack tar.gz archive.");
        }
    }
    /**
     * Iterates over all archive entries, extracting regular files.
     *
     * <p>On I/O failure every file extracted so far is deleted before the
     * exception is rethrown, so a failed unpack leaves no partial output.</p>
     */
    private static List<FileUploadResult> unpackArchive(ArchiveInputStream<?> archiveInputStream, Path archivePath) throws IOException {
        List<FileUploadResult> unpacked = new ArrayList<>();
        long unpackedSize = 0L;
        try {
            ArchiveEntry archiveEntry;
            int entryCount = 0;
            while ((archiveEntry = archiveInputStream.getNextEntry()) != null) {
                if (isSymlink(archiveEntry)) {
                    // skip symlink entries during extraction
                    continue;
                }
                entryCount++;
                if (checkUnpackSizeAndFileSize(entryCount, unpacked) || checkVersionSize(unpackedSize, archiveEntry.getSize())) {
                    break;
                }
                if (!archiveEntry.isDirectory()) {
                    unpackedSize = addFileAndCountFileSize(archiveInputStream, archiveEntry, unpacked,
                        unpackedSize, archivePath);
                }
            }
        } catch (IOException e) {
            // roll back: remove everything extracted before the failure
            unpacked.forEach(v -> deleteFile(v.getSavedFile()));
            throw e;
        }
        return unpacked;
    }
    // NOTE(review): placeholder — always false. Presumably intended to cap the total
    // unpacked size; confirm the requirement before implementing.
    private static boolean checkVersionSize(long unpackedSize, long currFileSize) {
        return false;
    }
    /**
     * Extracts one entry and returns the running total of unpacked bytes.
     */
    private static long addFileAndCountFileSize(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry,
        List<FileUploadResult> unpacked, long unpackedSize, Path archivePath) throws IOException {
        Optional<FileUploadResult> uploadFileDto = extractEntity(archiveInputStream, archiveEntry, archivePath);
        long newSize = unpackedSize;
        if (uploadFileDto.isPresent()) {
            FileUploadResult dto = uploadFileDto.get();
            unpacked.add(dto);
            newSize += dto.getSavedFile().length();
        }
        return newSize;
    }
    // NOTE(review): placeholder — always false. Presumably intended to cap the entry
    // count; confirm the requirement before implementing.
    private static boolean checkUnpackSizeAndFileSize(int entryCount, List<FileUploadResult> unpacked) {
        return false;
    }
    /**
     * Writes a single archive entry to disk next to the archive file.
     *
     * <p>Rejects entries whose names resolve outside the extraction directory
     * (Zip Slip) and discards entries larger than {@link #MAX_ENTRY_SIZE}.</p>
     *
     * @return the extracted file wrapped in a result, or empty when the entry was rejected
     */
    private static Optional<FileUploadResult> extractEntity(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry, Path archivePath)
        throws IOException {
        Path baseDir = archivePath.getParent().normalize();
        Path path = baseDir.resolve(archiveEntry.getName()).normalize();
        if (!path.startsWith(baseDir)) {
            // Zip Slip: entry name uses ".." or an absolute path to escape the target directory
            log.warn("Skipping archive entry [{}]: it resolves outside the extraction directory", archiveEntry.getName());
            return Optional.empty();
        }
        if (path.getParent() != null) {
            // nested entries need their parent directories to exist before writing
            Files.createDirectories(path.getParent());
        }
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        File file = path.toFile();
        long fileSize = 0L;
        try (OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
            int byteRead;
            while ((byteRead = archiveInputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, byteRead);
                fileSize += byteRead;
                if (fileSize > MAX_ENTRY_SIZE) {
                    // stop writing; the oversized partial file is removed below
                    break;
                }
            }
        } catch (IOException e) {
            log.error("error happened while write entry to file system");
            deleteFile(file);
            throw e;
        }
        if (fileSize > MAX_ENTRY_SIZE) {
            // entry exceeds the size limit - discard the partially written file
            log.info("file {} size exceeds limit", archiveEntry.getName());
            deleteFile(file);
            return Optional.empty();
        }
        return Optional.of(FileUploadResult.builder().savedFile(file).fileName(CommonUtils.trimFilePath(archiveEntry.getName())).build());
    }
    /** Best-effort delete; a failure is logged but never propagated. */
    private static void deleteFile(File file) {
        Path fileToDeletePath = Paths.get(file.getPath());
        if (Files.exists(fileToDeletePath)) {
            try {
                Files.delete(fileToDeletePath);
            } catch (IOException e1) {
                log.error("Failed to delete file.", e1);
            }
        }
    }
    /** Only tar archives can represent symlinks; zip entries are never treated as such here. */
    private static boolean isSymlink(ArchiveEntry archiveEntry) {
        if (archiveEntry instanceof TarArchiveEntry) {
            return ((TarArchiveEntry) archiveEntry).isSymbolicLink();
        }
        return false;
    }
}

View File

@@ -0,0 +1,17 @@
package com.datamate.common.infrastructure.exception;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* CommonErrorCode
*
* @since 2025/12/5
*/
@Getter
@AllArgsConstructor
public enum CommonErrorCode implements ErrorCode{
PRE_UPLOAD_REQUEST_NOT_EXIST("common.0101", "预上传请求不存在");
private final String code;
private final String message;
}

View File

@@ -23,6 +23,13 @@ data:
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /api/evaluation/ {
proxy_pass http://datamate-backend-python:18000/api/evaluation/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /api/annotation/ {
proxy_pass http://datamate-backend-python:18000/api/annotation/;
proxy_set_header Host $host;
@@ -91,6 +98,13 @@ data:
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /api/evaluation/ {
proxy_pass http://datamate-backend-python:18000/api/evaluation/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /api/annotation/ {
proxy_pass http://datamate-backend-python:18000/api/annotation/;
proxy_set_header Host $host;
@@ -110,4 +124,4 @@ data:
try_files $uri $uri/ /index.html;
}
}
{{- end }}
{{- end }}

View File

@@ -95,6 +95,24 @@ backend:
- name: operator-volume
mountPath: /operators
backend-python:
env:
- name: DB_PASSWORD
value: *dbPass
volumes:
- *datasetVolume
- *flowVolume
- *logVolume
volumeMounts:
- name: dataset-volume
mountPath: /dataset
- name: flow-volume
mountPath: /flow
- name: log-volume
mountPath: /var/log/datamate
- name: operator-volume
mountPath: /operators
frontend:
service:
type: NodePort

View File

@@ -31,6 +31,7 @@ export function useFileSliceUpload(
controller,
size: 0,
updateEvent: detail.updateEvent,
hasArchive: detail.hasArchive,
};
taskListRef.current = [task, ...taskListRef.current];
@@ -112,6 +113,7 @@ export function useFileSliceUpload(
totalFileNum: files.length,
totalSize,
datasetId: task.key,
hasArchive: task.hasArchive,
});
const newTask: TaskItem = {

View File

@@ -222,7 +222,7 @@ export default function DatasetDetail() {
<Tabs activeKey={activeTab} items={tabList} onChange={setActiveTab} />
<div className="h-full overflow-auto">
{activeTab === "overview" && (
<Overview dataset={dataset} filesOperation={filesOperation} />
<Overview dataset={dataset} filesOperation={filesOperation} fetchDataset={fetchDataset}/>
)}
{activeTab === "lineage" && <DataLineageFlow dataset={dataset} />}
{activeTab === "quality" && <DataQuality />}

View File

@@ -1,4 +1,4 @@
import { Select, Input, Form, Radio, Modal, Button, UploadFile } from "antd";
import { Select, Input, Form, Radio, Modal, Button, UploadFile, Switch } from "antd";
import { InboxOutlined } from "@ant-design/icons";
import { dataSourceOptions } from "../../dataset.const";
import { Dataset, DataSource } from "../../dataset.model";
@@ -51,6 +51,7 @@ export default function ImportConfiguration({
dataset,
files: fileSliceList,
updateEvent,
hasArchive: importConfig.hasArchive,
},
})
);
@@ -195,29 +196,39 @@ export default function ImportConfiguration({
{/* Local Upload Component */}
{importConfig?.source === DataSource.UPLOAD && (
<Form.Item
label="上传文件"
name="files"
rules={[
{
required: true,
message: "请上传文件",
},
]}
>
<Dragger
className="w-full"
onRemove={handleRemoveFile}
beforeUpload={handleBeforeUpload}
multiple
<>
<Form.Item
label="自动解压上传的压缩包"
name="hasArchive"
valuePropName="checked"
initialValue={true}
>
<p className="ant-upload-drag-icon">
<InboxOutlined />
</p>
<p className="ant-upload-text"></p>
<p className="ant-upload-hint"></p>
</Dragger>
</Form.Item>
<Switch />
</Form.Item>
<Form.Item
label="上传文件"
name="files"
rules={[
{
required: true,
message: "请上传文件",
},
]}
>
<Dragger
className="w-full"
onRemove={handleRemoveFile}
beforeUpload={handleBeforeUpload}
multiple
>
<p className="ant-upload-drag-icon">
<InboxOutlined />
</p>
<p className="ant-upload-text"></p>
<p className="ant-upload-hint"></p>
</Dragger>
</Form.Item>
</>
)}
{/* Target Configuration */}

View File

@@ -3,7 +3,7 @@ import { formatBytes, formatDateTime } from "@/utils/unit";
import { Download, Trash2 } from "lucide-react";
import { datasetTypeMap } from "../../dataset.const";
export default function Overview({ dataset, filesOperation }) {
export default function Overview({ dataset, filesOperation, fetchDataset }) {
const {
fileList,
pagination,
@@ -73,6 +73,11 @@ export default function Overview({ dataset, filesOperation }) {
label: "存储路径",
children: dataset.targetLocation || "未知",
},
{
key: "pvcName",
label: "存储名称",
children: dataset.pvcName || "未知",
},
{
key: "createdAt",
label: "创建时间",
@@ -129,7 +134,11 @@ export default function Overview({ dataset, filesOperation }) {
<Button
size="small"
type="link"
onClick={() => handleDeleteFile(record)}
onClick={async () => {
await handleDeleteFile(record);
fetchDataset()
}
}
>
</Button>

View File

@@ -101,4 +101,5 @@ export interface TaskItem {
cancelFn?: () => void;
updateEvent?: string;
size?: number;
hasArchive?: boolean;
}