feature: 北向接口:支持通过模板创建清洗任务 (#111)

feature: 北向接口:支持通过模板创建清洗任务
This commit is contained in:
hhhhsc701
2025-11-26 17:30:54 +08:00
committed by GitHub
parent 0ca5f29885
commit 91390cace0
15 changed files with 312 additions and 253 deletions

View File

@@ -209,7 +209,7 @@ endif
.PHONY: uninstall .PHONY: uninstall
uninstall: uninstall:
ifeq ($(origin INSTALLER), undefined) ifeq ($(origin INSTALLER), undefined)
$(call prompt-uninstaller,milvus-$$INSTALLER-uninstall label-studio-$$INSTALLER-uninstall datamate-$$INSTALLER-uninstall) $(call prompt-uninstaller,milvus-$$INSTALLER-uninstall datamate-$$INSTALLER-uninstall)
else else
@echo "Delete volumes? (This will remove all data)"; \ @echo "Delete volumes? (This will remove all data)"; \
echo "1. Yes - Delete volumes"; \ echo "1. Yes - Delete volumes"; \

View File

@@ -248,4 +248,8 @@ public class CleaningTaskService {
public void stopTask(String taskId) { public void stopTask(String taskId) {
taskScheduler.stopTask(taskId); taskScheduler.stopTask(taskId);
} }
public List<OperatorInstanceDto> getInstanceByTemplateId(String templateId) {
return operatorInstanceRepo.findInstanceByInstanceId(templateId);
}
} }

View File

