This commit is contained in:
Jerry Yan 2025-02-07 22:58:01 +08:00
commit ba4aad0ae5
23 changed files with 1061 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
dist/
.idea/

51
api/oss_upload.go Normal file
View File

@ -0,0 +1,51 @@
package api
import (
"ZhenTuLocalPassiveAdapter/dto"
"bytes"
"fmt"
"io"
"net/http"
"os"
)
func UploadTaskFile(task dto.Task, file dto.FileObject) error {
url, err := QueryUploadUrlForTask(task.TaskID)
if err != nil {
return err
}
if err := OssUpload(url, file.URL); err != nil {
return err
}
return nil
}
func OssUpload(url, filePath string) error {
// 使用 http put 请求上传文件
file, err := os.Open(filePath)
defer os.Remove(filePath)
defer file.Close()
if err != nil {
return err
}
fileBytes, err := io.ReadAll(file)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(fileBytes))
if err != nil {
return err
}
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(fileBytes)))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("upload failed with status code %d", resp.StatusCode)
}
return nil
}

57
api/sync_task.go Normal file
View File

@ -0,0 +1,57 @@
package api
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
)
func SyncTask() ([]dto.Task, error) {
url := config.Config.Api.BaseUrl + "/sync"
requestBody := map[string]interface{}{
"version": "0.0.1",
"devices": config.Config.Devices,
}
jsonData, err := json.Marshal(requestBody)
if err != nil {
log.Println("Error marshaling JSON:", err)
return nil, err
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
log.Println("Error creating request:", err)
return nil, err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("Error sending request:", err)
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("Error reading response body:", err)
return nil, err
}
// 解析响应体为 map
var response dto.TaskListResponse
err = json.Unmarshal(body, &response)
if err != nil {
log.Println("->:", string(body))
log.Println("Error unmarshaling response body:", err)
return nil, err
}
if response.Code != 200 {
log.Println("Error response code:", response.Code)
return nil, fmt.Errorf(response.Msg)
}
return response.Data, nil
}

72
api/task_report.go Normal file
View File

@ -0,0 +1,72 @@
package api
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"bytes"
"encoding/json"
"io"
"log"
"net/http"
)
func QueryUploadUrlForTask(taskId string) (string, error) {
url := config.Config.Api.BaseUrl + "/" + taskId + "/uploadUrl"
req, err := http.NewRequest("POST", url, nil)
if err != nil {
log.Println("Error creating request:", err)
return "", err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("Error sending request:", err)
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("Error reading response body:", err)
return "", err
}
return string(body), nil
}
func ReportTaskFailure(taskId string) {
url := config.Config.Api.BaseUrl + "/" + taskId + "/failure"
req, err := http.NewRequest("POST", url, nil)
if err != nil {
log.Println("Error creating request:", err)
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("Error sending request:", err)
return
}
defer resp.Body.Close()
}
func ReportTaskSuccess(taskId string, file *dto.FileObject) {
url := config.Config.Api.BaseUrl + "/" + taskId + "/success"
jsonData, err := json.Marshal(file)
if err != nil {
log.Println("Error marshaling JSON:", err)
return
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
log.Println("Error creating request:", err)
return
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("Error sending request:", err)
return
}
defer resp.Body.Close()
}

17
config.yaml Normal file
View File

@ -0,0 +1,17 @@
api:
# baseUrl: "https://zhentuai.com/vpt/v1/scenic/3946669713328836608"
baseUrl: "http://127.0.0.1:8030/vpt/v1/scenic/3946669713328836608"
record:
storage:
type: "local"
path: "/root/opt/"
duration: 60
devices:
- deviceNo: "34020000001322200001"
name: "192.168.55.201"
path: "/root/opt/34020000001322200001/"
fileName:
timeSplit: "-"
dateSeparator: "-"
fileExt: "dav"
unFinExt: "dav_"

34
config/dto.go Normal file
View File

