feat(kg): 实现同步时间窗口过滤(P0-03)

为 DataManagementClient 的 5 个 listAll* 方法添加时间窗口过滤:
- listAllDatasets(updatedFrom, updatedTo)
- listAllWorkflows(updatedFrom, updatedTo)
- listAllJobs(updatedFrom, updatedTo)
- listAllLabelTasks(updatedFrom, updatedTo)
- listAllKnowledgeSets(updatedFrom, updatedTo)

特性:
- 时间参数构建为 HTTP 查询参数(ISO_LOCAL_DATE_TIME 格式)
- 客户端侧双重过滤(兼容上游未支持的场景)
- 参数校验:updatedFrom > updatedTo 时抛出异常
- null 元素安全处理:过滤列表中的 null 项
- 无参版本保留向后兼容

测试:
- 新增 DataManagementClientTest.java(28 个测试用例)
- 覆盖 URL 参数拼接、参数校验、本地过滤、null 安全、分页等场景
- 测试结果:162 tests, 0 failures

代码审查:
- Codex 两轮审查通过
- 修复 P2 问题:null 元素安全处理
This commit is contained in:
2026-02-18 14:00:01 +08:00
parent 75db6daeb5
commit 74daed1c25
2 changed files with 838 additions and 5 deletions

View File