@@ -3,6 +3,7 @@ package com.datamate.cleaning.application;
import com.datamate.cleaning.domain.repository.CleaningTemplateRepository; import com.datamate.cleaning.domain.repository.CleaningTemplateRepository;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
import com.datamate.cleaning.interfaces.dto.*; import com.datamate.cleaning.interfaces.dto.*;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance; import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import com.datamate.operator.domain.repository.OperatorViewRepository; import com.datamate.operator.domain.repository.OperatorViewRepository;
@@ -28,6 +29,8 @@ public class CleaningTemplateService {
private final OperatorViewRepository operatorViewRepo; private final OperatorViewRepository operatorViewRepo;
private final CleanTaskValidator cleanTaskValidator;
public List<CleaningTemplateDto> getTemplates(String keywords) { public List<CleaningTemplateDto> getTemplates(String keywords) {
List<OperatorDto> allOperators = List<OperatorDto> allOperators =
operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null); operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null);
@@ -59,6 +62,7 @@ public class CleaningTemplateService {
@Transactional @Transactional
public CleaningTemplateDto createTemplate(CreateCleaningTemplateRequest request) { public CleaningTemplateDto createTemplate(CreateCleaningTemplateRequest request) {
cleanTaskValidator.checkInputAndOutput(request.getInstance());
CleaningTemplateDto template = new CleaningTemplateDto(); CleaningTemplateDto template = new CleaningTemplateDto();
String templateId = UUID.randomUUID().toString(); String templateId = UUID.randomUUID().toString();
template.setId(templateId); template.setId(templateId);

View File

@@ -13,4 +13,6 @@ public interface OperatorInstanceRepository extends IRepository<OperatorInstance
void deleteByInstanceId(String instanceId); void deleteByInstanceId(String instanceId);
List<OperatorDto> findOperatorByInstanceId(String instanceId); List<OperatorDto> findOperatorByInstanceId(String instanceId);
List<OperatorInstanceDto> findInstanceByInstanceId(String instanceId);
} }

View File

@@ -8,6 +8,7 @@ import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.operator.domain.model.OperatorView; import com.datamate.operator.domain.model.OperatorView;
import com.datamate.operator.interfaces.dto.OperatorDto; import com.datamate.operator.interfaces.dto.OperatorDto;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.Mapping; import org.mapstruct.Mapping;
@@ -23,15 +24,34 @@ import java.util.Map;
public interface OperatorInstanceConverter { public interface OperatorInstanceConverter {
OperatorInstanceConverter INSTANCE = Mappers.getMapper(OperatorInstanceConverter.class); OperatorInstanceConverter INSTANCE = Mappers.getMapper(OperatorInstanceConverter.class);
ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Mapping(target = "settingsOverride", source = "overrides", qualifiedByName = "mapToString") @Mapping(target = "settingsOverride", source = "overrides", qualifiedByName = "mapToString")
@Mapping(target = "operatorId", source = "id") @Mapping(target = "operatorId", source = "id")
OperatorInstance fromDtoToEntity(OperatorInstanceDto instance); OperatorInstance fromDtoToEntity(OperatorInstanceDto instance);
@Mapping(target = "overrides", source = "settingsOverride", qualifiedByName = "stringToMap")
@Mapping(target = "id", source = "operatorId")
OperatorInstanceDto fromEntityToDto(OperatorInstance instance);
List<OperatorInstanceDto> fromEntityToDtoList(List<OperatorInstance> instance);
@Named("mapToString") @Named("mapToString")
static String mapToString(Map<String, Object> objects) { static String mapToString(Map<String, Object> objects) {
ObjectMapper objectMapper = new ObjectMapper();
try { try {
return objectMapper.writeValueAsString(objects); return OBJECT_MAPPER.writeValueAsString(objects);
} catch (JsonProcessingException e) {
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
}
}
@Named("stringToMap")
static Map<String, Object> stringToMap(String json) {
if (json == null) {
return Collections.emptyMap();
}
try {
return OBJECT_MAPPER.readValue(json, new TypeReference<>() {});
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR); throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
} }

View File

@@ -42,4 +42,12 @@ public class OperatorInstanceRepositoryImpl extends CrudRepository<OperatorInsta
public List<OperatorDto> findOperatorByInstanceId(String instanceId) { public List<OperatorDto> findOperatorByInstanceId(String instanceId) {
return OperatorInstanceConverter.INSTANCE.fromEntityToDto(mapper.findOperatorByInstanceId(instanceId)); return OperatorInstanceConverter.INSTANCE.fromEntityToDto(mapper.findOperatorByInstanceId(instanceId));
} }
@Override
public List<OperatorInstanceDto> findInstanceByInstanceId(String instanceId) {
LambdaQueryWrapper<OperatorInstance> lambdaWrapper = new LambdaQueryWrapper<>();
lambdaWrapper.eq(OperatorInstance::getInstanceId, instanceId)
.orderByAsc(OperatorInstance::getOpIndex);
return OperatorInstanceConverter.INSTANCE.fromEntityToDtoList(mapper.selectList(lambdaWrapper));
}
} }

View File

@@ -27,6 +27,8 @@ public class CreateCleaningTaskRequest {
private String destDatasetType; private String destDatasetType;
private String templateId;
private List<OperatorInstanceDto> instance = new ArrayList<>(); private List<OperatorInstanceDto> instance = new ArrayList<>();
} }

View File

@@ -1,12 +1,10 @@
package com.datamate.cleaning.interfaces.rest; package com.datamate.cleaning.interfaces.rest;
import com.datamate.cleaning.application.CleaningTaskService; import com.datamate.cleaning.application.CleaningTaskService;
import com.datamate.cleaning.interfaces.dto.CleaningResultDto; import com.datamate.cleaning.interfaces.dto.*;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import com.datamate.cleaning.interfaces.dto.CleaningTaskLog;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
import com.datamate.common.interfaces.PagedResponse; import com.datamate.common.interfaces.PagedResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List; import java.util.List;
@@ -31,6 +29,9 @@ public class CleaningTaskController {
@PostMapping @PostMapping
public CleaningTaskDto cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) { public CleaningTaskDto cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) {
if (request.getInstance().isEmpty() && StringUtils.isNotBlank(request.getTemplateId())) {
request.setInstance(cleaningTaskService.getInstanceByTemplateId(request.getTemplateId()));
}
return cleaningTaskService.createTask(request); return cleaningTaskService.createTask(request);
} }
@@ -57,6 +58,13 @@ public class CleaningTaskController {
return taskId; return taskId;
} }
@DeleteMapping
public void cleaningTasksDelete(@RequestParam List<String> taskIds) {
for (String taskId : taskIds) {
cleaningTaskService.deleteTask(taskId);
}
}
@GetMapping("/{taskId}/result") @GetMapping("/{taskId}/result")
public List<CleaningResultDto> cleaningTasksTaskIdGetResult(@PathVariable("taskId") String taskId) { public List<CleaningResultDto> cleaningTasksTaskIdGetResult(@PathVariable("taskId") String taskId) {
return cleaningTaskService.getTaskResults(taskId); return cleaningTaskService.getTaskResults(taskId);

View File

@@ -113,7 +113,7 @@ spec:
image: {{ include "etcd.image" . }} image: {{ include "etcd.image" . }}
imagePullPolicy: {{ .Values.image.pullPolicy | quote }} imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
{{- if .Values.containerSecurityContext.enabled }} {{- if .Values.containerSecurityContext.enabled }}
securityContext: {{- omit .Values.containerSecurityContext "enabled" | toYaml | nindent 12 }} securityContext: {{- omit .Values.containerSecurityContext "enabled" | toYaml | nindent 14 }}
{{- end }} {{- end }}
{{- if .Values.diagnosticMode.enabled }} {{- if .Values.diagnosticMode.enabled }}
command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 14 }} command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 14 }}
@@ -356,7 +356,7 @@ spec:
{{- include "common.tplvalues.render" (dict "value" .Values.extraVolumeMounts "context" $) | nindent 12 }} {{- include "common.tplvalues.render" (dict "value" .Values.extraVolumeMounts "context" $) | nindent 12 }}
{{- end }} {{- end }}
{{- if .Values.sidecars }} {{- if .Values.sidecars }}
{{- include "common.tplvalues.render" (dict "value" .Values.sidecars "context" $) | nindent 8 }} {{- include "common.tplvalues.render" (dict "value" .Values.sidecars "context" $) | nindent 10 }}
{{- end }} {{- end }}
volumes: volumes:
{{- if and (eq .Values.auth.token.enabled true) (eq .Values.auth.token.type "jwt") }} {{- if and (eq .Values.auth.token.enabled true) (eq .Values.auth.token.type "jwt") }}