@ -0,0 +1,34 @@
package config
type ApiConfig struct {
BaseUrl string `mapstructure:"baseUrl"`
}
type RecordConfig struct {
Storage StorageConfig `mapstructure:"storage"`
Duration int `mapstructure:"duration"`
}
type StorageConfig struct {
Type string `mapstructure:"type"`
Path string `mapstructure:"path"`
}
type DeviceMapping struct {
DeviceNo string `mapstructure:"deviceNo" json:"deviceNo"`
Name string `mapstructure:"name" json:"name"`
}
type FileNameConfig struct {
DateSeparator string `mapstructure:"dateSeparator"`
TimeSplit string `mapstructure:"timeSplit"`
FileExt string `mapstructure:"fileExt"`
UnfinishedFileExt string `mapstructure:"unFinExt"`
}
type MainConfig struct {
Api ApiConfig `mapstructure:"api"`
Record RecordConfig `mapstructure:"record"`
Devices []DeviceMapping `mapstructure:"devices"`
FileName FileNameConfig `mapstructure:"fileName"`
}

27
config/service.go Normal file
View File

@ -0,0 +1,27 @@
package config
import (
"github.com/spf13/viper"
"log"
)
var Config MainConfig
func LoadConfig() error {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
// 读取配置文件
if err := viper.ReadInConfig(); err != nil {
log.Fatalf("Error reading config file, %s", err)
return err
}
// 反序列化配置到结构体
if err := viper.Unmarshal(&Config); err != nil {
log.Fatalf("Unable to decode into struct, %v", err)
return err
}
return nil
}

35
core/task.go Normal file
View File

@ -0,0 +1,35 @@
package core
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/fs"
"ZhenTuLocalPassiveAdapter/util"
"fmt"
)
func HandleTask(device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) {
adapter := fs.GetAdapter()
fileList, err := adapter.GetFileList(device.Name + "/" + task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02"))
if err != nil {
return nil, err
}
files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime)
if len(files) == 0 {
return nil, fmt.Errorf("没有找到文件")
}
constructTask, err := util.CheckFileCoverageAndConstructTask(files, task.StartTime, task.EndTime, task)
if err != nil {
return nil, err
}
ok := util.RunFfmpegTask(constructTask)
if !ok {
return nil, fmt.Errorf("ffmpeg任务执行失败")
}
return &dto.FileObject{
CreateTime: task.EndTime,
EndTime: task.EndTime,
NeedDownload: true,
URL: constructTask.OutputFile,
}, nil
}

13
dto/common_response.go Normal file
View File

@ -0,0 +1,13 @@
package dto
type ApiResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data interface{} `json:"data"`
}
type TaskListResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data []Task `json:"data"`
}

8
dto/ffmpeg.go Normal file
View File

@ -0,0 +1,8 @@
package dto
type FfmpegTask struct {
Files []File
Length int
Offset int
OutputFile string
}

16
dto/file.go Normal file
View File

@ -0,0 +1,16 @@
package dto
import "time"
type File struct {
BasePath string `json:"basePath"`
Url string `json:"url"`
Path string `json:"path"`
Name string `json:"name"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
}
func (f *File) GetDiffMs() int64 {
return f.EndTime.Sub(f.StartTime).Milliseconds()
}

62
dto/task.go Normal file
View File

@ -0,0 +1,62 @@
package dto
import (
"encoding/json"
"time"
)
// 自定义时间格式
const customTimeFormat = "2006-01-02 15:04:05"
func (t *Task) UnmarshalJSON(data []byte) error {
type Alias Task
aux := &struct {
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
*Alias
}{
Alias: (*Alias)(t),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
t.StartTime, err = time.ParseInLocation(customTimeFormat, aux.StartTime, time.Local)
if err != nil {
return err
}
t.EndTime, err = time.ParseInLocation(customTimeFormat, aux.EndTime, time.Local)
if err != nil {
return err
}
return nil
}
type Task struct {
TaskID string `json:"taskId"`
ScenicID string `json:"scenicId"`
DeviceID string `json:"deviceId"`
DeviceNo string `json:"deviceNo"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
}
type FileObject struct {
URL string `json:"url"`
NeedDownload bool `json:"needDownload"`
CreateTime time.Time `json:"createTime"`
EndTime time.Time `json:"endTime"`
}
func (f *FileObject) MarshalJSON() ([]byte, error) {
type Alias FileObject
return json.Marshal(&struct {
CreateTime string `json:"createTime"`
EndTime string `json:"endTime"`
*Alias
}{
CreateTime: f.CreateTime.Format(customTimeFormat),
EndTime: f.EndTime.Format(customTimeFormat),
Alias: (*Alias)(f),
})
}

