diff --git a/cmd/kvm/main.go b/cmd/kvm/main.go index 9aa12e9..671cd32 100644 --- a/cmd/kvm/main.go +++ b/cmd/kvm/main.go @@ -35,11 +35,8 @@ func main() { ustreamer.Start() //go ustreamer.Watch() } else if cfg.Stream.Source == config.StreamSourceH264 { - ffmpeg := stream.InitFFmpeg(cfg.Video.Path, cfg.Video.FormatArgs()) - ffmpeg.Start() - - audio := stream.InitPipedCmd(config.Get().Audio) - audio.Start() + v := stream.InitFFmpeg() + v.Start() } else { log.Fatalf("unsupported stream source type: %v", cfg.Stream.Source) } diff --git a/config/config.go b/config/config.go index 2195faf..de1ee7d 100644 --- a/config/config.go +++ b/config/config.go @@ -3,7 +3,6 @@ package config import ( "fmt" "os" - "strings" "gopkg.in/yaml.v3" ) @@ -43,17 +42,24 @@ const ( StreamSourceHevc = "hevc" ) +type StreamInput string + +const ( + StreamInputVideo = "v" + StreamInputVideoAudio = "va" +) + type FFmpeg struct { - ExtProcess - FPS int `yaml:"fps"` - Bitrate int `yaml:"bitrate"` - Height int `yaml:"height"` - GOP int `yaml:"gop"` - Codec string `yaml:"codec"` + Commands map[StreamInput]string `yaml:"commands"` + FPS int `yaml:"fps"` + Bitrate int `yaml:"bitrate"` + Height int `yaml:"height"` + GOP int `yaml:"gop"` + Codec string `yaml:"codec"` } -func (f FFmpeg) FormatArgs() []string { - return strings.Split(fmt.Sprintf(f.Args, f.Height, f.Codec, f.Bitrate*1000, f.FPS, f.GOP), " ") +func (f FFmpeg) FormatCmd(cmd string) string { + return fmt.Sprintf(cmd, f.Height, f.Codec, f.Bitrate*1000, f.FPS, f.GOP) } type UStreamer struct { @@ -68,8 +74,6 @@ func Get() Config { return c } -// arecord -D hw:0,0 -f cd -r 44100 -c 2 | /app/ffmpeg -re -init_hw_device rkmpp=hw -filter_hw_device hw -f wav -i pipe:0 -f v4l2 -c:a aac -b:a 128k -ar 44100 -ac 2 -f rtp rtp://127.0.0.1:5006?pkt_size=1200 -i /dev/video0 -vf hwupload,scale_rkrga=h=720:force_original_aspect_ratio=1 -c:v h264_rkmpp -flags +low_delay -b:v 6000000 -framerate 60 -g 10 -f rtp rtp://127.0.0.1:5004?pkt_size=1200 - func Init() { c = Config{ LogLevel: "debug", @@ -84,10 +88,15 @@ func Init() { }, }, Video: FFmpeg{ - ExtProcess: ExtProcess{ - Path: "/app/ffmpeg", - Args: "-hide_banner -loglevel error -re -init_hw_device rkmpp=hw -filter_hw_device hw -i /dev/video0 -vf hwupload,scale_rkrga=h=%d:force_original_aspect_ratio=1 -c:v %s_rkmpp -flags +low_delay -b:v %d -framerate %d -g %d -f rtp rtp://127.0.0.1:5004?pkt_size=1200", - //Args: "-re -i /dev/video0 -c:v h264_rkmpp -b:v %d -framerate %d -bsf:v h264_mp4toannexb -g 10 -f rtp rtp://127.0.0.1:5004?pkt_size=1200", + Commands: map[StreamInput]string{ + StreamInputVideoAudio: "/usr/bin/arecord -D hw:0,0 -f dat -r 48000 -c 2 --buffer-size=150 | /app/ffmpeg -init_hw_device rkmpp=hw -filter_hw_device hw" + + " -f wav -i pipe:0 -map 0:a -c:a libopus -b:a 48000 -sample_fmt s16 -ssrc 1 -payload_type 111 -f rtp -max_delay 0 -application lowdelay" + + " -f rtp rtp://127.0.0.1:5006?pkt_size=1200" + + " -i /dev/video0 -map 1:v -vf hwupload,scale_rkrga=h=%d:force_original_aspect_ratio=1 -c:v %s_rkmpp -flags +low_delay -b:v %d -framerate %d -g %d" + + " -f rtp rtp://127.0.0.1:5004?pkt_size=1200", + StreamInputVideo: "/app/ffmpeg -hide_banner -loglevel error -init_hw_device rkmpp=hw -filter_hw_device hw" + + " -i /dev/video0 -vf hwupload,scale_rkrga=h=%d:force_original_aspect_ratio=1 -c:v %s_rkmpp -flags +low_delay -b:v %d -framerate %d -g %d" + + " -f rtp rtp://127.0.0.1:5004?pkt_size=1200", }, FPS: 60, Bitrate: 6000, @@ -95,10 +104,6 @@ func Init() { GOP: 5, Codec: "h264", }, - Audio: []string{ - "/usr/bin/arecord -D hw:0,0 -f dat -r 48000 -c 2 --buffer-size=60", - "/app/ffmpeg -hide_banner -loglevel error -re -f wav -i pipe:0 -c:a libopus -b:a 48000 -sample_fmt s16 -ssrc 1 -payload_type 111 -f rtp -max_delay 0 -application lowdelay -f rtp rtp://127.0.0.1:5006?pkt_size=1200", - }, WebRtc: WebRtc{ Host: "127.0.0.1", Port: 5004, diff --git a/go.mod b/go.mod index 383ab30..db90c2e 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/pion/webrtc/v4 v4.0.1 github.com/sirupsen/logrus v1.9.3 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -54,5 +55,4 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/protobuf v1.34.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/http/hw/stream/audio.go b/http/hw/stream/audio.go deleted file mode 100644 index 1269f6e..0000000 --- a/http/hw/stream/audio.go +++ /dev/null @@ -1,47 +0,0 @@ -package stream - -import ( - "io" - "net/http" - "os/exec" - - "github.com/gin-gonic/gin" - log "github.com/sirupsen/logrus" -) - -type Audio struct { -} - -func AudioHandler(c *gin.Context) { - cmd := exec.Command("ffmpeg", - "-f", "alsa", - "-i", "hw:0,0", - "-acodec", "aac", - "-f", "mp4", "-") - - //c := "arecord -D hw:0,0 -f cd -r 44100 -c 2 | /app/ffmpeg -re -f wav -i pipe:0 -c:a aac -b:a 128k -ar 44100 -ac 2 -f rtp rtp://127.0.0.1:5006?pkt_size=1200" - //cmd := exec.Command("sh", "-c", c) - // ffmpeg -f alsa -i hw:0,0 -acodec aac -f mp4 - - stdout, err := cmd.StdoutPipe() - if err != nil { - c.String(http.StatusInternalServerError, "Failed to capture audio: %v", err) - log.Errorf("Failed to capture audio: %v", err) - return - } - if err := cmd.Start(); err != nil { - c.String(http.StatusInternalServerError, "Failed to start ffmpeg: %v", err) - log.Errorf("Failed to start ffmpeg: %v", err) - return - } - defer cmd.Wait() - - //c.Header("Content-Type", "audio/aac") - c.Header("Content-Type", "audio/wav") - c.Header("Transfer-Encoding", "chunked") - - if _, err := io.Copy(c.Writer, stdout); err != nil { - c.String(http.StatusInternalServerError, "Failed to stream audio: %v", err) - log.Errorf("Failed to stream audio: %v", err) - return - } -} diff --git a/http/hw/stream/ffmpeg.go b/http/hw/stream/ffmpeg.go index d5bf413..954d79c 100644 --- a/http/hw/stream/ffmpeg.go +++ b/http/hw/stream/ffmpeg.go @@ -23,6 +23,12 @@ arecord -D hw:0,0 -f cd -r 44100 -c 2 | /app/ffmpeg -re -init_hw_device rkmpp=hw arecord -D hw:0,0 -f cd -r 44100 -c 2 | /app/ffmpeg -re -f wav -i pipe:0 -c:a aac -b:a 128k -ar 44100 -ac 2 -f rtp rtp://127.0.0.1:5006?pkt_size=1200 + +arecord -D hw:0,0 -f dat -r 48000 -c 2 --buffer-size=60 | /app/ffmpeg -re -init_hw_device rkmpp=hw -filter_hw_device hw \ +-f wav -i pipe:0 -map 0:a -c:a libopus -b:a 48000 -sample_fmt s16 -ssrc 1 -payload_type 111 -f rtp -max_delay 0 -application lowdelay \ +-f rtp rtp://127.0.0.1:5006?pkt_size=1200 \ +-i /dev/video0 -map 1:v -vf hwupload,scale_rkrga=h=720:force_original_aspect_ratio=1 -c:v h264_rkmpp -flags +low_delay -b:v 6000000 -framerate 60 -g 10 \ +-f rtp rtp://127.0.0.1:5004?pkt_size=1200 */ // https://jsfiddle.net/z7ms3u5r/ @@ -30,50 +36,57 @@ arecord -D hw:0,0 -f cd -r 44100 -c 2 | /app/ffmpeg -re -f wav -i pipe:0 -c:a aa var ffmpeg *FFmpeg type FFmpeg struct { - *ExtProcess + *PipedCmd config.FFmpeg } +func (f *FFmpeg) getCmd() string { + return f.Commands[config.StreamInputVideoAudio] +} + func (f *FFmpeg) SetBitrate(b int) { f.Bitrate = b if f.Bitrate < 0 { f.Bitrate = 6000 } - - f.ChangeArgs(f.FormatArgs()) } func (f *FFmpeg) SetFPS(fps int) { f.FPS = fps if f.FPS < 0 { f.FPS = 30 + } else if f.FPS > 60 { + f.FPS = 60 } - - f.ChangeArgs(f.FormatArgs()) } func (f *FFmpeg) SetResolution(height int) { f.Height = height if f.Height <= 0 { f.Height = 1080 + } else if f.Height > 2060 { + f.Height = 2060 } - - f.ChangeArgs(f.FormatArgs()) } func (f *FFmpeg) SetGOP(gop int) { f.GOP = gop if f.GOP < 0 { - f.GOP = 0 + f.GOP = 2 } - - f.ChangeArgs(f.FormatArgs()) } -func InitFFmpeg(path string, args []string) *FFmpeg { +func (f *FFmpeg) ApplyOptions() { + f.ChangeCmd(f.FormatCmd(f.getCmd())) +} + +func InitFFmpeg() *FFmpeg { + cfg := config.Get().Video + cmd := cfg.Commands[config.StreamInputVideoAudio] + ffmpeg = &FFmpeg{ - ExtProcess: Init(path, args), - FFmpeg: config.Get().Video, + PipedCmd: InitPipedCmd(cfg.FormatCmd(cmd)), + FFmpeg: cfg, } return ffmpeg } diff --git a/http/hw/stream/pipedprocess.go b/http/hw/stream/pipedprocess.go index 71fb1ab..9ae7b4f 100644 --- a/http/hw/stream/pipedprocess.go +++ b/http/hw/stream/pipedprocess.go @@ -1,25 +1,39 @@ package stream import ( - "log" + "os" "os/exec" "strings" "sync" "syscall" + + log "github.com/sirupsen/logrus" ) var pipedCmd *PipedCmd +var mu sync.Mutex // PipedCmd struct manages a sequence of piped commands. type PipedCmd struct { - cmds []*exec.Cmd - mu sync.Mutex - running bool - finished chan struct{} + cmds []*exec.Cmd + running bool } // InitPipedCmd initializes a PipedCmd instance with a sequence of commands. -func InitPipedCmd(cmds []string) *PipedCmd { +func InitPipedCmd(cmd string) *PipedCmd { + pipedCmd = &PipedCmd{} + pipedCmd.ChangeCmd(cmd) + return pipedCmd +} + +func (p *PipedCmd) ChangeCmd(cmd string) { + cmds := strings.Split(cmd, "|") + for i, c := range cmds { + cmds[i] = strings.TrimSpace(c) + } + + log.Debugf("Cmds: %+v", cmds) + pipedCmds := make([]*exec.Cmd, len(cmds)) // Initialize each command in the sequence @@ -31,20 +45,16 @@ func InitPipedCmd(cmds []string) *PipedCmd { pipedCmds[i] = exec.Command(cmdArgs[0], cmdArgs[1:]...) } - pipedCmd = &PipedCmd{ - cmds: pipedCmds, - finished: make(chan struct{}), - } - return pipedCmd + p.cmds = pipedCmds } // Start begins execution of all commands in the piped sequence. func (p *PipedCmd) Start() error { - p.mu.Lock() - defer p.mu.Unlock() + mu.Lock() + defer mu.Unlock() if p.running { - log.Println("Process is already running.") + log.Debugf("Process is already running.") return nil } @@ -56,12 +66,17 @@ func (p *PipedCmd) Start() error { } p.cmds[i+1].Stdin = stdout } + // pipe stdout and stderr of the last command into os + p.cmds[len(p.cmds)-1].Stdout = os.Stdout + p.cmds[len(p.cmds)-1].Stderr = os.Stderr // Start each command in the sequence for _, cmd := range p.cmds { if err := cmd.Start(); err != nil { - p.terminateAll() // Clean up if any command fails to start + p.terminateAll() return err + } else { + log.Debugf("Started process: (%d) %v", cmd.Process.Pid, cmd.Args) } } @@ -82,8 +97,8 @@ func (p *PipedCmd) monitorCommands() { go func(cmd *exec.Cmd) { defer wg.Done() err := cmd.Wait() - if err != nil { - log.Printf("Command failed: %v", err) + if err != nil && err.Error() != "signal: terminated" { + log.Debugf("Command failed: %v", err) p.terminateAll() // Terminate all if any command fails } }(cmd) @@ -91,179 +106,33 @@ func (p *PipedCmd) monitorCommands() { // Wait for all commands to complete or terminate wg.Wait() - p.mu.Lock() + mu.Lock() + defer mu.Unlock() p.running = false - close(p.finished) - p.mu.Unlock() } // terminateAll sends a termination signal to all running commands. func (p *PipedCmd) terminateAll() { - p.mu.Lock() - defer p.mu.Unlock() - for _, cmd := range p.cmds { if cmd.Process != nil { + log.Debugf("Sending SIGTERM to: (%d) %v", cmd.Process.Pid, cmd.Args) _ = cmd.Process.Signal(syscall.SIGTERM) // Send SIGTERM to allow graceful termination } } + + p.running = false } // Stop manually stops all commands in the sequence. func (p *PipedCmd) Stop() { - p.mu.Lock() - defer p.mu.Unlock() + mu.Lock() + defer mu.Unlock() if !p.running { - log.Println("Process is not running.") + log.Debug("Process is not running.") return } - log.Println("Stopping process...") + log.Debug("Stopping process...") p.terminateAll() - p.running = false - close(p.finished) } - -/* -import ( - "os" - "os/exec" - "strings" - "sync" - - log "github.com/sirupsen/logrus" -) - -type PipedCmd struct { - cmds []string - cmdsExec []*exec.Cmd - mu sync.Mutex - running bool - stopChan chan struct{} - finished chan struct{} -} - -// Init initializes the PipedCmd with a slice of command strings -func InitPipedCmd(cmds []string) *PipedCmd { - return &PipedCmd{ - cmds: cmds, - } -} - -// Start initializes and starts the piped commands -func (p *PipedCmd) Start() { - p.mu.Lock() - defer p.mu.Unlock() - - if p.running { - log.Debug("process is already running.") - return - } - - p.stopChan = make(chan struct{}) - p.finished = make(chan struct{}) - - log.Debugf("Starting piped commands: <%s>", strings.Join(p.cmds, " | ")) - - // Create commands and set up pipes - for i, cmdStr := range p.cmds { - // Split command string into command and arguments - cmdParts := strings.Fields(cmdStr) - if len(cmdParts) == 0 { - log.Errorf("Empty command string at index %d", i) - continue - } - - cmd := exec.Command(cmdParts[0], cmdParts[1:]...) - - // Set up pipes for stdin/stdout - if i > 0 { - stdin, err := p.cmdsExec[i-1].StdoutPipe() - if err != nil { - log.Errorf("Couldn't set up stdout pipe for command %s: %v", p.cmdsExec[i-1].Path, err) - return - } - cmd.Stdin = stdin - } - - cmd.Stderr = os.Stderr // Log stderr to standard error output - - p.cmdsExec = append(p.cmdsExec, cmd) - } - - // Start the first command - if err := p.cmdsExec[0].Start(); err != nil { - log.Errorf("Failed to start command: %v", err) - return - } - p.running = true - - // Start remaining commands - for _, cmd := range p.cmdsExec[1:] { - if err := cmd.Start(); err != nil { - log.Errorf("Failed to start command: %v", err) - return - } - } - - go func() { - // Wait for the last command to finish - err := p.cmdsExec[len(p.cmdsExec)-1].Wait() - p.running = false - log.Errorf("process exited with error: %v", err) - - // Signal that the process has finished - close(p.finished) - close(p.stopChan) - }() -} - -// Stop terminates the piped commands gracefully -func (p *PipedCmd) Stop() { - p.mu.Lock() - defer p.mu.Unlock() - - if !p.running { - log.Debug("process is not running.") - return - } - - log.Warn("Stopping process...") - - for _, cmd := range p.cmdsExec { - if err := cmd.Process.Kill(); err != nil { - log.Errorf("Failed to kill process: %v", err) - } - } - - select { - case <-p.finished: - log.Info("stopped as expected") - case <-p.stopChan: - log.Info("was killed") - } - - p.running = false -} - -// ChangeArgs updates the command arguments -func (p *PipedCmd) ChangeArgs(newCmds []string) { - p.mu.Lock() - defer p.mu.Unlock() - - p.cmds = newCmds - log.Printf("Updated process commands: %v", p.cmds) -} - -// Watch monitors the process and restarts if it stops unexpectedly -func (p *PipedCmd) Watch() { - for { - select { - case <-p.stopChan: - log.Errorf("process stopped unexpectedly. Restarting...") - p.Start() - } - } -} -*/ diff --git a/http/route/api.go b/http/route/api.go index 5a04e77..4bcc11a 100644 --- a/http/route/api.go +++ b/http/route/api.go @@ -21,7 +21,6 @@ func Api(e *gin.Engine) { api := e.Group("/api").Use(middleware.CheckToken()) api.GET("/stream/mjpeg", stream.MjpegHandler) - api.GET("/stream/audio", stream.AudioHandler) api.GET("/ws", ws.ConnHandler) api.POST("/stream/webrtc", stream.WebRTCPeerConnect) diff --git a/http/route/nanokvm_ui.go b/http/route/nanokvm_ui.go index ba2e8e7..2e18269 100644 --- a/http/route/nanokvm_ui.go +++ b/http/route/nanokvm_ui.go @@ -5,6 +5,7 @@ import ( "rkkvm/http/hw/stream" "rkkvm/http/middleware" "rkkvm/http/reqrsp" + "time" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" @@ -70,7 +71,12 @@ func SetScreen(c *gin.Context) { }) return } + + log.Debug("Stopping ffmpeg SetScreen") ffmpeg.Stop() + time.Sleep(100 * time.Millisecond) + ffmpeg.ApplyOptions() + log.Debug("Starting ffmpeg SetScreen") ffmpeg.Start() log.Debugf("update screen: %+v", req)