You've already forked DataMate
fix: 修复知识库问题 (#89)
* feat: Refactor system parameter management with new data structure and update logic * feat: Enhance dataset file management with improved file copying * feat: Enhance dataset file management with improved file copying * fix: 修复知识库相关问题 * feat: Integrate Milvus service for enhanced knowledge base management and file deletion
This commit is contained in:
@@ -2,6 +2,10 @@ package com.datamate.rag.indexer.application;
|
|||||||
|
|
||||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
|
import com.datamate.common.infrastructure.exception.BusinessException;
|
||||||
|
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
|
||||||
|
import com.datamate.common.interfaces.PagedResponse;
|
||||||
|
import com.datamate.common.interfaces.PagingQuery;
|
||||||
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
|
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
|
||||||
import com.datamate.rag.indexer.domain.model.FileStatus;
|
import com.datamate.rag.indexer.domain.model.FileStatus;
|
||||||
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
|
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
|
||||||
@@ -10,11 +14,10 @@ import com.datamate.rag.indexer.domain.model.RagFile;
|
|||||||
import com.datamate.rag.indexer.domain.repository.KnowledgeBaseRepository;
|
import com.datamate.rag.indexer.domain.repository.KnowledgeBaseRepository;
|
||||||
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
|
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
|
||||||
import com.datamate.rag.indexer.infrastructure.event.DataInsertedEvent;
|
import com.datamate.rag.indexer.infrastructure.event.DataInsertedEvent;
|
||||||
import com.datamate.common.infrastructure.exception.BusinessException;
|
|
||||||
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
|
|
||||||
import com.datamate.common.interfaces.PagedResponse;
|
|
||||||
import com.datamate.common.interfaces.PagingQuery;
|
|
||||||
import com.datamate.rag.indexer.interfaces.dto.*;
|
import com.datamate.rag.indexer.interfaces.dto.*;
|
||||||
|
import io.milvus.client.MilvusClient;
|
||||||
|
import io.milvus.param.collection.DropCollectionParam;
|
||||||
|
import io.milvus.param.dml.DeleteParam;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
@@ -39,7 +42,7 @@ public class KnowledgeBaseService {
|
|||||||
private final RagFileRepository ragFileRepository;
|
private final RagFileRepository ragFileRepository;
|
||||||
private final ApplicationEventPublisher eventPublisher;
|
private final ApplicationEventPublisher eventPublisher;
|
||||||
private final ModelConfigRepository modelConfigRepository;
|
private final ModelConfigRepository modelConfigRepository;
|
||||||
|
private final MilvusClient milvusClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建知识库
|
* 创建知识库
|
||||||
@@ -72,10 +75,13 @@ public class KnowledgeBaseService {
|
|||||||
knowledgeBaseRepository.updateById(knowledgeBase);
|
knowledgeBaseRepository.updateById(knowledgeBase);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
public void delete(String knowledgeBaseId) {
|
public void delete(String knowledgeBaseId) {
|
||||||
|
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
|
||||||
|
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
|
||||||
knowledgeBaseRepository.removeById(knowledgeBaseId);
|
knowledgeBaseRepository.removeById(knowledgeBaseId);
|
||||||
ragFileRepository.removeByKnowledgeBaseId(knowledgeBaseId);
|
ragFileRepository.removeByKnowledgeBaseId(knowledgeBaseId);
|
||||||
// TODO: 删除知识库关联的所有文档
|
milvusClient.dropCollection(DropCollectionParam.newBuilder().withCollectionName(knowledgeBase.getName()).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
public KnowledgeBaseResp getById(String knowledgeBaseId) {
|
public KnowledgeBaseResp getById(String knowledgeBaseId) {
|
||||||
@@ -136,8 +142,15 @@ public class KnowledgeBaseService {
|
|||||||
return PagedResponse.of(page.getRecords(), page.getCurrent(), page.getTotal(), page.getPages());
|
return PagedResponse.of(page.getRecords(), page.getCurrent(), page.getTotal(), page.getPages());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public void deleteFiles(String knowledgeBaseId, DeleteFilesReq request) {
|
public void deleteFiles(String knowledgeBaseId, DeleteFilesReq request) {
|
||||||
|
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
|
||||||
|
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
|
||||||
ragFileRepository.removeByIds(request.getIds());
|
ragFileRepository.removeByIds(request.getIds());
|
||||||
|
milvusClient.delete(DeleteParam.newBuilder()
|
||||||
|
.withCollectionName(knowledgeBase.getName())
|
||||||
|
.withExpr("metadata[\"rag_file_id\"] in [" + org.apache.commons.lang3.StringUtils.join(request.getIds().stream().map(id -> "\"" + id + "\"").toArray(), ",") + "]")
|
||||||
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
public PagedResponse<RagChunk> getChunks(String knowledgeBaseId, String ragFileId, PagingQuery pagingQuery) {
|
public PagedResponse<RagChunk> getChunks(String knowledgeBaseId, String ragFileId, PagingQuery pagingQuery) {
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import com.datamate.datamanagement.infrastructure.persistence.repository.Dataset
|
|||||||
import com.datamate.rag.indexer.domain.model.FileStatus;
|
import com.datamate.rag.indexer.domain.model.FileStatus;
|
||||||
import com.datamate.rag.indexer.domain.model.RagFile;
|
import com.datamate.rag.indexer.domain.model.RagFile;
|
||||||
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
|
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
|
||||||
|
import com.datamate.rag.indexer.infrastructure.milvus.MilvusService;
|
||||||
import com.datamate.rag.indexer.interfaces.dto.ProcessType;
|
import com.datamate.rag.indexer.interfaces.dto.ProcessType;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import dev.langchain4j.data.document.Document;
|
import dev.langchain4j.data.document.Document;
|
||||||
@@ -27,11 +28,8 @@ import dev.langchain4j.data.document.transformer.jsoup.HtmlToTextDocumentTransfo
|
|||||||
import dev.langchain4j.data.embedding.Embedding;
|
import dev.langchain4j.data.embedding.Embedding;
|
||||||
import dev.langchain4j.data.segment.TextSegment;
|
import dev.langchain4j.data.segment.TextSegment;
|
||||||
import dev.langchain4j.model.embedding.EmbeddingModel;
|
import dev.langchain4j.model.embedding.EmbeddingModel;
|
||||||
import dev.langchain4j.store.embedding.EmbeddingStore;
|
|
||||||
import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.event.TransactionPhase;
|
import org.springframework.transaction.event.TransactionPhase;
|
||||||
@@ -55,10 +53,7 @@ import java.util.concurrent.Semaphore;
|
|||||||
public class RagEtlService {
|
public class RagEtlService {
|
||||||
private static final Semaphore SEMAPHORE = new Semaphore(10);
|
private static final Semaphore SEMAPHORE = new Semaphore(10);
|
||||||
|
|
||||||
@Value("${datamate.rag.milvus-host:milvus-standalone}")
|
private final MilvusService milvusService;
|
||||||
private String milvusHost;
|
|
||||||
@Value("${datamate.rag.milvus-port:19530}")
|
|
||||||
private int milvusPort;
|
|
||||||
|
|
||||||
private final RagFileRepository ragFileRepository;
|
private final RagFileRepository ragFileRepository;
|
||||||
|
|
||||||
@@ -112,7 +107,7 @@ public class RagEtlService {
|
|||||||
if (Arrays.asList("html", "htm").contains(file.getFileType().toLowerCase())) {
|
if (Arrays.asList("html", "htm").contains(file.getFileType().toLowerCase())) {
|
||||||
document = new HtmlToTextDocumentTransformer().transform(document);
|
document = new HtmlToTextDocumentTransformer().transform(document);
|
||||||
}
|
}
|
||||||
document.metadata().put("fileId", ragFile.getFileId());
|
document.metadata().put("rag_file_id", ragFile.getId());
|
||||||
// 使用文档分块器对文档进行分块
|
// 使用文档分块器对文档进行分块
|
||||||
DocumentSplitter splitter = documentSplitter(event.addFilesReq().getProcessType());
|
DocumentSplitter splitter = documentSplitter(event.addFilesReq().getProcessType());
|
||||||
List<TextSegment> split = splitter.split(document);
|
List<TextSegment> split = splitter.split(document);
|
||||||
@@ -129,7 +124,7 @@ public class RagEtlService {
|
|||||||
Lists.partition(split, 20).forEach(partition -> {
|
Lists.partition(split, 20).forEach(partition -> {
|
||||||
List<Embedding> content = embeddingModel.embedAll(partition).content();
|
List<Embedding> content = embeddingModel.embedAll(partition).content();
|
||||||
// 存储嵌入向量到 Milvus
|
// 存储嵌入向量到 Milvus
|
||||||
embeddingStore(embeddingModel, event.knowledgeBase().getName()).addAll(content, partition);
|
milvusService.embeddingStore(embeddingModel, event.knowledgeBase().getName()).addAll(content, partition);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,13 +154,4 @@ public class RagEtlService {
|
|||||||
case DEFAULT_CHUNK -> new DocumentByLineSplitter(1000, 100);
|
case DEFAULT_CHUNK -> new DocumentByLineSplitter(1000, 100);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public EmbeddingStore<TextSegment> embeddingStore(EmbeddingModel embeddingModel, String knowledgeBaseName) {
|
|
||||||
return MilvusEmbeddingStore.builder()
|
|
||||||
.host(milvusHost)
|
|
||||||
.port(milvusPort)
|
|
||||||
.collectionName(knowledgeBaseName)
|
|
||||||
.dimension(embeddingModel.dimension())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,44 @@
|
|||||||
|
package com.datamate.rag.indexer.infrastructure.milvus;
|
||||||
|
|
||||||
|
import dev.langchain4j.data.segment.TextSegment;
|
||||||
|
import dev.langchain4j.model.embedding.EmbeddingModel;
|
||||||
|
import dev.langchain4j.store.embedding.EmbeddingStore;
|
||||||
|
import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore;
|
||||||
|
import io.milvus.client.MilvusClient;
|
||||||
|
import io.milvus.client.MilvusServiceClient;
|
||||||
|
import io.milvus.param.ConnectParam;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Milvus 服务类
|
||||||
|
*
|
||||||
|
* @author dallas
|
||||||
|
* @since 2025-11-17
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class MilvusService {
|
||||||
|
@Value("${datamate.rag.milvus-host:milvus-standalone}")
|
||||||
|
private String milvusHost;
|
||||||
|
@Value("${datamate.rag.milvus-port:19530}")
|
||||||
|
private int milvusPort;
|
||||||
|
|
||||||
|
public EmbeddingStore<TextSegment> embeddingStore(EmbeddingModel embeddingModel, String knowledgeBaseName) {
|
||||||
|
return MilvusEmbeddingStore.builder()
|
||||||
|
.host(milvusHost)
|
||||||
|
.port(milvusPort)
|
||||||
|
.collectionName(knowledgeBaseName)
|
||||||
|
.dimension(embeddingModel.dimension())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MilvusClient milvusClient() {
|
||||||
|
ConnectParam connectParam = ConnectParam.newBuilder()
|
||||||
|
.withHost(milvusHost)
|
||||||
|
.withPort(milvusPort)
|
||||||
|
.build();
|
||||||
|
return new MilvusServiceClient(connectParam);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -48,7 +48,7 @@ const KnowledgeBaseDetailPage: React.FC = () => {
|
|||||||
setSearchParams,
|
setSearchParams,
|
||||||
handleFiltersChange,
|
handleFiltersChange,
|
||||||
} = useFetchData<KBFile>(
|
} = useFetchData<KBFile>(
|
||||||
(params) => queryKnowledgeBaseFilesUsingGet(knowledgeBase?.id, params),
|
(params) => id ? queryKnowledgeBaseFilesUsingGet(id, params) : Promise.resolve({ data: [] }),
|
||||||
mapFileData
|
mapFileData
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -237,4 +237,4 @@ const KnowledgeBaseDetailPage: React.FC = () => {
|
|||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
export default KnowledgeBaseDetailPage;
|
export default KnowledgeBaseDetailPage;
|
||||||
Reference in New Issue
Block a user