线程池直接拉大

This commit is contained in:
Jerry Yan 2025-03-02 23:25:37 +08:00
parent 519f9969ec
commit e9890a3856
7 changed files with 50 additions and 18 deletions

View File

@ -0,0 +1,19 @@
package com.ycwl.basic.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
@EnableScheduling
public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(256);
scheduler.setThreadNamePrefix("Scheduler-");
return scheduler;
}
}

View File

@ -17,6 +17,8 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@IgnoreToken
@RestController
@Api(tags = "渲染端对接接口")
@ -42,7 +44,9 @@ public class TaskTaskController {
@PostMapping("/{taskId}/uploadUrl")
public ApiResponse<String> getUploadUrl(@PathVariable Long taskId, @RequestBody WorkerAuthReqVo req) {
return ApiResponse.success(taskService.getUploadUrl(taskId, req));
String urlForUpload = taskService.getUploadUrl(taskId, req);
urlForUpload = urlForUpload.replace("-internal.aliyuncs.com", ".aliyuncs.com");
return ApiResponse.success(urlForUpload);
}
@PostMapping("/{taskId}/start")

View File

@ -57,7 +57,9 @@ public class VptController {
adapter = StorageFactory.use("video");
}
String filename = StorageUtil.joinPath(StorageConstant.VIDEO_PIECE_PATH, taskId.toString() + ".mp4");
return adapter.getUrlForUpload(new Date(System.currentTimeMillis() + 1000 * 60 * 60), "video/mp4", filename);
String urlForUpload = adapter.getUrlForUpload(new Date(System.currentTimeMillis() + 1000 * 60 * 60), "video/mp4", filename);
urlForUpload = urlForUpload.replace("-internal.aliyuncs.com", ".aliyuncs.com");
return urlForUpload;
}
@PostMapping("/scenic/{scenicId}/{taskId}/success")
public ApiResponse<String> success(@PathVariable("scenicId") Long scenicId, @PathVariable("taskId") Long taskId, @RequestBody FileObject fileObject) {

View File

@ -59,7 +59,9 @@ public class WvpController {
adapter = StorageFactory.use("video");
}
String filename = StorageUtil.joinPath(StorageConstant.VIDEO_PIECE_PATH, taskId.toString() + ".mp4");
return adapter.getUrlForUpload(new Date(System.currentTimeMillis() + 1000 * 60 * 60), "video/mp4", filename);
String urlForUpload = adapter.getUrlForUpload(new Date(System.currentTimeMillis() + 1000 * 60 * 60), "video/mp4", filename);
urlForUpload = urlForUpload.replace("-internal.aliyuncs.com", ".aliyuncs.com");
return urlForUpload;
}
@PostMapping("/scenic/{scenicId}/{taskId}/success")
public ApiResponse<String> success(@PathVariable("scenicId") Long scenicId, @PathVariable("taskId") Long taskId, @RequestBody FileObject fileObject) {

View File

@ -15,6 +15,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@Slf4j
@ -28,7 +29,7 @@ public class VptPassiveStorageOperator extends ADeviceStorageOperator {
public Date endTime;
}
private static List<Task> taskList = Collections.synchronizedList(new ArrayList<>());
private static List<Task> taskList = new CopyOnWriteArrayList<>();
private static ConcurrentHashMap<Long, FileObject> fileListMap = new ConcurrentHashMap<>();
private VptPassiveStorageConfig config;
@ -37,10 +38,6 @@ public class VptPassiveStorageOperator extends ADeviceStorageOperator {
loadConfig(configJson);
}
public static String getUrlForTask(Long taskId) {
return StorageUtil.joinPath("video-source", taskId.toString() + ".mp4");
}
public static void onReceiveResult(Long taskId, FileObject fileObject) {
if (fileObject == null) {
log.info("任务{}获取视频失败!", taskId);
@ -81,6 +78,7 @@ public class VptPassiveStorageOperator extends ADeviceStorageOperator {
task.startTime = startDate;
task.endTime = endDate;
taskList.add(task);
log.info("任务{}获取视频开始!共{}", task.taskId, taskList.size());
Date taskStartTime = new Date();
while (true) {
if (new Date().getTime() - taskStartTime.getTime() > 80000L) {

View File

@ -13,6 +13,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@Slf4j
@ -26,7 +27,7 @@ public class WvpPassiveStorageOperator extends ADeviceStorageOperator {
public Date endTime;
}
private static List<Task> taskList = Collections.synchronizedList(new ArrayList<>());
private static List<Task> taskList = new CopyOnWriteArrayList<>();
private static ConcurrentHashMap<Long, FileObject> fileListMap = new ConcurrentHashMap<>();
private WvpPassiveStorageConfig config;
@ -35,10 +36,6 @@ public class WvpPassiveStorageOperator extends ADeviceStorageOperator {
loadConfig(configJson);
}
public static String getUrlForTask(Long taskId) {
return StorageUtil.joinPath("video-source", taskId.toString() + ".mp4");
}
public static void onReceiveResult(Long taskId, FileObject fileObject) {
if (fileObject == null) {
log.info("任务{}获取视频失败!", taskId);

View File

@ -35,7 +35,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -78,19 +78,29 @@ public class VideoPieceGetter {
String outputFile;
}
public static LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>();
public static ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();
public static void addTask(Task task) {
queue.add(task);
}
@Scheduled(fixedRate = 2000L)
@Scheduled(fixedRate = 200L)
public void doTask() {
Task task = queue.poll();
if (task == null) {
return;
}
log.info("poll task: {}", task);
log.info("poll task: {}/{}", task, queue.size());
new Thread(() -> {
try {
runTask(task);
} catch (Exception e) {
log.error("run task error", e);
}
}).start();
}
private void runTask(Task task) {
List<String> templatePlaceholder;
if (null != task.getTemplateId()) {
templatePlaceholder = templateRepository.getTemplatePlaceholder(task.getTemplateId());