You've already forked DataMate
feat: Integrate Milvus service for enhanced knowledge base management and file deletion (#88)
* feat: Refactor system parameter management with new data structure and update logic * fix: 修复知识库相关问题
This commit is contained in:
@@ -2,6 +2,7 @@ package com.datamate.rag.indexer.application;
|
||||
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
|
||||
import com.datamate.rag.indexer.domain.model.FileStatus;
|
||||
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
|
||||
import com.datamate.rag.indexer.domain.model.RagChunk;
|
||||
@@ -15,6 +16,7 @@ import com.datamate.common.interfaces.PagedResponse;
|
||||
import com.datamate.common.interfaces.PagingQuery;
|
||||
import com.datamate.rag.indexer.interfaces.dto.*;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -36,6 +38,7 @@ public class KnowledgeBaseService {
|
||||
private final KnowledgeBaseRepository knowledgeBaseRepository;
|
||||
private final RagFileRepository ragFileRepository;
|
||||
private final ApplicationEventPublisher eventPublisher;
|
||||
private final ModelConfigRepository modelConfigRepository;
|
||||
|
||||
|
||||
/**
|
||||
@@ -75,15 +78,39 @@ public class KnowledgeBaseService {
|
||||
// TODO: 删除知识库关联的所有文档
|
||||
}
|
||||
|
||||
public KnowledgeBase getById(String knowledgeBaseId) {
|
||||
return Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
|
||||
public KnowledgeBaseResp getById(String knowledgeBaseId) {
|
||||
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
|
||||
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
|
||||
KnowledgeBaseResp resp = getKnowledgeBaseResp(knowledgeBase);
|
||||
resp.setEmbedding(modelConfigRepository.getById(knowledgeBase.getEmbeddingModel()));
|
||||
resp.setChat(modelConfigRepository.getById(knowledgeBase.getChatModel()));
|
||||
return resp;
|
||||
}
|
||||
|
||||
public PagedResponse<KnowledgeBase> list(KnowledgeBaseQueryReq request) {
|
||||
@NotNull
|
||||
private KnowledgeBaseResp getKnowledgeBaseResp(KnowledgeBase knowledgeBase) {
|
||||
KnowledgeBaseResp resp = new KnowledgeBaseResp();
|
||||
BeanUtils.copyProperties(knowledgeBase, resp);
|
||||
|
||||
// 获取该知识库的所有文件
|
||||
List<RagFile> files = ragFileRepository.findAllByKnowledgeBaseId(knowledgeBase.getId());
|
||||
resp.setFileCount((long) files.size());
|
||||
|
||||
// 计算分片总数
|
||||
long totalChunkCount = files.stream()
|
||||
.mapToLong(file -> file.getChunkCount() != null ? file.getChunkCount() : 0)
|
||||
.sum();
|
||||
resp.setChunkCount(totalChunkCount);
|
||||
return resp;
|
||||
}
|
||||
|
||||
public PagedResponse<KnowledgeBaseResp> list(KnowledgeBaseQueryReq request) {
|
||||
IPage<KnowledgeBase> page = new Page<>(request.getPage(), request.getSize());
|
||||
page = knowledgeBaseRepository.page(page, request);
|
||||
return PagedResponse.of(page.getRecords(), page.getCurrent(), page.getTotal(), page.getPages());
|
||||
|
||||
// 将 KnowledgeBase 转换为 KnowledgeBaseResp,并计算 fileCount 和 chunkCount
|
||||
List<KnowledgeBaseResp> respList = page.getRecords().stream().map(this::getKnowledgeBaseResp).toList();
|
||||
return PagedResponse.of(respList, page.getCurrent(), page.getTotal(), page.getPages());
|
||||
}
|
||||
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@@ -104,7 +131,8 @@ public class KnowledgeBaseService {
|
||||
|
||||
public PagedResponse<RagFile> listFiles(String knowledgeBaseId, RagFileReq request) {
|
||||
IPage<RagFile> page = new Page<>(request.getPage(), request.getSize());
|
||||
page = ragFileRepository.page(page);
|
||||
request.setKnowledgeBaseId(knowledgeBaseId);
|
||||
page = ragFileRepository.page(page, request);
|
||||
return PagedResponse.of(page.getRecords(), page.getCurrent(), page.getTotal(), page.getPages());
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.datamate.rag.indexer.domain.repository;
|
||||
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.repository.IRepository;
|
||||
import com.datamate.rag.indexer.domain.model.RagFile;
|
||||
import com.datamate.rag.indexer.interfaces.dto.RagFileReq;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -14,5 +16,9 @@ import java.util.List;
|
||||
public interface RagFileRepository extends IRepository<RagFile> {
|
||||
void removeByKnowledgeBaseId(String knowledgeBaseId);
|
||||
|
||||
List<RagFile> findByKnowledgeBaseId(String knowledgeBaseId);
|
||||
List<RagFile> findNotSuccessByKnowledgeBaseId(String knowledgeBaseId);
|
||||
|
||||
List<RagFile> findAllByKnowledgeBaseId(String knowledgeBaseId);
|
||||
|
||||
IPage<RagFile> page(IPage<RagFile> page, RagFileReq request);
|
||||
}
|
||||
|
||||
@@ -3,12 +3,13 @@ package com.datamate.rag.indexer.infrastructure.event;
|
||||
import com.datamate.common.setting.domain.entity.ModelConfig;
|
||||
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
|
||||
import com.datamate.common.setting.infrastructure.client.ModelClient;
|
||||
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
|
||||
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
|
||||
import com.datamate.rag.indexer.domain.model.FileStatus;
|
||||
import com.datamate.rag.indexer.domain.model.RagFile;
|
||||
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
|
||||
import com.datamate.rag.indexer.interfaces.dto.ProcessType;
|
||||
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
|
||||
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
|
||||
import com.google.common.collect.Lists;
|
||||
import dev.langchain4j.data.document.Document;
|
||||
import dev.langchain4j.data.document.DocumentParser;
|
||||
import dev.langchain4j.data.document.DocumentSplitter;
|
||||
@@ -18,7 +19,10 @@ import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentPa
|
||||
import dev.langchain4j.data.document.parser.apache.poi.ApachePoiDocumentParser;
|
||||
import dev.langchain4j.data.document.parser.apache.tika.ApacheTikaDocumentParser;
|
||||
import dev.langchain4j.data.document.parser.markdown.MarkdownDocumentParser;
|
||||
import dev.langchain4j.data.document.splitter.*;
|
||||
import dev.langchain4j.data.document.splitter.DocumentByLineSplitter;
|
||||
import dev.langchain4j.data.document.splitter.DocumentByParagraphSplitter;
|
||||
import dev.langchain4j.data.document.splitter.DocumentBySentenceSplitter;
|
||||
import dev.langchain4j.data.document.splitter.DocumentByWordSplitter;
|
||||
import dev.langchain4j.data.document.transformer.jsoup.HtmlToTextDocumentTransformer;
|
||||
import dev.langchain4j.data.embedding.Embedding;
|
||||
import dev.langchain4j.data.segment.TextSegment;
|
||||
@@ -68,7 +72,7 @@ public class RagEtlService {
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
|
||||
public void processAfterCommit(DataInsertedEvent event) {
|
||||
// 执行 RAG 处理流水线
|
||||
List<RagFile> ragFiles = ragFileRepository.findByKnowledgeBaseId(event.knowledgeBase().getId());
|
||||
List<RagFile> ragFiles = ragFileRepository.findNotSuccessByKnowledgeBaseId(event.knowledgeBase().getId());
|
||||
|
||||
ragFiles.forEach(ragFile -> {
|
||||
try {
|
||||
@@ -108,6 +112,7 @@ public class RagEtlService {
|
||||
if (Arrays.asList("html", "htm").contains(file.getFileType().toLowerCase())) {
|
||||
document = new HtmlToTextDocumentTransformer().transform(document);
|
||||
}
|
||||
document.metadata().put("fileId", ragFile.getFileId());
|
||||
// 使用文档分块器对文档进行分块
|
||||
DocumentSplitter splitter = documentSplitter(event.addFilesReq().getProcessType());
|
||||
List<TextSegment> split = splitter.split(document);
|
||||
@@ -120,9 +125,12 @@ public class RagEtlService {
|
||||
ModelConfig model = modelConfigRepository.getById(event.knowledgeBase().getEmbeddingModel());
|
||||
EmbeddingModel embeddingModel = ModelClient.invokeEmbeddingModel(model);
|
||||
// 调用嵌入模型获取嵌入向量
|
||||
List<Embedding> content = embeddingModel.embedAll(split).content();
|
||||
// 存储嵌入向量到 Milvus
|
||||
embeddingStore(embeddingModel, event.knowledgeBase().getName()).addAll(content, split);
|
||||
|
||||
Lists.partition(split, 20).forEach(partition -> {
|
||||
List<Embedding> content = embeddingModel.embedAll(partition).content();
|
||||
// 存储嵌入向量到 Milvus
|
||||
embeddingStore(embeddingModel, event.knowledgeBase().getName()).addAll(content, partition);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package com.datamate.rag.indexer.infrastructure.persistence.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
|
||||
import com.datamate.rag.indexer.domain.model.FileStatus;
|
||||
import com.datamate.rag.indexer.domain.model.RagFile;
|
||||
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
|
||||
import com.datamate.rag.indexer.infrastructure.persistence.mapper.RagFileMapper;
|
||||
import com.datamate.rag.indexer.interfaces.dto.RagFileReq;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -23,10 +26,25 @@ public class RagFileRepositoryImpl extends CrudRepository<RagFileMapper, RagFile
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RagFile> findByKnowledgeBaseId(String knowledgeBaseId) {
|
||||
public List<RagFile> findNotSuccessByKnowledgeBaseId(String knowledgeBaseId) {
|
||||
return lambdaQuery()
|
||||
.eq(RagFile::getKnowledgeBaseId, knowledgeBaseId)
|
||||
.in(RagFile::getStatus, FileStatus.UNPROCESSED, FileStatus.PROCESS_FAILED)
|
||||
.list();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RagFile> findAllByKnowledgeBaseId(String knowledgeBaseId) {
|
||||
return lambdaQuery()
|
||||
.eq(RagFile::getKnowledgeBaseId, knowledgeBaseId)
|
||||
.list();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IPage<RagFile> page(IPage<RagFile> page, RagFileReq request) {
|
||||
return lambdaQuery()
|
||||
.eq(RagFile::getKnowledgeBaseId, request.getKnowledgeBaseId())
|
||||
.like(StringUtils.hasText(request.getFileName()), RagFile::getFileName, request.getFileName())
|
||||
.page(page);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
package com.datamate.rag.indexer.interfaces;
|
||||
|
||||
import com.datamate.rag.indexer.application.KnowledgeBaseService;
|
||||
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
|
||||
import com.datamate.rag.indexer.domain.model.RagChunk;
|
||||
import com.datamate.rag.indexer.domain.model.RagFile;
|
||||
import com.datamate.common.interfaces.PagedResponse;
|
||||
import com.datamate.common.interfaces.PagingQuery;
|
||||
import com.datamate.rag.indexer.application.KnowledgeBaseService;
|
||||
import com.datamate.rag.indexer.domain.model.RagChunk;
|
||||
import com.datamate.rag.indexer.domain.model.RagFile;
|
||||
import com.datamate.rag.indexer.interfaces.dto.*;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@@ -65,7 +64,7 @@ public class KnowledgeBaseController {
|
||||
* @return 知识库
|
||||
*/
|
||||
@GetMapping("/{knowledgeBaseId}")
|
||||
public KnowledgeBase get(@PathVariable("knowledgeBaseId") String knowledgeBaseId) {
|
||||
public KnowledgeBaseResp get(@PathVariable("knowledgeBaseId") String knowledgeBaseId) {
|
||||
return knowledgeBaseService.getById(knowledgeBaseId);
|
||||
}
|
||||
|
||||
@@ -75,7 +74,7 @@ public class KnowledgeBaseController {
|
||||
* @return 知识库列表
|
||||
*/
|
||||
@PostMapping("/list")
|
||||
public PagedResponse<KnowledgeBase> list(@RequestBody @Valid KnowledgeBaseQueryReq request) {
|
||||
public PagedResponse<KnowledgeBaseResp> list(@RequestBody @Valid KnowledgeBaseQueryReq request) {
|
||||
return knowledgeBaseService.list(request);
|
||||
}
|
||||
|
||||
@@ -129,4 +128,4 @@ public class KnowledgeBaseController {
|
||||
PagingQuery pagingQuery) {
|
||||
return knowledgeBaseService.getChunks(knowledgeBaseId, ragFileId, pagingQuery);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.datamate.rag.indexer.interfaces.dto;
|
||||
|
||||
import com.datamate.common.setting.domain.entity.ModelConfig;
|
||||
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* 知识库响应实体
|
||||
*
|
||||
* @author dallas
|
||||
* @since 2025-11-17
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
public class KnowledgeBaseResp extends KnowledgeBase {
|
||||
private Long fileCount;
|
||||
private Long chunkCount;
|
||||
private ModelConfig embedding;
|
||||
private ModelConfig chat;
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.datamate.rag.indexer.interfaces.dto;
|
||||
|
||||
import com.datamate.common.interfaces.PagingQuery;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* RAG 文件请求
|
||||
@@ -8,6 +10,9 @@ import com.datamate.common.interfaces.PagingQuery;
|
||||
* @author dallas
|
||||
* @since 2025-10-29
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
public class RagFileReq extends PagingQuery {
|
||||
private String fileName;
|
||||
private String knowledgeBaseId;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user