16
fs/adapter.go Normal file
View File

@ -0,0 +1,16 @@
package fs
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
)
type Adapter interface {
GetFileList(path string) ([]dto.File, error)
}
func GetAdapter() Adapter {
return &LocalAdapter{
config.Config.Record.Storage,
}
}

70
fs/local_adapter.go Normal file
View File

@ -0,0 +1,70 @@
package fs
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/util"
"fmt"
"os"
"sort"
"time"
)
type LocalAdapter struct {
StorageConfig config.StorageConfig
}
func (l *LocalAdapter) GetFileList(path string) ([]dto.File, error) {
if l.StorageConfig.Path == "" {
return nil, fmt.Errorf("未配置存储路径")
}
if path == "" {
path = "/"
}
if path[0] != '/' {
path = "/" + path
}
if path[len(path)-1] != '/' {
path = path + "/"
}
// 读取文件夹下目录
files, err := os.ReadDir(l.StorageConfig.Path + path)
if err != nil {
return nil, err
}
var fileList []dto.File
for _, file := range files {
if file.IsDir() {
continue
}
if !util.IsVideoFile(file.Name()) {
continue
}
info, err := file.Info()
if err != nil {
continue
}
// TODO: 0点左右会出问题
relDt := info.ModTime()
startTime, stopTime, err := util.ParseStartStopTime(info.Name(), relDt)
if err != nil {
continue
}
if startTime.Equal(stopTime) {
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
}
fileList = append(fileList, dto.File{
BasePath: l.StorageConfig.Path,
Name: file.Name(),
Path: path,
Url: fmt.Sprintf("%s%s%s", l.StorageConfig.Path, path, file.Name()),
StartTime: startTime,
EndTime: stopTime,
})
sort.Slice(fileList, func(i, j int) bool {
return fileList[i].StartTime.Before(fileList[j].StartTime)
})
}
return fileList, nil
}

26
go.mod Normal file
View File

@ -0,0 +1,26 @@
module ZhenTuLocalPassiveAdapter
go 1.23
require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

54
go.sum Normal file
View File

@ -0,0 +1,54 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

72
main.go Normal file
View File

