This commit is contained in:
2026-03-12 15:56:06 +08:00
parent f8f0e92723
commit 030d8e9e6d
4 changed files with 174 additions and 0 deletions

View File

@@ -75,3 +75,17 @@ func OssUpload(ctx context.Context, url, filePath string) error {
span.SetStatus(codes.Ok, "上传成功") span.SetStatus(codes.Ok, "上传成功")
return nil return nil
} }
func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string) error {
subCtx, span := tracer.Start(ctx, "UploadPreviewFile")
defer span.End()
url, err := QueryPreviewUploadUrl(subCtx, taskId)
if err != nil {
return err
}
logger.Info("开始上传预览文件", zap.String("url", url))
if err := OssUpload(subCtx, url, previewFilePath); err != nil {
return err
}
return nil
}

View File

@@ -126,3 +126,71 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
return false return false
} }
} }
func QueryPreviewUploadUrl(ctx context.Context, taskId string) (string, error) {
_, span := tracer.Start(ctx, "QueryPreviewUploadUrl")
defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl"
span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "GET"))
req, err := http.NewRequest("GET", url, nil)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建请求失败", zap.Error(err))
return "", err
}
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送请求失败", zap.Error(err))
return "", err
}
span.SetAttributes(attribute.String("http.status", resp.Status))
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "读取响应体失败")
logger.Error("读取响应体失败", zap.Error(err))
return "", err
}
return string(body), nil
}
func ReportPreviewSuccess(ctx context.Context, taskId string) bool {
_, span := tracer.Start(ctx, "ReportPreviewSuccess")
defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess"
span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "POST"))
req, err := http.NewRequest("POST", url, nil)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建请求失败", zap.Error(err))
return false
}
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送请求失败", zap.Error(err))
return false
}
defer resp.Body.Close()
span.SetAttributes(attribute.String("http.status", resp.Status))
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
if resp.StatusCode == 200 {
span.SetStatus(codes.Ok, "成功")
return true
} else {
span.SetStatus(codes.Error, "失败")
return false
}
}

72
main.go
View File

@@ -7,13 +7,16 @@ import (
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/telemetry" "ZhenTuLocalPassiveAdapter/telemetry"
"ZhenTuLocalPassiveAdapter/util"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
"runtime" "runtime"
"strings"
"syscall" "syscall"
"time" "time"
@@ -48,6 +51,16 @@ func startTask(device config.DeviceMapping, task dto.Task) {
logger.Info("处理任务成功", logger.Info("处理任务成功",
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo)) zap.String("deviceNo", task.DeviceNo))
// 复制主文件用于异步生成预览,避免阻塞主上传流程
copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4"
copyErr := copyFileOnDisk(fo.URL, copyFile)
if copyErr != nil {
logger.Warn("复制文件用于预览失败",
zap.String("taskID", task.TaskID),
zap.Error(copyErr))
}
err = api.UploadTaskFile(ctx, task, *fo) err = api.UploadTaskFile(ctx, task, *fo)
if err != nil { if err != nil {
span.SetStatus(codes.Error, "上传文件失败") span.SetStatus(codes.Error, "上传文件失败")
@@ -56,6 +69,9 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("deviceNo", task.DeviceNo), zap.String("deviceNo", task.DeviceNo),
zap.Error(err)) zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID) api.ReportTaskFailure(ctx, task.TaskID)
if copyErr == nil {
os.Remove(copyFile)
}
return return
} }
result := api.ReportTaskSuccess(ctx, task.TaskID, fo) result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
@@ -65,12 +81,68 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo), zap.String("deviceNo", task.DeviceNo),
zap.Error(err)) zap.Error(err))
if copyErr == nil {
os.Remove(copyFile)
}
return return
} }
span.SetStatus(codes.Ok, "上传文件成功") span.SetStatus(codes.Ok, "上传文件成功")
logger.Info("上传文件成功", logger.Info("上传文件成功",
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo)) zap.String("deviceNo", task.DeviceNo))
// 异步:从副本压缩720p预览 → 上传 → 上报
if copyErr == nil {
go uploadPreview(task.TaskID, copyFile)
}
}
func uploadPreview(taskID string, copyFile string) {
ctx, span := tracer.Start(context.Background(), "uploadPreview")
defer span.End()
defer os.Remove(copyFile)
previewFile := strings.TrimSuffix(copyFile, "_copy.mp4") + "_preview.mp4"
ok, _ := util.CompressTo720p(ctx, copyFile, previewFile)
if !ok {
span.SetStatus(codes.Error, "生成预览文件失败")
logger.Error("生成预览文件失败", zap.String("taskID", taskID))
return
}
err := api.UploadPreviewFile(ctx, taskID, previewFile)
if err != nil {
span.SetStatus(codes.Error, "上传预览文件失败")
logger.Error("上传预览文件失败",
zap.String("taskID", taskID),
zap.Error(err))
os.Remove(previewFile)
return
}
result := api.ReportPreviewSuccess(ctx, taskID)
if !result {
span.SetStatus(codes.Error, "上报预览成功失败")
logger.Error("上报预览成功失败",
zap.String("taskID", taskID))
return
}
span.SetStatus(codes.Ok, "预览上传成功")
logger.Info("预览上传成功", zap.String("taskID", taskID))
}
func copyFileOnDisk(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
return err
} }
func executeDisconnectCommand(command string) { func executeDisconnectCommand(command string) {

View File

@@ -470,6 +470,26 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
return true, nil return true, nil
} }
func CompressTo720p(ctx context.Context, inputFile, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "CompressTo720p")
defer span.End()
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", inputFile,
"-vf", "scale=-2:720",
"-c:v", "libx264",
"-preset", "fast",
"-crf", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(subCtx, ffmpegCmd)
}
func GetVideoCodec(ctx context.Context, filePath string) (string, error) { func GetVideoCodec(ctx context.Context, filePath string) (string, error) {
_, span := tracer.Start(ctx, "GetVideoCodec") _, span := tracer.Start(ctx, "GetVideoCodec")
defer span.End() defer span.End()