View File

@@ -17,7 +17,7 @@ spec:
- {{ . | quote }} - {{ . | quote }}
{{- end }} {{- end }}
persistentVolumeReclaimPolicy: Delete persistentVolumeReclaimPolicy: Delete
storageClassName: {{ include "common.storage.class" (dict "persistence" .Values.persistence "global" .Values.global) }} {{ include "common.storage.class" (dict "persistence" .Values.persistence "global" .Values.global) }}
local: # local类型 local: # local类型
path: {{ .Values.persistence.storagePath | default "/opt/milvus/data" }}/etcd path: {{ .Values.persistence.storagePath | default "/opt/milvus/data" }}/etcd
claimRef: claimRef:

View File

@@ -38,6 +38,8 @@ export default function CleansingTaskCreate() {
...item.defaultParams, ...item.defaultParams,
...item.overrides, ...item.overrides,
}, },
inputs: item.inputs,
outputs: item.outputs,
})), })),
}; };
navigate("/data/cleansing?view=task"); navigate("/data/cleansing?view=task");

View File

@@ -44,6 +44,8 @@ export default function CleansingTemplateCreate() {
...item.defaultParams, ...item.defaultParams,
...item.overrides, ...item.overrides,
}, },
inputs: item.inputs,
outputs: item.outputs,
})), })),
}; };

View File

