You've already forked DataMate
优化清洗重试机制,优化清洗进度展示,修复模板无法展示参数 (#113)
* bugfix: 模板无法展示参数 * bugfix: 优化清洗进度展示 * bugfix: 优化清洗重试机制
This commit is contained in:
@@ -41,6 +41,7 @@ import java.util.*;
|
|||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -116,7 +117,7 @@ public class CleaningTaskService {
|
|||||||
|
|
||||||
prepareTask(task, request.getInstance());
|
prepareTask(task, request.getInstance());
|
||||||
scanDataset(taskId, request.getSrcDatasetId());
|
scanDataset(taskId, request.getSrcDatasetId());
|
||||||
executeTask(taskId);
|
taskScheduler.executeTask(taskId);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,6 +171,11 @@ public class CleaningTaskService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void executeTask(String taskId) {
|
public void executeTask(String taskId) {
|
||||||
|
List<CleaningResultDto> failed = cleaningResultRepo.findByInstanceId(taskId, "FAILED");
|
||||||
|
Set<String> failedSet = failed.stream().map(CleaningResultDto::getSrcFileId).collect(Collectors.toSet());
|
||||||
|
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
|
||||||
|
scanDataset(taskId, task.getSrcDatasetId(), failedSet);
|
||||||
|
cleaningResultRepo.deleteByInstanceId(taskId, "FAILED");
|
||||||
taskScheduler.executeTask(taskId);
|
taskScheduler.executeTask(taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -226,6 +232,29 @@ public class CleaningTaskService {
|
|||||||
} while (pageNumber < datasetFiles.getTotalPages());
|
} while (pageNumber < datasetFiles.getTotalPages());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void scanDataset(String taskId, String srcDatasetId, Set<String> failedFiles) {
|
||||||
|
int pageNumber = 0;
|
||||||
|
int pageSize = 500;
|
||||||
|
PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
|
||||||
|
PagedResponse<DatasetFile> datasetFiles;
|
||||||
|
do {
|
||||||
|
datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest);
|
||||||
|
if (datasetFiles.getContent().isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
List<Map<String, Object>> files = datasetFiles.getContent().stream()
|
||||||
|
.filter(content -> failedFiles.contains(content.getId()))
|
||||||
|
.map(content -> Map.of("fileName", (Object) content.getFileName(),
|
||||||
|
"fileSize", content.getFileSize(),
|
||||||
|
"filePath", content.getFilePath(),
|
||||||
|
"fileType", content.getFileType(),
|
||||||
|
"fileId", content.getId()))
|
||||||
|
.toList();
|
||||||
|
writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
|
||||||
|
pageNumber += 1;
|
||||||
|
} while (pageNumber < datasetFiles.getTotalPages());
|
||||||
|
}
|
||||||
|
|
||||||
private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
|
private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
|
||||||
ObjectMapper objectMapper = new ObjectMapper();
|
ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
|
|||||||
@@ -6,8 +6,13 @@ import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
|
|||||||
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
|
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
|
||||||
import com.datamate.cleaning.interfaces.dto.*;
|
import com.datamate.cleaning.interfaces.dto.*;
|
||||||
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
|
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
|
||||||
|
import com.datamate.common.infrastructure.exception.BusinessException;
|
||||||
|
import com.datamate.operator.application.OperatorService;
|
||||||
import com.datamate.operator.domain.repository.OperatorViewRepository;
|
import com.datamate.operator.domain.repository.OperatorViewRepository;
|
||||||
|
import com.datamate.operator.infrastructure.exception.OperatorErrorCode;
|
||||||
import com.datamate.operator.interfaces.dto.OperatorDto;
|
import com.datamate.operator.interfaces.dto.OperatorDto;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@@ -31,6 +36,10 @@ public class CleaningTemplateService {
|
|||||||
|
|
||||||
private final CleanTaskValidator cleanTaskValidator;
|
private final CleanTaskValidator cleanTaskValidator;
|
||||||
|
|
||||||
|
private final OperatorService operatorService;
|
||||||
|
|
||||||
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
public List<CleaningTemplateDto> getTemplates(String keywords) {
|
public List<CleaningTemplateDto> getTemplates(String keywords) {
|
||||||
List<OperatorDto> allOperators =
|
List<OperatorDto> allOperators =
|
||||||
operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null);
|
operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null);
|
||||||
@@ -50,7 +59,12 @@ public class CleaningTemplateService {
|
|||||||
.map(v -> {
|
.map(v -> {
|
||||||
OperatorDto operator = operatorsMap.get(v.getOperatorId());
|
OperatorDto operator = operatorsMap.get(v.getOperatorId());
|
||||||
if (StringUtils.isNotBlank(v.getSettingsOverride())) {
|
if (StringUtils.isNotBlank(v.getSettingsOverride())) {
|
||||||
operator.setSettings(v.getSettingsOverride());
|
try {
|
||||||
|
operator.setOverrides(objectMapper.readValue(v.getSettingsOverride(), Map.class));
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
|
||||||
|
}
|
||||||
|
operatorService.overrideSettings(operator);
|
||||||
}
|
}
|
||||||
return operator;
|
return operator;
|
||||||
}).toList());
|
}).toList());
|
||||||
|
|||||||
@@ -10,7 +10,11 @@ import java.util.List;
|
|||||||
public interface CleaningResultRepository extends IRepository<CleaningResult> {
|
public interface CleaningResultRepository extends IRepository<CleaningResult> {
|
||||||
void deleteByInstanceId(String instanceId);
|
void deleteByInstanceId(String instanceId);
|
||||||
|
|
||||||
|
void deleteByInstanceId(String instanceId, String status);
|
||||||
|
|
||||||
int[] countByInstanceId(String instanceId);
|
int[] countByInstanceId(String instanceId);
|
||||||
|
|
||||||
List<CleaningResultDto> findByInstanceId(String instanceId);
|
List<CleaningResultDto> findByInstanceId(String instanceId);
|
||||||
|
|
||||||
|
List<CleaningResultDto> findByInstanceId(String instanceId, String status);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,8 +22,14 @@ public class CleaningResultRepositoryImpl extends CrudRepository<CleaningResultM
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteByInstanceId(String instanceId) {
|
public void deleteByInstanceId(String instanceId) {
|
||||||
|
deleteByInstanceId(instanceId, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteByInstanceId(String instanceId, String status) {
|
||||||
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
|
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
queryWrapper.eq(CleaningResult::getInstanceId, instanceId);
|
queryWrapper.eq(CleaningResult::getInstanceId, instanceId)
|
||||||
|
.eq(StringUtils.isNotBlank(status), CleaningResult::getStatus, status);
|
||||||
mapper.delete(queryWrapper);
|
mapper.delete(queryWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,8 +46,13 @@ public class CleaningResultRepositoryImpl extends CrudRepository<CleaningResultM
|
|||||||
}
|
}
|
||||||
|
|
||||||
public List<CleaningResultDto> findByInstanceId(String instanceId) {
|
public List<CleaningResultDto> findByInstanceId(String instanceId) {
|
||||||
|
return findByInstanceId(instanceId, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<CleaningResultDto> findByInstanceId(String instanceId, String status) {
|
||||||
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
|
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
queryWrapper.eq(CleaningResult::getInstanceId, instanceId);
|
queryWrapper.eq(CleaningResult::getInstanceId, instanceId)
|
||||||
|
.eq(StringUtils.isNotBlank(status), CleaningResult::getStatus, status);
|
||||||
return CleaningResultConverter.INSTANCE.convertEntityToDto(mapper.selectList(queryWrapper));
|
return CleaningResultConverter.INSTANCE.convertEntityToDto(mapper.selectList(queryWrapper));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,7 +128,7 @@ public class OperatorService {
|
|||||||
return operatorBasePath + File.separator + "extract" + File.separator + fileName;
|
return operatorBasePath + File.separator + "extract" + File.separator + fileName;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void overrideSettings(OperatorDto operatorDto) {
|
public void overrideSettings(OperatorDto operatorDto) {
|
||||||
if (StringUtils.isBlank(operatorDto.getSettings()) || MapUtils.isEmpty(operatorDto.getOverrides())) {
|
if (StringUtils.isBlank(operatorDto.getSettings()) || MapUtils.isEmpty(operatorDto.getOverrides())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,7 +110,10 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
|
|||||||
{/* 处理进度 */}
|
{/* 处理进度 */}
|
||||||
<div>
|
<div>
|
||||||
<h3 className="text-lg font-semibold text-gray-900 mb-4">处理进度</h3>
|
<h3 className="text-lg font-semibold text-gray-900 mb-4">处理进度</h3>
|
||||||
<Progress percent={task?.progress?.process} showInfo />
|
{ task?.status?.value === TaskStatus.FAILED ?
|
||||||
|
<Progress percent={task?.progress?.process} size="small" status="exception" />
|
||||||
|
: <Progress percent={task?.progress?.process} size="small"/>
|
||||||
|
}
|
||||||
<div className="grid grid-cols-2 gap-4 text-sm mt-4">
|
<div className="grid grid-cols-2 gap-4 text-sm mt-4">
|
||||||
<div className="flex items-center gap-2">
|
<div className="flex items-center gap-2">
|
||||||
<span className="w-3 h-3 bg-green-500 rounded-full inline-block" />
|
<span className="w-3 h-3 bg-green-500 rounded-full inline-block" />
|
||||||
|
|||||||
@@ -250,7 +250,7 @@ export default function FileTable({result, fetchTaskResult}) {
|
|||||||
key: "action",
|
key: "action",
|
||||||
render: (_text: string, record: any) => (
|
render: (_text: string, record: any) => (
|
||||||
<div className="flex">
|
<div className="flex">
|
||||||
{record.status === "COMPLETED" && (
|
{record.status === "COMPLETED" ? (
|
||||||
<Button
|
<Button
|
||||||
type="link"
|
type="link"
|
||||||
size="small"
|
size="small"
|
||||||
@@ -258,6 +258,14 @@ export default function FileTable({result, fetchTaskResult}) {
|
|||||||
>
|
>
|
||||||
对比
|
对比
|
||||||
</Button>
|
</Button>
|
||||||
|
) : (
|
||||||
|
<Button
|
||||||
|
type="link"
|
||||||
|
size="small"
|
||||||
|
disabled
|
||||||
|
>
|
||||||
|
对比
|
||||||
|
</Button>
|
||||||
)}
|
)}
|
||||||
<Popover content="暂未开放">
|
<Popover content="暂未开放">
|
||||||
<Button type="link" size="small" disabled>下载</Button>
|
<Button type="link" size="small" disabled>下载</Button>
|
||||||
|
|||||||
@@ -177,10 +177,13 @@ export default function TaskList() {
|
|||||||
title: "进度",
|
title: "进度",
|
||||||
dataIndex: "process",
|
dataIndex: "process",
|
||||||
key: "process",
|
key: "process",
|
||||||
width: 200,
|
width: 150,
|
||||||
render: (progress: number) => (
|
render: (_, record: CleansingTask) => {
|
||||||
<Progress percent={progress} size="small" />
|
if (record?.status?.value == TaskStatus.FAILED) {
|
||||||
),
|
return <Progress percent={record?.progress?.process} size="small" status="exception" />;
|
||||||
|
}
|
||||||
|
return <Progress percent={record?.progress?.process} size="small"/>;
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
title: "已处理文件数",
|
title: "已处理文件数",
|
||||||
|
|||||||
@@ -45,13 +45,6 @@ export default function TemplateList() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const templateColumns = [
|
const templateColumns = [
|
||||||
{
|
|
||||||
title: "模板ID",
|
|
||||||
dataIndex: "id",
|
|
||||||
key: "id",
|
|
||||||
fixed: "left",
|
|
||||||
width: 100,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
title: "模板名称",
|
title: "模板名称",
|
||||||
dataIndex: "name",
|
dataIndex: "name",
|
||||||
@@ -71,6 +64,13 @@ export default function TemplateList() {
|
|||||||
</Button>
|
</Button>
|
||||||
);
|
);
|
||||||
}},
|
}},
|
||||||
|
{
|
||||||
|
title: "模板ID",
|
||||||
|
dataIndex: "id",
|
||||||
|
key: "id",
|
||||||
|
fixed: "left",
|
||||||
|
width: 150,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
title: "算子数量",
|
title: "算子数量",
|
||||||
dataIndex: "num",
|
dataIndex: "num",
|
||||||
|
|||||||
@@ -91,10 +91,10 @@ class ImgDuplicatedImagesCleaner(Filter):
|
|||||||
with self.conn as connection:
|
with self.conn as connection:
|
||||||
connection.execute(text(create_tables_sql))
|
connection.execute(text(create_tables_sql))
|
||||||
# 判断是否有重复文件
|
# 判断是否有重复文件
|
||||||
result = connection.execute(text(query_sql, query_sql_params)).fetchall()
|
result = connection.execute(text(query_sql), query_sql_params).fetchall()
|
||||||
# 查询记录为空,无重复图片, 插入新文件特征
|
# 查询记录为空,无重复图片, 插入新文件特征
|
||||||
if not result:
|
if not result:
|
||||||
connection.execute(text(insert_sql, insert_sql_params))
|
connection.execute(text(insert_sql), insert_sql_params)
|
||||||
return img_bytes
|
return img_bytes
|
||||||
logger.info(f"taskId: {self.task_uuid} fileName: {file_name}, method: Duplicate ImagesCleaner. "
|
logger.info(f"taskId: {self.task_uuid} fileName: {file_name}, method: Duplicate ImagesCleaner. "
|
||||||
f"The image is duplicated and filtered ")
|
f"The image is duplicated and filtered ")
|
||||||
|
|||||||
@@ -288,7 +288,7 @@ class Filter(BaseOp):
|
|||||||
f"{str(get_exception_info(e))}")
|
f"{str(get_exception_info(e))}")
|
||||||
task_info = TaskInfoPersistence()
|
task_info = TaskInfoPersistence()
|
||||||
task_info.persistence_task_info(sample)
|
task_info.persistence_task_info(sample)
|
||||||
return False
|
raise e
|
||||||
|
|
||||||
sample["execute_status"] = execute_status
|
sample["execute_status"] = execute_status
|
||||||
# 文件无内容会被过滤
|
# 文件无内容会被过滤
|
||||||
|
|||||||
Reference in New Issue
Block a user