@ -0,0 +1,72 @@
package main
import (
"ZhenTuLocalPassiveAdapter/api"
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/core"
"ZhenTuLocalPassiveAdapter/dto"
"log"
"os"
"time"
)
func startTask(device config.DeviceMapping, task dto.Task) {
fo, err := core.HandleTask(device, task)
if err != nil {
log.Printf("处理任务失败, TaskID【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
api.ReportTaskFailure(task.TaskID)
return
}
log.Printf("处理任务成功, TaskID【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
err = api.UploadTaskFile(task, *fo)
if err != nil {
log.Printf("上传文件失败, TaskID【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
api.ReportTaskFailure(task.TaskID)
return
}
log.Printf("上传文件成功, TaskID【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
api.ReportTaskSuccess(task.TaskID, fo)
}
func main() {
err := config.LoadConfig()
if err != nil {
log.Println("加载配置文件失败:", err)
return
}
// 日志文件路径
logFilePath := "app.log"
// 创建或打开日志文件
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v", err)
}
defer logFile.Close()
// 设置日志输出到文件
log.SetOutput(logFile)
// 每两秒定时执行
for {
// 执行任务
tasks, err := api.SyncTask()
if err == nil {
for _, task := range tasks {
log.Printf("开始处理任务, TaskID【%s】DeviceNo: %s开始时间: %s结束时间: %s\n", task.TaskID, task.DeviceNo, task.StartTime, task.EndTime)
// 处理任务
for _, device := range config.Config.Devices {
if device.DeviceNo == task.DeviceNo {
// 处理任务
go startTask(device, task)
break // 提前返回,避免不必要的循环
}
}
log.Printf("处理任务结束, TaskID【%s】DeviceNo: %s\n", task.TaskID, task.DeviceNo)
}
} else {
log.Println("同步任务失败:", err)
}
// 等待两秒
<-time.After(2 * time.Second)
}
}

297
util/ffmpeg.go Normal file
View File

@ -0,0 +1,297 @@
package util
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"bytes"
"fmt"
"log"
"math/rand"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"
)
const FfmpegExec = "ffmpeg"
func RunFfmpegTask(task *dto.FfmpegTask) bool {
var result bool
if len(task.Files) == 1 {
// 单个文件切割,用简单方法
result = runFfmpegForSingleFile(task)
} else {
// 多个文件切割,用速度快的
result = runFfmpegForMultipleFile1(task)
}
// 先尝试方法1
if result {
return true
}
log.Printf("FFMPEG简易方法失败尝试复杂方法转码")
// 不行再尝试方法二
return runFfmpegForMultipleFile2(task)
}
func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool {
// 多文件方法一先转换成ts然后合并切割
// 步骤一先转换成ts并行转换
var wg sync.WaitGroup
var mu sync.Mutex
var notOk bool
for i := range task.Files {
wg.Add(1)
go func(file *dto.File) {
defer wg.Done()
tmpFile := os.TempDir() + "/" + file.Name + ".ts"
result, err := convertMp4ToTs(*file, tmpFile)
if err != nil {
log.Printf("转码出错: %v", err)
mu.Lock()
notOk = true
mu.Unlock()
return
}
if result {
mu.Lock()
file.Url = tmpFile
mu.Unlock()
} else {
// 失败了,务必删除临时文件
os.Remove(tmpFile)
}
}(&task.Files[i])
}
wg.Wait()
if notOk {
return false
}
// 步骤二使用concat协议拼接裁切
result, err := QuickConcatVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
if err != nil {
return false
}
// 步骤三:删除临时文件
for _, file := range task.Files {
if err := os.Remove(file.Url); err != nil {
log.Printf("删除临时文件失败: %v", err)
}
}
return result
}
func runFfmpegForMultipleFile2(task *dto.FfmpegTask) bool {
// 多文件,方法二:使用计算资源编码
result, err := SlowVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
if err != nil {
return false
}
return result
}
func runFfmpegForSingleFile(task *dto.FfmpegTask) bool {
result, err := QuickVideoCut(task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile)
if err != nil {
return false
}
return result
}
func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) {
if fileList == nil || len(fileList) == 0 {
log.Printf("无法根据要求找到对应录制片段ID【%s】开始时间【%s】结束时间【%s】", task.TaskID, beginDt, endDt)
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
}
// 如果片段在中间断开时间过长
if len(fileList) > 1 {
var lastFile *dto.File
for _, file := range fileList {
if lastFile == nil {
lastFile = &file
continue
}
if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 {
// 片段断开
log.Printf("分析FFMPEG任务失败ID【%s】文件片段【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
return nil, fmt.Errorf("片段断开")
}
lastFile = &file
}
}
// 通过文件列表构造的任务仍然是缺失的
if fileList[len(fileList)-1].EndTime.Before(endDt) {
log.Printf("分析FFMPEG任务失败ID【%s】文件片段【%s】无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
return nil, fmt.Errorf("片段断开")
}
// 构造FfmpegTaskPo
ffmpegTask := &dto.FfmpegTask{
Files: fileList,
Length: int(endDt.Sub(beginDt).Seconds()),
Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()),
OutputFile: os.TempDir() + "/" + task.TaskID + ".mp4",
}
if ffmpegTask.Offset > (config.Config.Record.Duration) {
log.Printf("分析FFMPEG任务失败ID【%s】文件片段【%s】无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
return nil, fmt.Errorf("无法完整覆盖时间点")
}
return ffmpegTask, nil
}
func convertMp4ToTs(file dto.File, outFileName string) (bool, error) {
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", file.Url,
"-c", "copy",
"-bsf:v", "h264_mp4toannexb",
"-f", "mpegts",
outFileName,
}
return handleFfmpegProcess(ffmpegCmd)
}
func convertHevcToTs(file dto.File, outFileName string) (bool, error) {
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", file.Url,
"-c", "copy",
"-bsf:v", "hevc_mp4toannexb",
"-f", "mpegts",
outFileName,
}
return handleFfmpegProcess(ffmpegCmd)
}
func QuickVideoCut(inputFile string, offset, length int64, outputFile string) (bool, error) {
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-ss", strconv.FormatInt(offset, 10),
"-i", inputFile,
"-c:v", "copy",
"-an",
"-t", strconv.FormatInt(length, 10),
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(ffmpegCmd)
}
func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
tmpFileObj, err := os.Create(tmpFile)
if err != nil {
log.Printf("创建临时文件失败:%s", tmpFile)
return false, err
}
defer os.Remove(tmpFile)
defer tmpFileObj.Close()
for _, filePo := range inputFiles {
_, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url))
if err != nil {
log.Printf("写入临时文件失败:%s", tmpFile)
return false, err
}
}
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-f", "concat",
"-safe", "0",
"-i", tmpFile,
"-c:v", "copy",
"-an",
"-ss", strconv.FormatInt(offset, 10),
"-t", strconv.FormatInt(length, 10),
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(ffmpegCmd)
}
func SlowVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
}
for _, file := range inputFiles {
ffmpegCmd = append(ffmpegCmd, "-i", file.Url)
}
inputCount := len(inputFiles)
filterComplex := strings.Builder{}
for i := 0; i < inputCount; i++ {
filterComplex.WriteString(fmt.Sprintf("[%d:v]", i))
}
filterComplex.WriteString(fmt.Sprintf("concat=n=%d:v=1[v]", inputCount))
ffmpegCmd = append(ffmpegCmd,
"-filter_complex", filterComplex.String(),
"-map", "[v]",
"-preset:v", "fast",
"-an",
"-ss", strconv.FormatInt(offset, 10),
"-t", strconv.FormatInt(length, 10),
"-f", "mp4",
outputFile,
)
return handleFfmpegProcess(ffmpegCmd)
}
func handleFfmpegProcess(ffmpegCmd []string) (bool, error) {
startTime := time.Now()
log.Printf("FFMPEG执行命令【%s】", strings.Join(ffmpegCmd, " "))
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
err := cmd.Start()
if err != nil {
log.Printf("FFMPEG执行命令失败错误信息%s命令【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
return false, err
}
defer cmd.Process.Kill()
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-time.After(1 * time.Minute):
log.Printf("FFMPEG执行命令没有在1分钟内退出命令【%s】", strings.Join(ffmpegCmd, " "))
return false, fmt.Errorf("ffmpeg command timed out")
case err := <-done:
if err != nil {
log.Printf("FFMPEG执行命令失败错误信息%s命令【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
return false, err
}
endTime := time.Now()
log.Printf("FFMPEG执行命令结束耗费时间【%dms】命令【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " "))
return true, nil
}
}

41
util/file_filter.go Normal file
View File

@ -0,0 +1,41 @@
package util
import (
"ZhenTuLocalPassiveAdapter/dto"
"sort"
"time"
)
func FilterAndSortFiles(fileList []dto.File, beginDt, endDt time.Time) []dto.File {
var filteredFiles []dto.File
for _, file := range fileList {
fileStartTime := file.StartTime
nextFileStartTime := file.EndTime
// 如果当前文件还没有开始
if beginDt.After(fileStartTime) {
// 没有下一个文件的情况下,就是最后一个文件
if nextFileStartTime.IsZero() {
continue
}
// 但是下一个文件已经开始
if beginDt.Before(nextFileStartTime) {
filteredFiles = append(filteredFiles, file)
}
// 已经开始,但是也已经结束了
} else if fileStartTime.After(endDt) {
continue
// 已经开始,但未结束
} else {
filteredFiles = append(filteredFiles, file)
}
}
// 按照 GetDiffMs 的值降序排序
sort.Slice(filteredFiles, func(i, j int) bool {
return filteredFiles[i].GetDiffMs() > filteredFiles[j].GetDiffMs()
})
return filteredFiles
}

26
util/file_filter_test.go Normal file
View File

@ -0,0 +1,26 @@
package util
import (
"ZhenTuLocalPassiveAdapter/dto"
"testing"
"time"
)
func TestMain(m *testing.M) {
// 示例数据
fileList := []dto.File{
{StartTime: time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 1, 0, 0, time.UTC)},
{StartTime: time.Date(2023, 1, 1, 0, 1, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 2, 0, 0, time.UTC)},
{StartTime: time.Date(2023, 1, 1, 0, 2, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 3, 0, 0, time.UTC)},
{StartTime: time.Date(2023, 1, 1, 0, 3, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 4, 0, 0, time.UTC)},
{StartTime: time.Date(2023, 1, 1, 0, 4, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 5, 0, 0, time.UTC)},
{StartTime: time.Date(2023, 1, 1, 0, 5, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 6, 0, 0, time.UTC)},
{StartTime: time.Date(2023, 1, 1, 0, 6, 0, 0, time.UTC), EndTime: time.Date(2023, 1, 1, 0, 7, 0, 0, time.UTC)},
}
beginDt := time.Date(2023, 1, 1, 0, 2, 10, 0, time.UTC)
endDt := time.Date(2023, 1, 1, 0, 2, 20, 0, time.UTC)
filteredFiles := FilterAndSortFiles(fileList, beginDt, endDt)
for _, file := range filteredFiles {
println(file.StartTime.String(), file.EndTime.String())
}
}

10
util/file_recognizer.go Normal file
View File

@ -0,0 +1,10 @@
package util
import "ZhenTuLocalPassiveAdapter/config"
func IsVideoFile(name string) bool {
if name == "" {
return false
}
return name[len(name)-len(config.Config.FileName.FileExt):] == config.Config.FileName.FileExt || name[len(name)-len(config.Config.FileName.UnfinishedFileExt):] == config.Config.FileName.UnfinishedFileExt
}

16
util/parse_date.go Normal file
View File

@ -0,0 +1,16 @@
package util
import (
"fmt"
"regexp"
"time"
)
func ParseDate(filePath string) (time.Time, error) {
re := regexp.MustCompile(`(\d{4}).?(\d{2}).?(\d{2})`)
matches := re.FindStringSubmatch(filePath)
if len(matches) != 4 {
return time.Time{}, fmt.Errorf("无法解析时间范围")
}
return time.Parse("2006.01.02", fmt.Sprintf("%s.%s.%s", matches[1], matches[2], matches[3]))
}

39
util/parse_time.go Normal file
View File

@ -0,0 +1,39 @@
package util
import (
"ZhenTuLocalPassiveAdapter/config"
"fmt"
"regexp"
"strings"
"time"
)
func ParseStartStopTime(filePath string, relativeDate time.Time) (time.Time, time.Time, error) {
split := strings.Split(filePath, config.Config.FileName.TimeSplit)
if len(split) != 2 {
return time.Time{}, time.Time{}, fmt.Errorf("无法解析时间范围")
}
startTime, err := ParseTime(split[0], relativeDate)
if err != nil {
return time.Time{}, time.Time{}, err
}
stopTime, err := ParseTime(split[1], relativeDate)
if err != nil {
return time.Time{}, time.Time{}, err
}
return startTime, stopTime, nil
}
func ParseTime(s string, relativeDate time.Time) (time.Time, error) {
re := regexp.MustCompile(`(\d{2}).?(\d{2}).?(\d{2})`)
matches := re.FindStringSubmatch(s)
if len(matches) != 4 {
return time.Time{}, fmt.Errorf("无法解析时间范围")
}
tm, err := time.Parse("15.04.05", fmt.Sprintf("%s.%s.%s", matches[1], matches[2], matches[3]))
if err != nil {
return time.Time{}, err
}
return time.Date(relativeDate.Year(), relativeDate.Month(), relativeDate.Day(),
tm.Hour(), tm.Minute(), tm.Second(), 0, time.Local), nil
}