@@ -107,7 +107,7 @@ export default function FileTable({result, fetchTaskResult}) {
onFilter: (value: string, record: any) => onFilter: (value: string, record: any) =>
record.srcName.toLowerCase().includes(value.toLowerCase()), record.srcName.toLowerCase().includes(value.toLowerCase()),
render: (text: string) => ( render: (text: string) => (
<span className="font-mono text-sm">{text?.replace(/\.[^/.]+$/, "")}</span> <span>{text?.replace(/\.[^/.]+$/, "")}</span>
), ),
}, },
{ {

View File

@@ -45,6 +45,13 @@ export default function TemplateList() {
}; };
const templateColumns = [ const templateColumns = [
{
title: "模板ID",
dataIndex: "id",
key: "id",
fixed: "left",
width: 100,
},
{ {
title: "模板名称", title: "模板名称",
dataIndex: "name", dataIndex: "name",

View File

@@ -70,7 +70,7 @@ INSERT IGNORE INTO t_operator
VALUES ('TextFormatter', 'TXT文本抽取', '抽取TXT中的文本。', '1.0.0', 'text', 'text', null, null, '', false), VALUES ('TextFormatter', 'TXT文本抽取', '抽取TXT中的文本。', '1.0.0', 'text', 'text', null, null, '', false),
('UnstructuredFormatter', 'Unstructured文本抽取', '基于Unstructured抽取非结构化文件的文本,目前支持PowerPoint演示文稿、Word文档以及Excel工作簿。', '1.0.0', 'text', 'text', null, null, '', false), ('UnstructuredFormatter', 'Unstructured文本抽取', '基于Unstructured抽取非结构化文件的文本,目前支持PowerPoint演示文稿、Word文档以及Excel工作簿。', '1.0.0', 'text', 'text', null, null, '', false),
('MineruFormatter', 'MinerU PDF文本抽取', '基于MinerU API,抽取PDF中的文本。', '1.0.0', 'text', 'text', null, null, '', false), ('MineruFormatter', 'MinerU PDF文本抽取', '基于MinerU API,抽取PDF中的文本。', '1.0.0', 'text', 'text', null, null, '', false),
('FileExporter', '落盘算子', '将文件保存到本地目录。', '1.0.0', 'all', 'all', null, null, '', false), ('FileExporter', '落盘算子', '将文件保存到本地目录。', '1.0.0', 'multimodal', 'multimodal', null, null, '', false),
('FileWithHighRepeatPhraseRateFilter', '文档词重复率检查', '去除重复词过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatPhraseRatio": {"name": "文档词重复率", "description": "某个词的统计数/文档总词数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}, "hitStopwords": {"name": "去除停用词", "description": "统计重复词时,选择是否要去除停用词。", "type": "switch", "defaultVal": false, "required": true, "checkedLabel": "去除", "unCheckedLabel": "不去除"}}', '', 'false'), ('FileWithHighRepeatPhraseRateFilter', '文档词重复率检查', '去除重复词过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatPhraseRatio": {"name": "文档词重复率", "description": "某个词的统计数/文档总词数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}, "hitStopwords": {"name": "去除停用词", "description": "统计重复词时,选择是否要去除停用词。", "type": "switch", "defaultVal": false, "required": true, "checkedLabel": "去除", "unCheckedLabel": "不去除"}}', '', 'false'),
('FileWithHighRepeatWordRateFilter', '文档字重复率检查', '去除重复字过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatWordRatio": {"name": "文档字重复率", "description": "某个字的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}}', '', 'false'), ('FileWithHighRepeatWordRateFilter', '文档字重复率检查', '去除重复字过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatWordRatio": {"name": "文档字重复率", "description": "某个字的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}}', '', 'false'),
('FileWithHighSpecialCharRateFilter', '文档特殊字符率检查', '去除特殊字符过多的文档。', '1.0.0', 'text', 'text', null, '{"specialCharRatio": {"name": "文档特殊字符率", "description": "特殊字符的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.3, "min": 0, "max": 1, "step": 0.1}}', '', 'false'), ('FileWithHighSpecialCharRateFilter', '文档特殊字符率检查', '去除特殊字符过多的文档。', '1.0.0', 'text', 'text', null, '{"specialCharRatio": {"name": "文档特殊字符率", "description": "特殊字符的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.3, "min": 0, "max": 1, "step": 0.1}}', '', 'false'),