From ba4aad0ae5742024125a243cd4a63b21a2651715 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Fri, 7 Feb 2025 22:58:01 +0800 Subject: [PATCH] Initial --- .gitignore | 2 + api/oss_upload.go | 51 +++++++ api/sync_task.go | 57 ++++++++ api/task_report.go | 72 ++++++++++ config.yaml | 17 +++ config/dto.go | 34 +++++ config/service.go | 27 ++++ core/task.go | 35 +++++ dto/common_response.go | 13 ++ dto/ffmpeg.go | 8 ++ dto/file.go | 16 +++ dto/task.go | 62 ++++++++ fs/adapter.go | 16 +++ fs/local_adapter.go | 70 +++++++++ go.mod | 26 ++++ go.sum | 54 +++++++ main.go | 72 ++++++++++ util/ffmpeg.go | 297 +++++++++++++++++++++++++++++++++++++++ util/file_filter.go | 41 ++++++ util/file_filter_test.go | 26 ++++ util/file_recognizer.go | 10 ++ util/parse_date.go | 16 +++ util/parse_time.go | 39 +++++ 23 files changed, 1061 insertions(+) create mode 100644 .gitignore create mode 100644 api/oss_upload.go create mode 100644 api/sync_task.go create mode 100644 api/task_report.go create mode 100644 config.yaml create mode 100644 config/dto.go create mode 100644 config/service.go create mode 100644 core/task.go create mode 100644 dto/common_response.go create mode 100644 dto/ffmpeg.go create mode 100644 dto/file.go create mode 100644 dto/task.go create mode 100644 fs/adapter.go create mode 100644 fs/local_adapter.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 util/ffmpeg.go create mode 100644 util/file_filter.go create mode 100644 util/file_filter_test.go create mode 100644 util/file_recognizer.go create mode 100644 util/parse_date.go create mode 100644 util/parse_time.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7198016 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +dist/ +.idea/ \ No newline at end of file diff --git a/api/oss_upload.go b/api/oss_upload.go new file mode 100644 index 0000000..9c11ba8 --- /dev/null +++ b/api/oss_upload.go @@ -0,0 +1,51 @@ +package api + +import ( + "ZhenTuLocalPassiveAdapter/dto" + "bytes" + "fmt" + "io" + "net/http" + "os" +) + +func UploadTaskFile(task dto.Task, file dto.FileObject) error { + url, err := QueryUploadUrlForTask(task.TaskID) + if err != nil { + return err + } + if err := OssUpload(url, file.URL); err != nil { + return err + } + return nil +} + +func OssUpload(url, filePath string) error { + // 使用 http put 请求上传文件 + file, err := os.Open(filePath) + defer os.Remove(filePath) + defer file.Close() + if err != nil { + return err + } + fileBytes, err := io.ReadAll(file) + if err != nil { + return err + } + + req, err := http.NewRequest("PUT", url, bytes.NewBuffer(fileBytes)) + if err != nil { + return err + } + req.Header.Set("Content-Length", fmt.Sprintf("%d", len(fileBytes))) + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("upload failed with status code %d", resp.StatusCode) + } + return nil +} diff --git a/api/sync_task.go b/api/sync_task.go new file mode 100644 index 0000000..d11f8c2 --- /dev/null +++ b/api/sync_task.go @@ -0,0 +1,57 @@ +package api + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "net/http" +) + +func SyncTask() ([]dto.Task, error) { + url := config.Config.Api.BaseUrl + "/sync" + requestBody := map[string]interface{}{ + "version": "0.0.1", + "devices": config.Config.Devices, + } + jsonData, err := json.Marshal(requestBody) + if err != nil { + log.Println("Error marshaling JSON:", err) + return nil, err + } + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + log.Println("Error creating request:", err) + return nil, err + } + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Println("Error sending request:", err) + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Println("Error reading response body:", err) + return nil, err + } + + // 解析响应体为 map + var response dto.TaskListResponse + err = json.Unmarshal(body, &response) + if err != nil { + log.Println("->:", string(body)) + log.Println("Error unmarshaling response body:", err) + return nil, err + } + if response.Code != 200 { + log.Println("Error response code:", response.Code) + return nil, fmt.Errorf(response.Msg) + } + return response.Data, nil +} diff --git a/api/task_report.go b/api/task_report.go new file mode 100644 index 0000000..a034acd --- /dev/null +++ b/api/task_report.go @@ -0,0 +1,72 @@ +package api + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "bytes" + "encoding/json" + "io" + "log" + "net/http" +) + +func QueryUploadUrlForTask(taskId string) (string, error) { + url := config.Config.Api.BaseUrl + "/" + taskId + "/uploadUrl" + req, err := http.NewRequest("POST", url, nil) + if err != nil { + log.Println("Error creating request:", err) + return "", err + } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Println("Error sending request:", err) + return "", err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Println("Error reading response body:", err) + return "", err + } + return string(body), nil +} + +func ReportTaskFailure(taskId string) { + url := config.Config.Api.BaseUrl + "/" + taskId + "/failure" + + req, err := http.NewRequest("POST", url, nil) + if err != nil { + log.Println("Error creating request:", err) + return + } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Println("Error sending request:", err) + return + } + defer resp.Body.Close() +} + +func ReportTaskSuccess(taskId string, file *dto.FileObject) { + url := config.Config.Api.BaseUrl + "/" + taskId + "/success" + jsonData, err := json.Marshal(file) + if err != nil { + log.Println("Error marshaling JSON:", err) + return + } + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + log.Println("Error creating request:", err) + return + } + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Println("Error sending request:", err) + return + } + defer resp.Body.Close() +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..e7511b5 --- /dev/null +++ b/config.yaml @@ -0,0 +1,17 @@ +api: +# baseUrl: "https://zhentuai.com/vpt/v1/scenic/3946669713328836608" + baseUrl: "http://127.0.0.1:8030/vpt/v1/scenic/3946669713328836608" +record: + storage: + type: "local" + path: "/root/opt/" + duration: 60 +devices: + - deviceNo: "34020000001322200001" + name: "192.168.55.201" + path: "/root/opt/34020000001322200001/" +fileName: + timeSplit: "-" + dateSeparator: "-" + fileExt: "dav" + unFinExt: "dav_" \ No newline at end of file diff --git a/config/dto.go b/config/dto.go new file mode 100644 index 0000000..8c31540 --- /dev/null +++ b/config/dto.go @@ -0,0 +1,34 @@ +package config + +type ApiConfig struct { + BaseUrl string `mapstructure:"baseUrl"` +} + +type RecordConfig struct { + Storage StorageConfig `mapstructure:"storage"` + Duration int `mapstructure:"duration"` +} + +type StorageConfig struct { + Type string `mapstructure:"type"` + Path string `mapstructure:"path"` +} + +type DeviceMapping struct { + DeviceNo string `mapstructure:"deviceNo" json:"deviceNo"` + Name string `mapstructure:"name" json:"name"` +} + +type FileNameConfig struct { + DateSeparator string `mapstructure:"dateSeparator"` + TimeSplit string `mapstructure:"timeSplit"` + FileExt string `mapstructure:"fileExt"` + UnfinishedFileExt string `mapstructure:"unFinExt"` +} + +type MainConfig struct { + Api ApiConfig `mapstructure:"api"` + Record RecordConfig `mapstructure:"record"` + Devices []DeviceMapping `mapstructure:"devices"` + FileName FileNameConfig `mapstructure:"fileName"` +} diff --git a/config/service.go b/config/service.go new file mode 100644 index 0000000..1d0366d --- /dev/null +++ b/config/service.go @@ -0,0 +1,27 @@ +package config + +import ( + "github.com/spf13/viper" + "log" +) + +var Config MainConfig + +func LoadConfig() error { + viper.SetConfigName("config") + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + + // 读取配置文件 + if err := viper.ReadInConfig(); err != nil { + log.Fatalf("Error reading config file, %s", err) + return err + } + + // 反序列化配置到结构体 + if err := viper.Unmarshal(&Config); err != nil { + log.Fatalf("Unable to decode into struct, %v", err) + return err + } + return nil +} diff --git a/core/task.go b/core/task.go new file mode 100644 index 0000000..ecf4f1a --- /dev/null +++ b/core/task.go @@ -0,0 +1,35 @@ +package core + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "ZhenTuLocalPassiveAdapter/fs" + "ZhenTuLocalPassiveAdapter/util" + "fmt" +) + +func HandleTask(device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) { + adapter := fs.GetAdapter() + fileList, err := adapter.GetFileList(device.Name + "/" + task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")) + if err != nil { + return nil, err + } + files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime) + if len(files) == 0 { + return nil, fmt.Errorf("没有找到文件") + } + constructTask, err := util.CheckFileCoverageAndConstructTask(files, task.StartTime, task.EndTime, task) + if err != nil { + return nil, err + } + ok := util.RunFfmpegTask(constructTask) + if !ok { + return nil, fmt.Errorf("ffmpeg任务执行失败") + } + return &dto.FileObject{ + CreateTime: task.EndTime, + EndTime: task.EndTime, + NeedDownload: true, + URL: constructTask.OutputFile, + }, nil +} diff --git a/dto/common_response.go b/dto/common_response.go new file mode 100644 index 0000000..4d3edf2 --- /dev/null +++ b/dto/common_response.go @@ -0,0 +1,13 @@ +package dto + +type ApiResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data interface{} `json:"data"` +} + +type TaskListResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data []Task `json:"data"` +} diff --git a/dto/ffmpeg.go b/dto/ffmpeg.go new file mode 100644 index 0000000..1d61f40 --- /dev/null +++ b/dto/ffmpeg.go @@ -0,0 +1,8 @@ +package dto + +type FfmpegTask struct { + Files []File + Length int + Offset int + OutputFile string +} diff --git a/dto/file.go b/dto/file.go new file mode 100644 index 0000000..ee12bb4 --- /dev/null +++ b/dto/file.go @@ -0,0 +1,16 @@ +package dto + +import "time" + +type File struct { + BasePath string `json:"basePath"` + Url string `json:"url"` + Path string `json:"path"` + Name string `json:"name"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` +} + +func (f *File) GetDiffMs() int64 { + return f.EndTime.Sub(f.StartTime).Milliseconds() +} diff --git a/dto/task.go b/dto/task.go new file mode 100644 index 0000000..e012bb6 --- /dev/null +++ b/dto/task.go @@ -0,0 +1,62 @@ +package dto + +import ( + "encoding/json" + "time" +) + +// 自定义时间格式 +const customTimeFormat = "2006-01-02 15:04:05" + +func (t *Task) UnmarshalJSON(data []byte) error { + type Alias Task + aux := &struct { + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + *Alias + }{ + Alias: (*Alias)(t), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + var err error + t.StartTime, err = time.ParseInLocation(customTimeFormat, aux.StartTime, time.Local) + if err != nil { + return err + } + t.EndTime, err = time.ParseInLocation(customTimeFormat, aux.EndTime, time.Local) + if err != nil { + return err + } + return nil +} + +type Task struct { + TaskID string `json:"taskId"` + ScenicID string `json:"scenicId"` + DeviceID string `json:"deviceId"` + DeviceNo string `json:"deviceNo"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` +} + +type FileObject struct { + URL string `json:"url"` + NeedDownload bool `json:"needDownload"` + CreateTime time.Time `json:"createTime"` + EndTime time.Time `json:"endTime"` +} + +func (f *FileObject) MarshalJSON() ([]byte, error) { + type Alias FileObject + return json.Marshal(&struct { + CreateTime string `json:"createTime"` + EndTime string `json:"endTime"` + *Alias + }{ + CreateTime: f.CreateTime.Format(customTimeFormat), + EndTime: f.EndTime.Format(customTimeFormat), + Alias: (*Alias)(f), + }) +} diff --git a/fs/adapter.go b/fs/adapter.go new file mode 100644 index 0000000..ead6d59 --- /dev/null +++ b/fs/adapter.go @@ -0,0 +1,16 @@ +package fs + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" +) + +type Adapter interface { + GetFileList(path string) ([]dto.File, error) +} + +func GetAdapter() Adapter { + return &LocalAdapter{ + config.Config.Record.Storage, + } +} diff --git a/fs/local_adapter.go b/fs/local_adapter.go new file mode 100644 index 0000000..e38ff7f --- /dev/null +++ b/fs/local_adapter.go @@ -0,0 +1,70 @@ +package fs + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "ZhenTuLocalPassiveAdapter/util" + "fmt" + "os" + "sort" + "time" +) + +type LocalAdapter struct { + StorageConfig config.StorageConfig +} + +func (l *LocalAdapter) GetFileList(path string) ([]dto.File, error) { + if l.StorageConfig.Path == "" { + return nil, fmt.Errorf("未配置存储路径") + } + if path == "" { + path = "/" + } + if path[0] != '/' { + path = "/" + path + } + if path[len(path)-1] != '/' { + path = path + "/" + } + // 读取文件夹下目录 + files, err := os.ReadDir(l.StorageConfig.Path + path) + if err != nil { + return nil, err + } + + var fileList []dto.File + for _, file := range files { + if file.IsDir() { + continue + } + if !util.IsVideoFile(file.Name()) { + continue + } + info, err := file.Info() + if err != nil { + continue + } + // TODO: 0点左右会出问题 + relDt := info.ModTime() + startTime, stopTime, err := util.ParseStartStopTime(info.Name(), relDt) + if err != nil { + continue + } + if startTime.Equal(stopTime) { + stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) + } + fileList = append(fileList, dto.File{ + BasePath: l.StorageConfig.Path, + Name: file.Name(), + Path: path, + Url: fmt.Sprintf("%s%s%s", l.StorageConfig.Path, path, file.Name()), + StartTime: startTime, + EndTime: stopTime, + }) + sort.Slice(fileList, func(i, j int) bool { + return fileList[i].StartTime.Before(fileList[j].StartTime) + }) + } + return fileList, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9792e00 --- /dev/null +++ b/go.mod @@ -0,0 +1,26 @@ +module ZhenTuLocalPassiveAdapter + +go 1.23 + +require ( + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/magiconair/properties v1.8.7 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/viper v1.19.0 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2b668ad --- /dev/null +++ b/go.sum @@ -0,0 +1,54 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= +github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= +github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= +github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= +github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..57d4360 --- /dev/null +++ b/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "ZhenTuLocalPassiveAdapter/api" + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/core" + "ZhenTuLocalPassiveAdapter/dto" + "log" + "os" + "time" +) + +func startTask(device config.DeviceMapping, task dto.Task) { + fo, err := core.HandleTask(device, task) + if err != nil { + log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) + api.ReportTaskFailure(task.TaskID) + return + } + log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo) + err = api.UploadTaskFile(task, *fo) + if err != nil { + log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) + api.ReportTaskFailure(task.TaskID) + return + } + log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo) + api.ReportTaskSuccess(task.TaskID, fo) +} + +func main() { + err := config.LoadConfig() + if err != nil { + log.Println("加载配置文件失败:", err) + return + } + // 日志文件路径 + logFilePath := "app.log" + + // 创建或打开日志文件 + logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("Failed to open log file: %v", err) + } + defer logFile.Close() + + // 设置日志输出到文件 + log.SetOutput(logFile) + // 每两秒定时执行 + for { + // 执行任务 + tasks, err := api.SyncTask() + if err == nil { + for _, task := range tasks { + log.Printf("开始处理任务, TaskID:【%s】,DeviceNo: %s,开始时间: %s,结束时间: %s\n", task.TaskID, task.DeviceNo, task.StartTime, task.EndTime) + // 处理任务 + for _, device := range config.Config.Devices { + if device.DeviceNo == task.DeviceNo { + // 处理任务 + go startTask(device, task) + break // 提前返回,避免不必要的循环 + } + } + log.Printf("处理任务结束, TaskID:【%s】,DeviceNo: %s\n", task.TaskID, task.DeviceNo) + } + } else { + log.Println("同步任务失败:", err) + } + // 等待两秒 + <-time.After(2 * time.Second) + } +} diff --git a/util/ffmpeg.go b/util/ffmpeg.go new file mode 100644 index 0000000..15d8538 --- /dev/null +++ b/util/ffmpeg.go @@ -0,0 +1,297 @@ +package util + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "bytes" + "fmt" + "log" + "math/rand" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" +) + +const FfmpegExec = "ffmpeg" + +func RunFfmpegTask(task *dto.FfmpegTask) bool { + var result bool + if len(task.Files) == 1 { + // 单个文件切割,用简单方法 + result = runFfmpegForSingleFile(task) + } else { + // 多个文件切割,用速度快的 + result = runFfmpegForMultipleFile1(task) + } + // 先尝试方法1 + if result { + return true + } + log.Printf("FFMPEG简易方法失败,尝试复杂方法转码") + // 不行再尝试方法二 + return runFfmpegForMultipleFile2(task) +} + +func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool { + // 多文件,方法一:先转换成ts,然后合并切割 + // 步骤一:先转换成ts,并行转换 + var wg sync.WaitGroup + var mu sync.Mutex + var notOk bool + + for i := range task.Files { + wg.Add(1) + go func(file *dto.File) { + defer wg.Done() + tmpFile := os.TempDir() + "/" + file.Name + ".ts" + result, err := convertMp4ToTs(*file, tmpFile) + if err != nil { + log.Printf("转码出错: %v", err) + mu.Lock() + notOk = true + mu.Unlock() + return + } + if result { + mu.Lock() + file.Url = tmpFile + mu.Unlock() + } else { + // 失败了,务必删除临时文件 + os.Remove(tmpFile) + } + }(&task.Files[i]) + } + + wg.Wait() + + if notOk { + return false + } + + // 步骤二:使用concat协议拼接裁切 + result, err := QuickConcatVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile) + if err != nil { + return false + } + + // 步骤三:删除临时文件 + for _, file := range task.Files { + if err := os.Remove(file.Url); err != nil { + log.Printf("删除临时文件失败: %v", err) + } + } + + return result +} + +func runFfmpegForMultipleFile2(task *dto.FfmpegTask) bool { + // 多文件,方法二:使用计算资源编码 + result, err := SlowVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile) + if err != nil { + return false + } + return result +} + +func runFfmpegForSingleFile(task *dto.FfmpegTask) bool { + result, err := QuickVideoCut(task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile) + if err != nil { + return false + } + return result +} + +func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) { + if fileList == nil || len(fileList) == 0 { + log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt) + return nil, fmt.Errorf("无法根据要求找到对应录制片段") + } + + // 如果片段在中间断开时间过长 + if len(fileList) > 1 { + var lastFile *dto.File + for _, file := range fileList { + if lastFile == nil { + lastFile = &file + continue + } + if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 { + // 片段断开 + log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds()) + return nil, fmt.Errorf("片段断开") + } + lastFile = &file + } + } + + // 通过文件列表构造的任务仍然是缺失的 + if fileList[len(fileList)-1].EndTime.Before(endDt) { + log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt) + return nil, fmt.Errorf("片段断开") + } + + // 构造FfmpegTaskPo + ffmpegTask := &dto.FfmpegTask{ + Files: fileList, + Length: int(endDt.Sub(beginDt).Seconds()), + Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()), + OutputFile: os.TempDir() + "/" + task.TaskID + ".mp4", + } + + if ffmpegTask.Offset > (config.Config.Record.Duration) { + log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt) + return nil, fmt.Errorf("无法完整覆盖时间点") + } + + return ffmpegTask, nil +} + +func convertMp4ToTs(file dto.File, outFileName string) (bool, error) { + ffmpegCmd := []string{ + FfmpegExec, + "-hide_banner", + "-y", + "-i", file.Url, + "-c", "copy", + "-bsf:v", "h264_mp4toannexb", + "-f", "mpegts", + outFileName, + } + return handleFfmpegProcess(ffmpegCmd) +} + +func convertHevcToTs(file dto.File, outFileName string) (bool, error) { + ffmpegCmd := []string{ + FfmpegExec, + "-hide_banner", + "-y", + "-i", file.Url, + "-c", "copy", + "-bsf:v", "hevc_mp4toannexb", + "-f", "mpegts", + outFileName, + } + return handleFfmpegProcess(ffmpegCmd) +} + +func QuickVideoCut(inputFile string, offset, length int64, outputFile string) (bool, error) { + ffmpegCmd := []string{ + FfmpegExec, + "-hide_banner", + "-y", + "-ss", strconv.FormatInt(offset, 10), + "-i", inputFile, + "-c:v", "copy", + "-an", + "-t", strconv.FormatInt(length, 10), + "-f", "mp4", + outputFile, + } + return handleFfmpegProcess(ffmpegCmd) +} + +func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) { + tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64()) + tmpFileObj, err := os.Create(tmpFile) + if err != nil { + log.Printf("创建临时文件失败:%s", tmpFile) + return false, err + } + defer os.Remove(tmpFile) + defer tmpFileObj.Close() + + for _, filePo := range inputFiles { + _, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url)) + if err != nil { + log.Printf("写入临时文件失败:%s", tmpFile) + return false, err + } + } + + ffmpegCmd := []string{ + FfmpegExec, + "-hide_banner", + "-y", + "-f", "concat", + "-safe", "0", + "-i", tmpFile, + "-c:v", "copy", + "-an", + "-ss", strconv.FormatInt(offset, 10), + "-t", strconv.FormatInt(length, 10), + "-f", "mp4", + outputFile, + } + return handleFfmpegProcess(ffmpegCmd) +} + +func SlowVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) { + ffmpegCmd := []string{ + FfmpegExec, + "-hide_banner", + "-y", + } + + for _, file := range inputFiles { + ffmpegCmd = append(ffmpegCmd, "-i", file.Url) + } + + inputCount := len(inputFiles) + filterComplex := strings.Builder{} + for i := 0; i < inputCount; i++ { + filterComplex.WriteString(fmt.Sprintf("[%d:v]", i)) + } + filterComplex.WriteString(fmt.Sprintf("concat=n=%d:v=1[v]", inputCount)) + + ffmpegCmd = append(ffmpegCmd, + "-filter_complex", filterComplex.String(), + "-map", "[v]", + "-preset:v", "fast", + "-an", + "-ss", strconv.FormatInt(offset, 10), + "-t", strconv.FormatInt(length, 10), + "-f", "mp4", + outputFile, + ) + + return handleFfmpegProcess(ffmpegCmd) +} + +func handleFfmpegProcess(ffmpegCmd []string) (bool, error) { + startTime := time.Now() + log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " ")) + cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...) + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + err := cmd.Start() + if err != nil { + log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) + return false, err + } + defer cmd.Process.Kill() + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case <-time.After(1 * time.Minute): + log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " ")) + return false, fmt.Errorf("ffmpeg command timed out") + case err := <-done: + if err != nil { + log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) + return false, err + } + endTime := time.Now() + log.Printf("FFMPEG执行命令结束,耗费时间:【%dms】,命令:【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " ")) + return true, nil + } +} diff --git a/util/file_filter.go b/util/file_filter.go new file mode 100644 index 0000000..91b2aa9 --- /dev/null +++ b/util/file_filter.go @@ -0,0 +1,41 @@ +package util + +import ( + "ZhenTuLocalPassiveAdapter/dto" + "sort" + "time" +) + +func FilterAndSortFiles(fileList []dto.File, beginDt, endDt time.Time) []dto.File { + var filteredFiles []dto.File + + for _, file := range fileList { + fileStartTime := file.StartTime + nextFileStartTime := file.EndTime + + // 如果当前文件还没有开始 + if beginDt.After(fileStartTime) { + // 没有下一个文件的情况下,就是最后一个文件 + if nextFileStartTime.IsZero() { + continue + } + // 但是下一个文件已经开始 + if beginDt.Before(nextFileStartTime) { + filteredFiles = append(filteredFiles, file) + } + // 已经开始,但是也已经结束了 + } else if fileStartTime.After(endDt) { + continue + // 已经开始,但未结束 + } else { + filteredFiles = append(filteredFiles, file) + } + } + + // 按照 GetDiffMs 的值降序排序 + sort.Slice(filteredFiles, func(i, j int) bool { + return filteredFiles[i].GetDiffMs() > filteredFiles[j].GetDiffMs() + }) + + return filteredFiles +} diff --git a/util/file_filter_test.go b/util/file_filter_test.go new file mode 100644 index 0000000..a237ebf --- /dev/null +++ b/util/file_filter_test.go @@ -0,0 +1,26 @@ +package util + +import ( + "ZhenTuLocalPassiveAdapter/dto" + "testing" + "time" +) + +func TestMain(m *testing.M) { + // 示例数据 + fileList := []dto.File{ + {StartTime: time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 1, 0, 0, time.UTC)}, + {StartTime: time.Date(2023, 1, 1, 0, 1, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 2, 0, 0, time.UTC)}, + {StartTime: time.Date(2023, 1, 1, 0, 2, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 3, 0, 0, time.UTC)}, + {StartTime: time.Date(2023, 1, 1, 0, 3, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 4, 0, 0, time.UTC)}, + {StartTime: time.Date(2023, 1, 1, 0, 4, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 5, 0, 0, time.UTC)}, + {StartTime: time.Date(2023, 1, 1, 0, 5, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 6, 0, 0, time.UTC)}, + {StartTime: time.Date(2023, 1, 1, 0, 6, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 7, 0, 0, time.UTC)}, + } + beginDt := time.Date(2023, 1, 1, 0, 2, 10, 0, time.UTC) + endDt := time.Date(2023, 1, 1, 0, 2, 20, 0, time.UTC) + filteredFiles := FilterAndSortFiles(fileList, beginDt, endDt) + for _, file := range filteredFiles { + println(file.StartTime.String(), file.EndTime.String()) + } +} diff --git a/util/file_recognizer.go b/util/file_recognizer.go new file mode 100644 index 0000000..01ce9ba --- /dev/null +++ b/util/file_recognizer.go @@ -0,0 +1,10 @@ +package util + +import "ZhenTuLocalPassiveAdapter/config" + +func IsVideoFile(name string) bool { + if name == "" { + return false + } + return name[len(name)-len(config.Config.FileName.FileExt):] == config.Config.FileName.FileExt || name[len(name)-len(config.Config.FileName.UnfinishedFileExt):] == config.Config.FileName.UnfinishedFileExt +} diff --git a/util/parse_date.go b/util/parse_date.go new file mode 100644 index 0000000..a6d076e --- /dev/null +++ b/util/parse_date.go @@ -0,0 +1,16 @@ +package util + +import ( + "fmt" + "regexp" + "time" +) + +func ParseDate(filePath string) (time.Time, error) { + re := regexp.MustCompile(`(\d{4}).?(\d{2}).?(\d{2})`) + matches := re.FindStringSubmatch(filePath) + if len(matches) != 4 { + return time.Time{}, fmt.Errorf("无法解析时间范围") + } + return time.Parse("2006.01.02", fmt.Sprintf("%s.%s.%s", matches[1], matches[2], matches[3])) +} diff --git a/util/parse_time.go b/util/parse_time.go new file mode 100644 index 0000000..a58c45f --- /dev/null +++ b/util/parse_time.go @@ -0,0 +1,39 @@ +package util + +import ( + "ZhenTuLocalPassiveAdapter/config" + "fmt" + "regexp" + "strings" + "time" +) + +func ParseStartStopTime(filePath string, relativeDate time.Time) (time.Time, time.Time, error) { + split := strings.Split(filePath, config.Config.FileName.TimeSplit) + if len(split) != 2 { + return time.Time{}, time.Time{}, fmt.Errorf("无法解析时间范围") + } + startTime, err := ParseTime(split[0], relativeDate) + if err != nil { + return time.Time{}, time.Time{}, err + } + stopTime, err := ParseTime(split[1], relativeDate) + if err != nil { + return time.Time{}, time.Time{}, err + } + return startTime, stopTime, nil +} + +func ParseTime(s string, relativeDate time.Time) (time.Time, error) { + re := regexp.MustCompile(`(\d{2}).?(\d{2}).?(\d{2})`) + matches := re.FindStringSubmatch(s) + if len(matches) != 4 { + return time.Time{}, fmt.Errorf("无法解析时间范围") + } + tm, err := time.Parse("15.04.05", fmt.Sprintf("%s.%s.%s", matches[1], matches[2], matches[3])) + if err != nil { + return time.Time{}, err + } + return time.Date(relativeDate.Year(), relativeDate.Month(), relativeDate.Day(), + tm.Hour(), tm.Minute(), tm.Second(), 0, time.Local), nil +}