From 030d8e9e6de83f88398c436a96e1345463c0d69b Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Thu, 12 Mar 2026 15:56:06 +0800 Subject: [PATCH] 720p --- api/oss_upload.go | 14 +++++++++ api/task_report.go | 68 +++++++++++++++++++++++++++++++++++++++++++ main.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++ util/ffmpeg.go | 20 +++++++++++++ 4 files changed, 174 insertions(+) diff --git a/api/oss_upload.go b/api/oss_upload.go index f5e330c..ff5b6d8 100644 --- a/api/oss_upload.go +++ b/api/oss_upload.go @@ -75,3 +75,17 @@ func OssUpload(ctx context.Context, url, filePath string) error { span.SetStatus(codes.Ok, "上传成功") return nil } + +func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string) error { + subCtx, span := tracer.Start(ctx, "UploadPreviewFile") + defer span.End() + url, err := QueryPreviewUploadUrl(subCtx, taskId) + if err != nil { + return err + } + logger.Info("开始上传预览文件", zap.String("url", url)) + if err := OssUpload(subCtx, url, previewFilePath); err != nil { + return err + } + return nil +} diff --git a/api/task_report.go b/api/task_report.go index 1b5cf51..4a652fc 100644 --- a/api/task_report.go +++ b/api/task_report.go @@ -126,3 +126,71 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject) return false } } + +func QueryPreviewUploadUrl(ctx context.Context, taskId string) (string, error) { + _, span := tracer.Start(ctx, "QueryPreviewUploadUrl") + defer span.End() + url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl" + span.SetAttributes(attribute.String("http.url", url)) + span.SetAttributes(attribute.String("http.method", "GET")) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") + logger.Error("创建请求失败", zap.Error(err)) + return "", err + } + resp, err := GetAPIClient().Do(req) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") + logger.Error("发送请求失败", zap.Error(err)) + return "", err + } + span.SetAttributes(attribute.String("http.status", resp.Status)) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "读取响应体失败") + logger.Error("读取响应体失败", zap.Error(err)) + return "", err + } + return string(body), nil +} + +func ReportPreviewSuccess(ctx context.Context, taskId string) bool { + _, span := tracer.Start(ctx, "ReportPreviewSuccess") + defer span.End() + url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess" + span.SetAttributes(attribute.String("http.url", url)) + span.SetAttributes(attribute.String("http.method", "POST")) + + req, err := http.NewRequest("POST", url, nil) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") + logger.Error("创建请求失败", zap.Error(err)) + return false + } + + resp, err := GetAPIClient().Do(req) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") + logger.Error("发送请求失败", zap.Error(err)) + return false + } + defer resp.Body.Close() + + span.SetAttributes(attribute.String("http.status", resp.Status)) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) + if resp.StatusCode == 200 { + span.SetStatus(codes.Ok, "成功") + return true + } else { + span.SetStatus(codes.Error, "失败") + return false + } +} diff --git a/main.go b/main.go index f87a210..02e18e9 100644 --- a/main.go +++ b/main.go @@ -7,13 +7,16 @@ import ( "ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/telemetry" + "ZhenTuLocalPassiveAdapter/util" "context" "errors" "fmt" + "io" "os" "os/exec" "os/signal" "runtime" + "strings" "syscall" "time" @@ -48,6 +51,16 @@ func startTask(device config.DeviceMapping, task dto.Task) { logger.Info("处理任务成功", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo)) + + // 复制主文件用于异步生成预览,避免阻塞主上传流程 + copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4" + copyErr := copyFileOnDisk(fo.URL, copyFile) + if copyErr != nil { + logger.Warn("复制文件用于预览失败", + zap.String("taskID", task.TaskID), + zap.Error(copyErr)) + } + err = api.UploadTaskFile(ctx, task, *fo) if err != nil { span.SetStatus(codes.Error, "上传文件失败") @@ -56,6 +69,9 @@ func startTask(device config.DeviceMapping, task dto.Task) { zap.String("deviceNo", task.DeviceNo), zap.Error(err)) api.ReportTaskFailure(ctx, task.TaskID) + if copyErr == nil { + os.Remove(copyFile) + } return } result := api.ReportTaskSuccess(ctx, task.TaskID, fo) @@ -65,12 +81,68 @@ func startTask(device config.DeviceMapping, task dto.Task) { zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo), zap.Error(err)) + if copyErr == nil { + os.Remove(copyFile) + } return } span.SetStatus(codes.Ok, "上传文件成功") logger.Info("上传文件成功", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo)) + + // 异步:从副本压缩720p预览 → 上传 → 上报 + if copyErr == nil { + go uploadPreview(task.TaskID, copyFile) + } +} + +func uploadPreview(taskID string, copyFile string) { + ctx, span := tracer.Start(context.Background(), "uploadPreview") + defer span.End() + defer os.Remove(copyFile) + + previewFile := strings.TrimSuffix(copyFile, "_copy.mp4") + "_preview.mp4" + ok, _ := util.CompressTo720p(ctx, copyFile, previewFile) + if !ok { + span.SetStatus(codes.Error, "生成预览文件失败") + logger.Error("生成预览文件失败", zap.String("taskID", taskID)) + return + } + + err := api.UploadPreviewFile(ctx, taskID, previewFile) + if err != nil { + span.SetStatus(codes.Error, "上传预览文件失败") + logger.Error("上传预览文件失败", + zap.String("taskID", taskID), + zap.Error(err)) + os.Remove(previewFile) + return + } + result := api.ReportPreviewSuccess(ctx, taskID) + if !result { + span.SetStatus(codes.Error, "上报预览成功失败") + logger.Error("上报预览成功失败", + zap.String("taskID", taskID)) + return + } + span.SetStatus(codes.Ok, "预览上传成功") + logger.Info("预览上传成功", zap.String("taskID", taskID)) +} + +func copyFileOnDisk(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + _, err = io.Copy(out, in) + return err } func executeDisconnectCommand(command string) { diff --git a/util/ffmpeg.go b/util/ffmpeg.go index 629b4dd..e405f42 100644 --- a/util/ffmpeg.go +++ b/util/ffmpeg.go @@ -470,6 +470,26 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) return true, nil } +func CompressTo720p(ctx context.Context, inputFile, outputFile string) (bool, error) { + subCtx, span := tracer.Start(ctx, "CompressTo720p") + defer span.End() + ffmpegCmd := []string{ + FfmpegExec, + "-hide_banner", + "-y", + "-i", inputFile, + "-vf", "scale=-2:720", + "-c:v", "libx264", + "-preset", "fast", + "-crf", "28", + "-c:a", "aac", + "-b:a", "128k", + "-f", "mp4", + outputFile, + } + return handleFfmpegProcess(subCtx, ffmpegCmd) +} + func GetVideoCodec(ctx context.Context, filePath string) (string, error) { _, span := tracer.Start(ctx, "GetVideoCodec") defer span.End()