feat: update knowledge base processing to use KnowledgeBase object and enhance configuration (#46)

* feat: update knowledge base processing to use KnowledgeBase object and enhance configuration
This commit is contained in:
Dallas98
2025-10-31 13:16:05 +08:00
committed by GitHub
parent d0bac68d3f
commit e854a0288a
10 changed files with 50 additions and 83 deletions

View File

@@ -75,6 +75,13 @@
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-bom</artifactId>
<version>3.25.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId> <artifactId>spring-cloud-dependencies</artifactId>
@@ -165,12 +172,6 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-mcp-server-webmvc</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.baomidou</groupId> <groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId> <artifactId>mybatis-plus-spring-boot3-starter</artifactId>

View File

@@ -167,13 +167,5 @@ datamate:
# RAG配置 # RAG配置
rag: rag:
embedding: milvus-host: ${MILVUS_HOST:milvus-standalone}
model: ${RAG_EMBEDDING_MODEL:text-embedding-ada-002} milvus-port: ${MILVUS_PORT:19530}
api-key: ${RAG_API_KEY:}
dimension: ${RAG_DIMENSION:1536}
chunk:
size: ${RAG_CHUNK_SIZE:512}
overlap: ${RAG_CHUNK_OVERLAP:50}
retrieval:
top-k: ${RAG_TOP_K:5}
score-threshold: ${RAG_SCORE_THRESHOLD:0.7}

View File

@@ -98,15 +98,12 @@
<dependency> <dependency>
<groupId>dev.langchain4j</groupId> <groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-milvus</artifactId> <artifactId>langchain4j-milvus</artifactId>
</dependency> <exclusions>
<exclusion>
<dependency> <groupId>com.google.protobuf</groupId>
<groupId>dev.langchain4j</groupId> <artifactId>protobuf-java</artifactId>
<artifactId>langchain4j-embeddings-all-minilm-l6-v2</artifactId> </exclusion>
</dependency> </exclusions>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>milvus</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@@ -1,17 +0,0 @@
package com.dataengine.rag.indexer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
*
* @author dallas
* @since 2025-10-13
*/
@SpringBootApplication
public class RagApplication {
public static void main(String[] args) {
SpringApplication.run(RagApplication.class, args);
}
}

View File

@@ -93,13 +93,13 @@ public class KnowledgeBaseService {
List<RagFile> ragFiles = request.getFiles().stream().map(fileInfo -> { List<RagFile> ragFiles = request.getFiles().stream().map(fileInfo -> {
RagFile ragFile = new RagFile(); RagFile ragFile = new RagFile();
ragFile.setKnowledgeBaseId(knowledgeBase.getId()); ragFile.setKnowledgeBaseId(knowledgeBase.getId());
ragFile.setFileId(fileInfo.fileId()); ragFile.setFileId(fileInfo.id());
ragFile.setFileName(fileInfo.fileName()); ragFile.setFileName(fileInfo.name());
ragFile.setStatus(FileStatus.UNPROCESSED); ragFile.setStatus(FileStatus.UNPROCESSED);
return ragFile; return ragFile;
}).toList(); }).toList();
ragFileRepository.saveBatch(ragFiles, 100); ragFileRepository.saveBatch(ragFiles, 100);
eventPublisher.publishEvent(new DataInsertedEvent(knowledgeBase.getId(), request.getProcessType())); eventPublisher.publishEvent(new DataInsertedEvent(knowledgeBase, request.getProcessType()));
} }
public PagedResponse<RagFile> listFiles(String knowledgeBaseId, RagFileReq request) { public PagedResponse<RagFile> listFiles(String knowledgeBaseId, RagFileReq request) {

View File

@@ -1,5 +1,6 @@
package com.datamate.rag.indexer.infrastructure.event; package com.datamate.rag.indexer.infrastructure.event;
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
import com.datamate.rag.indexer.interfaces.dto.ProcessType; import com.datamate.rag.indexer.interfaces.dto.ProcessType;
/** /**
@@ -8,5 +9,5 @@ import com.datamate.rag.indexer.interfaces.dto.ProcessType;
* @author dallas * @author dallas
* @since 2025-10-29 * @since 2025-10-29
*/ */
public record DataInsertedEvent(String knowledgeBaseId, ProcessType processType) { public record DataInsertedEvent(KnowledgeBase knowledgeBase, ProcessType processType) {
} }

View File

@@ -23,12 +23,11 @@ import dev.langchain4j.data.document.transformer.jsoup.HtmlToTextDocumentTransfo
import dev.langchain4j.data.embedding.Embedding; import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment; import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel; import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.output.Response;
import dev.langchain4j.store.embedding.EmbeddingStore; import dev.langchain4j.store.embedding.EmbeddingStore;
import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore; import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionPhase;
@@ -52,6 +51,11 @@ import java.util.concurrent.Semaphore;
public class RagEtlService { public class RagEtlService {
private static final Semaphore SEMAPHORE = new Semaphore(10); private static final Semaphore SEMAPHORE = new Semaphore(10);
@Value("${datamate.rag.milvus-host}")
private String milvusHost;
@Value("${datamate.rag.milvus-port}")
private int milvusPort;
private final RagFileRepository ragFileRepository; private final RagFileRepository ragFileRepository;
private final DatasetFileRepository datasetFileRepository; private final DatasetFileRepository datasetFileRepository;
@@ -64,7 +68,7 @@ public class RagEtlService {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void processAfterCommit(DataInsertedEvent event) { public void processAfterCommit(DataInsertedEvent event) {
// 执行 RAG 处理流水线 // 执行 RAG 处理流水线
List<RagFile> ragFiles = ragFileRepository.findByKnowledgeBaseId(event.knowledgeBaseId()); List<RagFile> ragFiles = ragFileRepository.findByKnowledgeBaseId(event.knowledgeBase().getId());
ragFiles.forEach(ragFile -> { ragFiles.forEach(ragFile -> {
try { try {
@@ -74,12 +78,13 @@ public class RagEtlService {
// 执行 RAG 处理流水线 // 执行 RAG 处理流水线
ragFile.setStatus(FileStatus.PROCESSING); ragFile.setStatus(FileStatus.PROCESSING);
ragFileRepository.updateById(ragFile); ragFileRepository.updateById(ragFile);
processRagFile(ragFile, event.processType()); processRagFile(ragFile, event);
// 更新文件状态为已处理 // 更新文件状态为已处理
ragFile.setStatus(FileStatus.PROCESSED); ragFile.setStatus(FileStatus.PROCESSED);
ragFileRepository.updateById(ragFile); ragFileRepository.updateById(ragFile);
} catch (Exception e) { } catch (Exception e) {
// 处理异常 // 处理异常
log.error("Error processing RAG file: {}", ragFile.getFileId(), e);
ragFile.setStatus(FileStatus.PROCESS_FAILED); ragFile.setStatus(FileStatus.PROCESS_FAILED);
ragFileRepository.updateById(ragFile); ragFileRepository.updateById(ragFile);
} finally { } finally {
@@ -93,7 +98,7 @@ public class RagEtlService {
); );
} }
private void processRagFile(RagFile ragFile, ProcessType processType) { private void processRagFile(RagFile ragFile, DataInsertedEvent event) {
DatasetFile file = datasetFileRepository.getById(ragFile.getFileId()); DatasetFile file = datasetFileRepository.getById(ragFile.getFileId());
// 使用文档解析器解析文档 // 使用文档解析器解析文档
DocumentParser parser = documentParser(file.getFileType()); DocumentParser parser = documentParser(file.getFileType());
@@ -101,10 +106,10 @@ public class RagEtlService {
Document document = FileSystemDocumentLoader.loadDocument(file.getFilePath(), parser); Document document = FileSystemDocumentLoader.loadDocument(file.getFilePath(), parser);
// 对html文档进行转换 // 对html文档进行转换
if (Arrays.asList("html", "htm").contains(file.getFileType().toLowerCase())) { if (Arrays.asList("html", "htm").contains(file.getFileType().toLowerCase())) {
document= new HtmlToTextDocumentTransformer().transform(document); document = new HtmlToTextDocumentTransformer().transform(document);
} }
// 使用文档分块器对文档进行分块 // 使用文档分块器对文档进行分块
DocumentSplitter splitter = documentSplitter(processType); DocumentSplitter splitter = documentSplitter(event.processType());
List<TextSegment> split = splitter.split(document); List<TextSegment> split = splitter.split(document);
// 更新分块数量 // 更新分块数量
@@ -112,12 +117,12 @@ public class RagEtlService {
ragFileRepository.updateById(ragFile); ragFileRepository.updateById(ragFile);
// 调用模型客户端获取嵌入模型 // 调用模型客户端获取嵌入模型
ModelConfig model = modelConfigRepository.getById("1"); ModelConfig model = modelConfigRepository.getById(event.knowledgeBase().getEmbeddingModel());
EmbeddingModel embeddingModel = ModelClient.invokeEmbeddingModel(model); EmbeddingModel embeddingModel = ModelClient.invokeEmbeddingModel(model);
// 调用嵌入模型获取嵌入向量 // 调用嵌入模型获取嵌入向量
Response<@NotNull List<Embedding>> response = embeddingModel.embedAll(split); List<Embedding> content = embeddingModel.embedAll(split).content();
// 存储嵌入向量到 Milvus // 存储嵌入向量到 Milvus
embeddingStore().addAll(response.content(), split); embeddingStore(embeddingModel, ragFile.getKnowledgeBaseId()).addAll(content, split);
} }
/** /**
@@ -139,19 +144,20 @@ public class RagEtlService {
public DocumentSplitter documentSplitter(ProcessType processType) { public DocumentSplitter documentSplitter(ProcessType processType) {
return switch (processType) { return switch (processType) {
case CHAPTER_CHUNK -> new DocumentByParagraphSplitter(1000, 100); case PARAGRAPH_CHUNK -> new DocumentByParagraphSplitter(1000, 100);
case PARAGRAPH_CHUNK -> new DocumentByLineSplitter(1000, 100); case CHAPTER_CHUNK -> new DocumentByLineSplitter(1000, 100);
case LENGTH_CHUNK -> new DocumentBySentenceSplitter(1000, 100); case CUSTOM_SEPARATOR_CHUNK -> new DocumentBySentenceSplitter(1000, 100);
case CUSTOM_SEPARATOR_CHUNK -> new DocumentByWordSplitter(1000, 100); case LENGTH_CHUNK -> new DocumentByWordSplitter(1000, 100);
case DEFAULT_CHUNK -> new DocumentByRegexSplitter("\\n\\n", "",1000, 100); case DEFAULT_CHUNK -> new DocumentByLineSplitter(1000, 100);
}; };
} }
public EmbeddingStore<TextSegment> embeddingStore() { public EmbeddingStore<TextSegment> embeddingStore(EmbeddingModel embeddingModel, String knowledgeBaseId) {
return MilvusEmbeddingStore.builder() return MilvusEmbeddingStore.builder()
.uri("http://milvus:19530") .host(milvusHost)
.collectionName("rag_embeddings") .port(milvusPort)
.dimension(1536) .collectionName("datamate_" + knowledgeBaseId)
.dimension(embeddingModel.dimension())
.build(); .build();
} }
} }

View File

@@ -1,8 +0,0 @@
package com.datamate.rag.indexer.interfaces;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class EmbeddingController {
}

View File

@@ -4,14 +4,14 @@ import com.datamate.rag.indexer.application.KnowledgeBaseService;
import com.datamate.rag.indexer.domain.model.KnowledgeBase; import com.datamate.rag.indexer.domain.model.KnowledgeBase;
import com.datamate.rag.indexer.domain.model.RagChunk; import com.datamate.rag.indexer.domain.model.RagChunk;
import com.datamate.rag.indexer.domain.model.RagFile; import com.datamate.rag.indexer.domain.model.RagFile;
import com.datamate.common.infrastructure.common.Response;
import com.datamate.common.interfaces.PagedResponse; import com.datamate.common.interfaces.PagedResponse;
import com.datamate.common.interfaces.PagingQuery; import com.datamate.common.interfaces.PagingQuery;
import com.datamate.rag.indexer.interfaces.dto.*; import com.datamate.rag.indexer.interfaces.dto.*;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
/** /**
* 知识库控制器 * 知识库控制器
@@ -21,15 +21,10 @@ import javax.validation.Valid;
*/ */
@RestController @RestController
@RequiredArgsConstructor @RequiredArgsConstructor
@RequestMapping("/v1/knowledge-base") @RequestMapping("/knowledge-base")
public class KnowledgeBaseController { public class KnowledgeBaseController {
private final KnowledgeBaseService knowledgeBaseService; private final KnowledgeBaseService knowledgeBaseService;
@GetMapping(path = "/test1")
public String test() {
return "test1";
}
/** /**
* 创建知识库 * 创建知识库
* *
@@ -105,7 +100,7 @@ public class KnowledgeBaseController {
*/ */
@GetMapping("/{knowledgeBaseId}/files") @GetMapping("/{knowledgeBaseId}/files")
public PagedResponse<RagFile> listFiles(@PathVariable("knowledgeBaseId") String knowledgeBaseId, public PagedResponse<RagFile> listFiles(@PathVariable("knowledgeBaseId") String knowledgeBaseId,
@RequestBody @Valid RagFileReq request) { RagFileReq request) {
return knowledgeBaseService.listFiles(knowledgeBaseId, request); return knowledgeBaseService.listFiles(knowledgeBaseId, request);
} }

View File

@@ -18,6 +18,6 @@ public class AddFilesReq {
private ProcessType processType; private ProcessType processType;
private List<FileInfo> files; private List<FileInfo> files;
public record FileInfo(String fileId, String fileName) { public record FileInfo(String id, String name) {
} }
} }