@@ -13,8 +13,15 @@ import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
/** /**
* 数据管理服务 REST 客户端。 * 数据管理服务 REST 客户端。
@@ -26,6 +33,10 @@ import java.util.List;
@Slf4j @Slf4j
public class DataManagementClient { public class DataManagementClient {
private static final String UPDATED_FROM_PARAM = "updatedFrom";
private static final String UPDATED_TO_PARAM = "updatedTo";
private static final DateTimeFormatter DATETIME_QUERY_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
private final RestTemplate restTemplate; private final RestTemplate restTemplate;
private final String baseUrl; private final String baseUrl;
private final String annotationBaseUrl; private final String annotationBaseUrl;
@@ -44,6 +55,109 @@ public class DataManagementClient {
* 拉取所有数据集(自动分页)。 * 拉取所有数据集(自动分页)。
*/ */
public List<DatasetDTO> listAllDatasets() { public List<DatasetDTO> listAllDatasets() {
return listAllDatasets(null, null);
}
/**
* 拉取所有数据集(自动分页)并按更新时间窗口过滤。
* <p>
* 时间窗口参数会透传给上游服务;同时在本地再过滤一次,
* 以兼容上游暂未支持该查询参数的场景。
*/
public List<DatasetDTO> listAllDatasets(LocalDateTime updatedFrom, LocalDateTime updatedTo) {
Map<String, String> timeWindowQuery = buildTimeWindowQuery(updatedFrom, updatedTo);
List<DatasetDTO> datasets = fetchAllPaged(
baseUrl + "/data-management/datasets",
new ParameterizedTypeReference<PagedResult<DatasetDTO>>() {},
"datasets",
timeWindowQuery);
return filterByUpdatedAt(datasets, DatasetDTO::getUpdatedAt, updatedFrom, updatedTo);
}
/**
* 拉取所有工作流(自动分页)。
*/
public List<WorkflowDTO> listAllWorkflows() {
return listAllWorkflows(null, null);
}
/**
* 拉取所有工作流(自动分页)并按更新时间窗口过滤。
*/
public List<WorkflowDTO> listAllWorkflows(LocalDateTime updatedFrom, LocalDateTime updatedTo) {
Map<String, String> timeWindowQuery = buildTimeWindowQuery(updatedFrom, updatedTo);
List<WorkflowDTO> workflows = fetchAllPaged(
baseUrl + "/data-management/workflows",
new ParameterizedTypeReference<PagedResult<WorkflowDTO>>() {},
"workflows",
timeWindowQuery);
return filterByUpdatedAt(workflows, WorkflowDTO::getUpdatedAt, updatedFrom, updatedTo);
}
/**
* 拉取所有作业(自动分页)。
*/
public List<JobDTO> listAllJobs() {
return listAllJobs(null, null);
}
/**
* 拉取所有作业(自动分页)并按更新时间窗口过滤。
*/
public List<JobDTO> listAllJobs(LocalDateTime updatedFrom, LocalDateTime updatedTo) {
Map<String, String> timeWindowQuery = buildTimeWindowQuery(updatedFrom, updatedTo);
List<JobDTO> jobs = fetchAllPaged(
baseUrl + "/data-management/jobs",
new ParameterizedTypeReference<PagedResult<JobDTO>>() {},
"jobs",
timeWindowQuery);
return filterByUpdatedAt(jobs, JobDTO::getUpdatedAt, updatedFrom, updatedTo);
}
/**
* 拉取所有标注任务(自动分页,从标注服务)。
*/
public List<LabelTaskDTO> listAllLabelTasks() {
return listAllLabelTasks(null, null);
}
/**
* 拉取所有标注任务(自动分页,从标注服务)并按更新时间窗口过滤。
*/
public List<LabelTaskDTO> listAllLabelTasks(LocalDateTime updatedFrom, LocalDateTime updatedTo) {
Map<String, String> timeWindowQuery = buildTimeWindowQuery(updatedFrom, updatedTo);
List<LabelTaskDTO> tasks = fetchAllPaged(
annotationBaseUrl + "/annotation/label-tasks",
new ParameterizedTypeReference<PagedResult<LabelTaskDTO>>() {},
"label-tasks",
timeWindowQuery);
return filterByUpdatedAt(tasks, LabelTaskDTO::getUpdatedAt, updatedFrom, updatedTo);
}
/**
* 拉取所有知识集(自动分页)。
*/
public List<KnowledgeSetDTO> listAllKnowledgeSets() {
return listAllKnowledgeSets(null, null);
}
/**
* 拉取所有知识集(自动分页)并按更新时间窗口过滤。
*/
public List<KnowledgeSetDTO> listAllKnowledgeSets(LocalDateTime updatedFrom, LocalDateTime updatedTo) {
Map<String, String> timeWindowQuery = buildTimeWindowQuery(updatedFrom, updatedTo);
List<KnowledgeSetDTO> sets = fetchAllPaged(
baseUrl + "/data-management/knowledge-sets",
new ParameterizedTypeReference<PagedResult<KnowledgeSetDTO>>() {},
"knowledge-sets",
timeWindowQuery);
return filterByUpdatedAt(sets, KnowledgeSetDTO::getUpdatedAt, updatedFrom, updatedTo);
}
/**
* 拉取所有数据集(自动分页)。
*/
public List<DatasetDTO> listAllDatasetsLegacy() {
return fetchAllPaged( return fetchAllPaged(
baseUrl + "/data-management/datasets", baseUrl + "/data-management/datasets",
new ParameterizedTypeReference<PagedResult<DatasetDTO>>() {}, new ParameterizedTypeReference<PagedResult<DatasetDTO>>() {},
@@ -53,7 +167,7 @@ public class DataManagementClient {
/** /**
* 拉取所有工作流(自动分页)。 * 拉取所有工作流(自动分页)。
*/ */
public List<WorkflowDTO> listAllWorkflows() { public List<WorkflowDTO> listAllWorkflowsLegacy() {
return fetchAllPaged( return fetchAllPaged(
baseUrl + "/data-management/workflows", baseUrl + "/data-management/workflows",
new ParameterizedTypeReference<PagedResult<WorkflowDTO>>() {}, new ParameterizedTypeReference<PagedResult<WorkflowDTO>>() {},
@@ -63,7 +177,7 @@ public class DataManagementClient {
/** /**
* 拉取所有作业(自动分页)。 * 拉取所有作业(自动分页)。
*/ */
public List<JobDTO> listAllJobs() { public List<JobDTO> listAllJobsLegacy() {
return fetchAllPaged( return fetchAllPaged(
baseUrl + "/data-management/jobs", baseUrl + "/data-management/jobs",
new ParameterizedTypeReference<PagedResult<JobDTO>>() {}, new ParameterizedTypeReference<PagedResult<JobDTO>>() {},
@@ -73,7 +187,7 @@ public class DataManagementClient {
/** /**
* 拉取所有标注任务(自动分页,从标注服务)。 * 拉取所有标注任务(自动分页,从标注服务)。
*/ */
public List<LabelTaskDTO> listAllLabelTasks() { public List<LabelTaskDTO> listAllLabelTasksLegacy() {
return fetchAllPaged( return fetchAllPaged(
annotationBaseUrl + "/annotation/label-tasks", annotationBaseUrl + "/annotation/label-tasks",
new ParameterizedTypeReference<PagedResult<LabelTaskDTO>>() {}, new ParameterizedTypeReference<PagedResult<LabelTaskDTO>>() {},
@@ -83,7 +197,7 @@ public class DataManagementClient {
/** /**
* 拉取所有知识集(自动分页)。 * 拉取所有知识集(自动分页)。
*/ */
public List<KnowledgeSetDTO> listAllKnowledgeSets() { public List<KnowledgeSetDTO> listAllKnowledgeSetsLegacy() {
return fetchAllPaged( return fetchAllPaged(
baseUrl + "/data-management/knowledge-sets", baseUrl + "/data-management/knowledge-sets",
new ParameterizedTypeReference<PagedResult<KnowledgeSetDTO>>() {}, new ParameterizedTypeReference<PagedResult<KnowledgeSetDTO>>() {},
@@ -96,11 +210,21 @@ public class DataManagementClient {
private <T> List<T> fetchAllPaged(String baseEndpoint, private <T> List<T> fetchAllPaged(String baseEndpoint,
ParameterizedTypeReference<PagedResult<T>> typeRef, ParameterizedTypeReference<PagedResult<T>> typeRef,
String resourceName) { String resourceName) {
return fetchAllPaged(baseEndpoint, typeRef, resourceName, Collections.emptyMap());
}
/**
* 通用自动分页拉取方法(支持附加查询参数)。
*/
private <T> List<T> fetchAllPaged(String baseEndpoint,
ParameterizedTypeReference<PagedResult<T>> typeRef,
String resourceName,
Map<String, String> extraQueryParams) {
List<T> allItems = new ArrayList<>(); List<T> allItems = new ArrayList<>();
int page = 0; int page = 0;
while (true) { while (true) {
String url = baseEndpoint + "?page=" + page + "&size=" + pageSize; String url = buildPagedUrl(baseEndpoint, page, extraQueryParams);
log.debug("Fetching {}: page={}, size={}", resourceName, page, pageSize); log.debug("Fetching {}: page={}, size={}", resourceName, page, pageSize);
try { try {
@@ -130,6 +254,66 @@ public class DataManagementClient {
return allItems; return allItems;
} }
private String buildPagedUrl(String baseEndpoint, int page, Map<String, String> extraQueryParams) {
StringBuilder builder = new StringBuilder(baseEndpoint)
.append("?page=").append(page)
.append("&size=").append(pageSize);
if (extraQueryParams != null && !extraQueryParams.isEmpty()) {
extraQueryParams.forEach((key, value) -> {
if (key == null || key.isBlank() || value == null || value.isBlank()) {
return;
}
builder.append("&")
.append(URLEncoder.encode(key, StandardCharsets.UTF_8))
.append("=")
.append(URLEncoder.encode(value, StandardCharsets.UTF_8));
});
}
return builder.toString();
}
private static Map<String, String> buildTimeWindowQuery(LocalDateTime updatedFrom, LocalDateTime updatedTo) {
if (updatedFrom != null && updatedTo != null && updatedFrom.isAfter(updatedTo)) {
throw new IllegalArgumentException("updatedFrom must be less than or equal to updatedTo");
}
Map<String, String> query = new LinkedHashMap<>();
if (updatedFrom != null) {
query.put(UPDATED_FROM_PARAM, DATETIME_QUERY_FORMATTER.format(updatedFrom));
}
if (updatedTo != null) {
query.put(UPDATED_TO_PARAM, DATETIME_QUERY_FORMATTER.format(updatedTo));
}
return query;
}
private static <T> List<T> filterByUpdatedAt(
List<T> items,
Function<T, LocalDateTime> updatedAtGetter,
LocalDateTime updatedFrom,
LocalDateTime updatedTo) {
if ((updatedFrom == null && updatedTo == null) || items == null || items.isEmpty()) {
return items;
}
return items.stream()
.filter(item -> {
if (item == null) {
return false;
}
LocalDateTime updatedAt = updatedAtGetter.apply(item);
if (updatedAt == null) {
return false;
}
if (updatedFrom != null && updatedAt.isBefore(updatedFrom)) {
return false;
}
return updatedTo == null || !updatedAt.isAfter(updatedTo);
})
.toList();
}
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// 响应 DTO(仅包含同步所需字段) // 响应 DTO(仅包含同步所需字段)
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------

View File

@@ -0,0 +1,649 @@
package com.datamate.knowledgegraph.infrastructure.client;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.PagedResult;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.time.LocalDateTime;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class DataManagementClientTest {
private static final String BASE_URL = "http://dm-service:8080";
private static final String ANNOTATION_URL = "http://annotation-service:8081";
private static final int PAGE_SIZE = 10;
@Mock
private RestTemplate restTemplate;
private DataManagementClient client;
@BeforeEach
void setUp() {
KnowledgeGraphProperties properties = new KnowledgeGraphProperties();
KnowledgeGraphProperties.Sync sync = new KnowledgeGraphProperties.Sync();
sync.setDataManagementUrl(BASE_URL);
sync.setAnnotationServiceUrl(ANNOTATION_URL);
sync.setPageSize(PAGE_SIZE);
properties.setSync(sync);
client = new DataManagementClient(restTemplate, properties);
}
// -----------------------------------------------------------------------
// Helper methods
// -----------------------------------------------------------------------
private <T> PagedResult<T> pagedResult(List<T> content, int page, int totalPages) {
PagedResult<T> result = new PagedResult<>();
result.setContent(content);
result.setPage(page);
result.setTotalPages(totalPages);
result.setTotalElements(content.size());
return result;
}
private DatasetDTO dataset(String id, LocalDateTime updatedAt) {
DatasetDTO dto = new DatasetDTO();
dto.setId(id);
dto.setName("dataset-" + id);
dto.setUpdatedAt(updatedAt);
return dto;
}
private WorkflowDTO workflow(String id, LocalDateTime updatedAt) {
WorkflowDTO dto = new WorkflowDTO();
dto.setId(id);
dto.setName("workflow-" + id);
dto.setUpdatedAt(updatedAt);
return dto;
}
private JobDTO job(String id, LocalDateTime updatedAt) {
JobDTO dto = new JobDTO();
dto.setId(id);
dto.setName("job-" + id);
dto.setUpdatedAt(updatedAt);
return dto;
}
private LabelTaskDTO labelTask(String id, LocalDateTime updatedAt) {
LabelTaskDTO dto = new LabelTaskDTO();
dto.setId(id);
dto.setName("label-task-" + id);
dto.setUpdatedAt(updatedAt);
return dto;
}
private KnowledgeSetDTO knowledgeSet(String id, LocalDateTime updatedAt) {
KnowledgeSetDTO dto = new KnowledgeSetDTO();
dto.setId(id);
dto.setName("knowledge-set-" + id);
dto.setUpdatedAt(updatedAt);
return dto;
}
@SuppressWarnings("unchecked")
private <T> void stubSinglePageResponse(List<T> content) {
PagedResult<T> paged = pagedResult(content, 0, 1);
when(restTemplate.exchange(
any(String.class), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class)))
.thenReturn(ResponseEntity.ok(paged));
}
// -----------------------------------------------------------------------
// Time window URL parameter tests
// -----------------------------------------------------------------------
@Nested
class TimeWindowUrlTests {
@Test
void listAllDatasets_withTimeWindow_passesQueryParams() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 6, 30, 23, 59, 59);
DatasetDTO ds = dataset("ds-1", LocalDateTime.of(2025, 3, 15, 10, 0));
stubSinglePageResponse(List.of(ds));
client.listAllDatasets(from, to);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
String url = urlCaptor.getValue();
assertThat(url).contains("updatedFrom=2025-01-01T00%3A00%3A00");
assertThat(url).contains("updatedTo=2025-06-30T23%3A59%3A59");
assertThat(url).contains("page=0");
assertThat(url).contains("size=" + PAGE_SIZE);
}
@Test
void listAllDatasets_withOnlyUpdatedFrom_passesOnlyFromParam() {
LocalDateTime from = LocalDateTime.of(2025, 3, 1, 0, 0, 0);
DatasetDTO ds = dataset("ds-1", LocalDateTime.of(2025, 5, 1, 12, 0));
stubSinglePageResponse(List.of(ds));
client.listAllDatasets(from, null);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
String url = urlCaptor.getValue();
assertThat(url).contains("updatedFrom=2025-03-01T00%3A00%3A00");
assertThat(url).doesNotContain("updatedTo");
}
@Test
void listAllDatasets_withOnlyUpdatedTo_passesOnlyToParam() {
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59, 59);
DatasetDTO ds = dataset("ds-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(ds));
client.listAllDatasets(null, to);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
String url = urlCaptor.getValue();
assertThat(url).doesNotContain("updatedFrom");
assertThat(url).contains("updatedTo=2025-12-31T23%3A59%3A59");
}
@Test
void listAllDatasets_noTimeWindow_omitsTimeParams() {
DatasetDTO ds = dataset("ds-1", LocalDateTime.of(2025, 1, 1, 0, 0));
stubSinglePageResponse(List.of(ds));
client.listAllDatasets(null, null);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
String url = urlCaptor.getValue();
assertThat(url).doesNotContain("updatedFrom");
assertThat(url).doesNotContain("updatedTo");
}
@Test
void noArgOverload_delegatesToTimeWindowVersion_omitsTimeParams() {
DatasetDTO ds = dataset("ds-1", LocalDateTime.of(2025, 1, 1, 0, 0));
stubSinglePageResponse(List.of(ds));
client.listAllDatasets();
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
String url = urlCaptor.getValue();
assertThat(url).doesNotContain("updatedFrom");
assertThat(url).doesNotContain("updatedTo");
}
}
// -----------------------------------------------------------------------
// Validation
// -----------------------------------------------------------------------
@Nested
class ValidationTests {
@Test
void listAllDatasets_updatedFromAfterUpdatedTo_throwsIllegalArgument() {
LocalDateTime from = LocalDateTime.of(2025, 12, 31, 23, 59, 59);
LocalDateTime to = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
assertThatThrownBy(() -> client.listAllDatasets(from, to))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("updatedFrom must be less than or equal to updatedTo");
}
@Test
void listAllWorkflows_updatedFromAfterUpdatedTo_throwsIllegalArgument() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 1, 1, 0, 0);
assertThatThrownBy(() -> client.listAllWorkflows(from, to))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void listAllJobs_updatedFromAfterUpdatedTo_throwsIllegalArgument() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 1, 1, 0, 0);
assertThatThrownBy(() -> client.listAllJobs(from, to))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void listAllLabelTasks_updatedFromAfterUpdatedTo_throwsIllegalArgument() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 1, 1, 0, 0);
assertThatThrownBy(() -> client.listAllLabelTasks(from, to))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void listAllKnowledgeSets_updatedFromAfterUpdatedTo_throwsIllegalArgument() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 1, 1, 0, 0);
assertThatThrownBy(() -> client.listAllKnowledgeSets(from, to))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void listAllDatasets_sameFromAndTo_doesNotThrow() {
LocalDateTime ts = LocalDateTime.of(2025, 6, 1, 12, 0, 0);
DatasetDTO ds = dataset("ds-1", ts);
stubSinglePageResponse(List.of(ds));
List<DatasetDTO> result = client.listAllDatasets(ts, ts);
assertThat(result).hasSize(1);
}
}
// -----------------------------------------------------------------------
// Local filtering (client-side updatedAt filter)
// -----------------------------------------------------------------------
@Nested
class LocalFilteringTests {
@Test
void listAllDatasets_filtersItemsBeforeUpdatedFrom() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59, 59);
DatasetDTO old = dataset("ds-old", LocalDateTime.of(2025, 1, 1, 0, 0));
DatasetDTO recent = dataset("ds-recent", LocalDateTime.of(2025, 7, 1, 0, 0));
stubSinglePageResponse(List.of(old, recent));
List<DatasetDTO> result = client.listAllDatasets(from, to);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("ds-recent");
}
@Test
void listAllDatasets_filtersItemsAfterUpdatedTo() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 6, 30, 23, 59, 59);
DatasetDTO inRange = dataset("ds-in", LocalDateTime.of(2025, 3, 15, 10, 0));
DatasetDTO tooNew = dataset("ds-new", LocalDateTime.of(2025, 9, 1, 0, 0));
stubSinglePageResponse(List.of(inRange, tooNew));
List<DatasetDTO> result = client.listAllDatasets(from, to);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("ds-in");
}
@Test
void listAllDatasets_filtersItemsWithNullUpdatedAt() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59, 59);
DatasetDTO withTime = dataset("ds-with", LocalDateTime.of(2025, 6, 1, 0, 0));
DatasetDTO noTime = dataset("ds-null", null);
stubSinglePageResponse(List.of(withTime, noTime));
List<DatasetDTO> result = client.listAllDatasets(from, to);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("ds-with");
}
@Test
void listAllDatasets_includesBoundaryValues() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 6, 30, 23, 59, 59);
DatasetDTO exactFrom = dataset("ds-from", from);
DatasetDTO exactTo = dataset("ds-to", to);
DatasetDTO inBetween = dataset("ds-mid", LocalDateTime.of(2025, 6, 15, 12, 0));
stubSinglePageResponse(List.of(exactFrom, exactTo, inBetween));
List<DatasetDTO> result = client.listAllDatasets(from, to);
assertThat(result).hasSize(3);
assertThat(result).extracting(DatasetDTO::getId)
.containsExactly("ds-from", "ds-to", "ds-mid");
}
@Test
void listAllDatasets_noTimeWindow_returnsAllItems() {
DatasetDTO ds1 = dataset("ds-1", LocalDateTime.of(2020, 1, 1, 0, 0));
DatasetDTO ds2 = dataset("ds-2", null);
stubSinglePageResponse(List.of(ds1, ds2));
List<DatasetDTO> result = client.listAllDatasets(null, null);
assertThat(result).hasSize(2);
}
@SuppressWarnings("unchecked")
@Test
void listAllDatasets_withNullItemInList_doesNotThrowNPE() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59, 59);
DatasetDTO valid = dataset("ds-valid", LocalDateTime.of(2025, 6, 1, 0, 0));
// Build a list that contains a null element to simulate upstream returning null items
List<DatasetDTO> contentWithNull = new java.util.ArrayList<>();
contentWithNull.add(valid);
contentWithNull.add(null);
contentWithNull.add(dataset("ds-old", LocalDateTime.of(2024, 1, 1, 0, 0)));
PagedResult<DatasetDTO> paged = pagedResult(contentWithNull, 0, 1);
when(restTemplate.exchange(
any(String.class), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class)))
.thenReturn(ResponseEntity.ok(paged));
List<DatasetDTO> result = client.listAllDatasets(from, to);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("ds-valid");
}
@Test
void listAllWorkflows_filtersCorrectly() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0, 0);
WorkflowDTO old = workflow("wf-old", LocalDateTime.of(2025, 1, 1, 0, 0));
WorkflowDTO recent = workflow("wf-recent", LocalDateTime.of(2025, 7, 1, 0, 0));
stubSinglePageResponse(List.of(old, recent));
List<WorkflowDTO> result = client.listAllWorkflows(from, null);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("wf-recent");
}
@Test
void listAllJobs_filtersCorrectly() {
LocalDateTime to = LocalDateTime.of(2025, 6, 30, 23, 59, 59);
JobDTO inRange = job("j-in", LocalDateTime.of(2025, 3, 1, 0, 0));
JobDTO outOfRange = job("j-out", LocalDateTime.of(2025, 9, 1, 0, 0));
stubSinglePageResponse(List.of(inRange, outOfRange));
List<JobDTO> result = client.listAllJobs(null, to);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("j-in");
}
@Test
void listAllLabelTasks_filtersCorrectly() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 6, 30, 23, 59, 59);
LabelTaskDTO inRange = labelTask("lt-in", LocalDateTime.of(2025, 6, 15, 0, 0));
LabelTaskDTO outOfRange = labelTask("lt-out", LocalDateTime.of(2025, 1, 1, 0, 0));
stubSinglePageResponse(List.of(inRange, outOfRange));
List<LabelTaskDTO> result = client.listAllLabelTasks(from, to);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("lt-in");
}
@Test
void listAllKnowledgeSets_filtersCorrectly() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0, 0);
KnowledgeSetDTO old = knowledgeSet("ks-old", LocalDateTime.of(2025, 1, 1, 0, 0));
KnowledgeSetDTO recent = knowledgeSet("ks-new", LocalDateTime.of(2025, 8, 1, 0, 0));
stubSinglePageResponse(List.of(old, recent));
List<KnowledgeSetDTO> result = client.listAllKnowledgeSets(from, null);
assertThat(result).hasSize(1);
assertThat(result.get(0).getId()).isEqualTo("ks-new");
}
}
// -----------------------------------------------------------------------
// Pagination with time window
// -----------------------------------------------------------------------
@Nested
class PaginationTests {
@SuppressWarnings("unchecked")
@Test
void listAllDatasets_multiplePages_fetchesAllAndFilters() {
LocalDateTime from = LocalDateTime.of(2025, 6, 1, 0, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59, 59);
DatasetDTO ds1 = dataset("ds-1", LocalDateTime.of(2025, 7, 1, 0, 0));
DatasetDTO ds2 = dataset("ds-2", LocalDateTime.of(2025, 3, 1, 0, 0)); // outside
DatasetDTO ds3 = dataset("ds-3", LocalDateTime.of(2025, 9, 1, 0, 0));
PagedResult<DatasetDTO> page0 = pagedResult(List.of(ds1, ds2), 0, 2);
PagedResult<DatasetDTO> page1 = pagedResult(List.of(ds3), 1, 2);
when(restTemplate.exchange(
(String) argThat(url -> url != null && url.toString().contains("page=0")),
eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class)))
.thenReturn(ResponseEntity.ok(page0));
when(restTemplate.exchange(
(String) argThat(url -> url != null && url.toString().contains("page=1")),
eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class)))
.thenReturn(ResponseEntity.ok(page1));
List<DatasetDTO> result = client.listAllDatasets(from, to);
// ds2 is outside time window, filtered out client-side
assertThat(result).hasSize(2);
assertThat(result).extracting(DatasetDTO::getId)
.containsExactly("ds-1", "ds-3");
}
@SuppressWarnings("unchecked")
@Test
void listAllDatasets_emptyFirstPage_returnsEmptyList() {
PagedResult<DatasetDTO> emptyPage = pagedResult(List.of(), 0, 0);
when(restTemplate.exchange(
any(String.class), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class)))
.thenReturn(ResponseEntity.ok(emptyPage));
List<DatasetDTO> result = client.listAllDatasets(
LocalDateTime.of(2025, 1, 1, 0, 0),
LocalDateTime.of(2025, 12, 31, 23, 59, 59));
assertThat(result).isEmpty();
}
}
// -----------------------------------------------------------------------
// Error propagation
// -----------------------------------------------------------------------
@Nested
class ErrorTests {
@Test
void listAllDatasets_restClientException_propagates() {
when(restTemplate.exchange(
any(String.class), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class)))
.thenThrow(new RestClientException("connection refused"));
assertThatThrownBy(() -> client.listAllDatasets(
LocalDateTime.of(2025, 1, 1, 0, 0), null))
.isInstanceOf(RestClientException.class)
.hasMessageContaining("connection refused");
}
}
// -----------------------------------------------------------------------
// URL format for each entity type
// -----------------------------------------------------------------------
@Nested
class UrlEndpointTests {
@Test
void listAllWorkflows_usesCorrectEndpoint() {
WorkflowDTO wf = workflow("wf-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(wf));
client.listAllWorkflows(LocalDateTime.of(2025, 1, 1, 0, 0), null);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
assertThat(urlCaptor.getValue())
.startsWith(BASE_URL + "/data-management/workflows?");
}
@Test
void listAllJobs_usesCorrectEndpoint() {
JobDTO j = job("j-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(j));
client.listAllJobs(LocalDateTime.of(2025, 1, 1, 0, 0), null);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
assertThat(urlCaptor.getValue())
.startsWith(BASE_URL + "/data-management/jobs?");
}
@Test
void listAllLabelTasks_usesAnnotationServiceUrl() {
LabelTaskDTO lt = labelTask("lt-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(lt));
client.listAllLabelTasks(LocalDateTime.of(2025, 1, 1, 0, 0), null);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
assertThat(urlCaptor.getValue())
.startsWith(ANNOTATION_URL + "/annotation/label-tasks?");
}
@Test
void listAllKnowledgeSets_usesCorrectEndpoint() {
KnowledgeSetDTO ks = knowledgeSet("ks-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(ks));
client.listAllKnowledgeSets(LocalDateTime.of(2025, 1, 1, 0, 0), null);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(restTemplate).exchange(
urlCaptor.capture(), eq(HttpMethod.GET), isNull(),
any(ParameterizedTypeReference.class));
assertThat(urlCaptor.getValue())
.startsWith(BASE_URL + "/data-management/knowledge-sets?");
}
}
// -----------------------------------------------------------------------
// No-arg overloads (backward compatibility)
// -----------------------------------------------------------------------
@Nested
class NoArgOverloadTests {
@Test
void listAllWorkflows_noArgs_returnsAll() {
WorkflowDTO wf = workflow("wf-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(wf));
List<WorkflowDTO> result = client.listAllWorkflows();
assertThat(result).hasSize(1);
}
@Test
void listAllJobs_noArgs_returnsAll() {
JobDTO j = job("j-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(j));
List<JobDTO> result = client.listAllJobs();
assertThat(result).hasSize(1);
}
@Test
void listAllLabelTasks_noArgs_returnsAll() {
LabelTaskDTO lt = labelTask("lt-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(lt));
List<LabelTaskDTO> result = client.listAllLabelTasks();
assertThat(result).hasSize(1);
}
@Test
void listAllKnowledgeSets_noArgs_returnsAll() {
KnowledgeSetDTO ks = knowledgeSet("ks-1", LocalDateTime.of(2025, 6, 1, 0, 0));
stubSinglePageResponse(List.of(ks));
List<KnowledgeSetDTO> result = client.listAllKnowledgeSets();
assertThat(result).hasSize(1);
}
}
}