initial commit

This commit is contained in:
root
2024-10-30 13:23:52 +01:00
commit 68ba48a3a2
29 changed files with 1977 additions and 0 deletions

33
http/hw/stream/audio.go Normal file
View File

@@ -0,0 +1,33 @@
package stream
import (
"github.com/gin-gonic/gin"
"io"
"net/http"
"os/exec"
)
func AudioHandler(c *gin.Context) {
cmd := exec.Command("ffmpeg",
"-f", "alsa",
"-i", "hw:0,0",
"-acodec", "aac",
"-f", "mp4", "-")
// ffmpeg -f alsa -i hw:0,0 -acodec aac -f mp4 -
stdout, err := cmd.StdoutPipe()
if err != nil {
c.String(http.StatusInternalServerError, "Failed to capture audio: %v", err)
return
}
if err := cmd.Start(); err != nil {
c.String(http.StatusInternalServerError, "Failed to start ffmpeg: %v", err)
return
}
defer cmd.Wait()
c.Header("Content-Type", "audio/mp4")
if _, err := io.Copy(c.Writer, stdout); err != nil {
c.String(http.StatusInternalServerError, "Failed to stream audio: %v", err)
return
}
}

View File

@@ -0,0 +1,135 @@
package stream
import (
"io"
"os"
"os/exec"
"sync"
log "github.com/sirupsen/logrus"
)
type ExtProcess struct {
path string
cmd *exec.Cmd
mu sync.Mutex
args []string
running bool
stopChan chan struct{}
finished chan struct{}
state State
stdin io.WriteCloser
}
func Init(path string, args []string) *ExtProcess {
return &ExtProcess{
args: args,
path: path,
}
}
func (u *ExtProcess) Start() {
u.mu.Lock()
defer u.mu.Unlock()
if u.running {
log.Debug("process is already running.")
return
}
u.stopChan = make(chan struct{})
u.finished = make(chan struct{})
log.Debug("Starting external process...")
u.cmd = exec.Command(u.path, u.args...)
u.cmd.Stdout = os.Stdout
u.cmd.Stderr = os.Stderr
stdin, err := u.cmd.StdinPipe()
if err != nil {
log.Errorf("Couldn't handle stdin pipe: %v", err)
return
}
u.stdin = stdin
err = u.cmd.Start()
if err != nil {
log.Errorf("Failed to start process: %v", err)
}
u.running = true
go func() {
err = u.cmd.Wait()
u.running = false
log.Errorf("process exited with error: %v", err)
// planned stop
if err == nil {
u.finished <- struct{}{}
close(u.stopChan)
return
}
u.stopChan <- struct{}{}
close(u.finished)
}()
}
// Stop the ustreamer application
func (u *ExtProcess) Stop() {
u.mu.Lock()
defer u.mu.Unlock()
if !u.running {
log.Debug("process is not running.")
return
}
log.Warn("Stopping process...")
_, err := u.stdin.Write([]byte("q")) // TODO: works for ffmpeg, should be general or move into FFmpeg struct
if err != nil {
log.Errorf("Failed to stop process: %v", err)
if err := u.cmd.Process.Kill(); err != nil {
log.Errorf("Failed to kill process: %v", err)
}
}
if err := u.stdin.Close(); err != nil {
log.Errorf("Failed to close stdin: %v", err)
}
log.Info("waiting for finish")
select {
case <-u.finished:
log.Info("stopped as expected")
case <-u.stopChan:
log.Info("was killed")
}
u.running = false
}
func (u *ExtProcess) ChangeArgs(newArgs []string) {
u.mu.Lock()
defer u.mu.Unlock()
u.args = newArgs
log.Printf("Updated process arguments: %v", u.args)
}
func (u *ExtProcess) Watch() {
for {
select {
case <-u.stopChan:
log.Errorf("process stopped unexpectedly. Restarting...")
u.Start()
/*case <-time.After(1 * time.Second): // Adjust the monitoring interval as needed
if !u.running {
log.Errorf("process is not running. Restarting...")
u.Start()
}*/
}
}
}

53
http/hw/stream/ffmpeg.go Normal file
View File

@@ -0,0 +1,53 @@
package stream
import (
"rkkvm/config"
)
//h264
//ffmpeg -re -i /dev/video0 -c:v h264_rkmpp -b:v 2000000 -bsf:v h264_mp4toannexb -g 10 -f rtp rtp://127.0.0.1:5004?pkt_size=1200
//h265
//ffmpeg -re -i /dev/video0 -c:v hevc_rkmpp -b:v 2000000 -g 10 -f rtp rtp://127.0.0.1:5004?pkt_size=1200
// vp8
//ffmpeg -re -i /dev/video0 -c:v libvpx -crf 10 -b:v 3M -deadline 1 -g 10 -error-resilient 1 -auto-alt-ref 1 -an -f rtp rtp://127.0.0.1:5004?pkt_size=1200
// https://jsfiddle.net/z7ms3u5r/
var ffmpeg *FFmpeg
type FFmpeg struct {
*ExtProcess
config.FFmpeg
}
func (f *FFmpeg) SetBitrate(b int) {
f.Bitrate = b
if f.Bitrate < 0 {
f.Bitrate = 6000
}
f.ChangeArgs(f.FormatArgs())
}
func (f *FFmpeg) SetFPS(fps int) {
f.FPS = fps
if f.FPS < 0 {
f.FPS = 30
}
f.ChangeArgs(f.FormatArgs())
}
func InitFFmpeg(path string, args []string) *FFmpeg {
ffmpeg = &FFmpeg{
ExtProcess: Init(path, args),
FFmpeg: config.Get().FFmpeg,
}
return ffmpeg
}
func GetFFmpeg() *FFmpeg {
return ffmpeg
}

View File

@@ -0,0 +1,52 @@
package stream
import (
"rkkvm/config"
"time"
log "github.com/sirupsen/logrus"
)
var ustreamer *UStreamer
type UStreamer struct {
*ExtProcess
config.UStreamer
}
func InitUStreamer(path string, args []string) *UStreamer {
ustreamer = &UStreamer{
ExtProcess: Init(path, args),
UStreamer: config.Get().UStreamer,
}
return ustreamer
}
func GetUStreamer() *UStreamer {
return ustreamer
}
func (u *UStreamer) MonitorState() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
{
state, err := GetState()
if err != nil {
u.state = State{}
log.Errorf("Failed to get process state: %v", err)
continue
}
u.state = state
}
}
}
}
func (u *UStreamer) GetState() State {
return u.state
}

189
http/hw/stream/video.go Normal file
View File

@@ -0,0 +1,189 @@
package stream
import (
"encoding/json"
"fmt"
"io"
"net/http"
"rkkvm/config"
"rkkvm/http/hw/rtc"
"strconv"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
/*
{
"ok": true,
"result": {
"instance_id": "",
"encoder": {
"type": "CPU",
"quality": 80
},
"source": {
"resolution": {
"width": 1920,
"height": 1080
},
"online": true,
"desired_fps": 30,
"captured_fps": 50
},
"stream": {
"queued_fps": 26,
"clients": 1,
"clients_stat": {
"2d90d210e837a19c": {
"fps": 26,
"extra_headers": false,
"advance_headers": false,
"dual_final_frames": false,
"zero_data": false,
"key": "0"
}
}
}
}
}
*/
type State struct {
Ok bool `json:"ok"`
Result struct {
InstanceID string `json:"instance_id"`
Encoder struct {
Type string `json:"type"`
Quality int `json:"quality"`
} `json:"encoder"`
Source struct {
Resolution struct {
Width int `json:"width"`
Height int `json:"height"`
} `json:"resolution"`
Online bool `json:"online"`
DesiredFPS int `json:"desired_fps"`
CapturedFPS int `json:"captured_fps"`
} `json:"source"`
Stream struct {
QueuedFPS int `json:"queued_fps"`
Clients int `json:"clients"`
ClientsStat map[string]struct {
FPS int `json:"fps"`
ExtraHeaders bool `json:"extra_headers"`
AdvanceHeaders bool `json:"advance_headers"`
DualFinalFrames bool `json:"dual_final_frames"`
ZeroData bool `json:"zero_data"`
Key string `json:"key"`
}
} `json:"stream"`
} `json:"result"`
}
func FormatHttpUrl() string {
cfg := config.Get()
return fmt.Sprintf("http://%s:%d", cfg.UStreamer.Host, cfg.UStreamer.Port)
}
func GetState() (State, error) {
url := FormatHttpUrl() + "/state"
rsp, err := http.Get(url)
if err != nil {
log.Errorf("Failed to get state: %v", err)
return State{}, err
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
log.Errorf("Invalid status code: %d", rsp.StatusCode)
return State{}, err
}
var state State
if err := json.NewDecoder(rsp.Body).Decode(&state); err != nil {
log.Errorf("Failed to decode state: %v", err)
return State{}, err
}
return state, nil
}
func MjpegHandler(c *gin.Context) {
url := FormatHttpUrl() + "/stream"
rsp, err := http.Get(url)
if err != nil {
c.String(http.StatusInternalServerError, "Failed to get stream")
log.Errorf("Failed to get stream: %v", err)
return
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
c.String(rsp.StatusCode, "Invalid status code")
return
}
c.Header("Content-Type", rsp.Header.Get("Content-Type"))
buf := make([]byte, 1024*1024)
_, err = io.CopyBuffer(c.Writer, rsp.Body, buf)
if err != nil {
c.String(http.StatusInternalServerError, "Failed to passthrough stream")
log.Errorf("Failed to passthrough stream: %v", err)
return
}
}
func WebRTCHandshake(c *gin.Context) {
str, err := c.GetRawData()
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
log.Debugf("Client session description: %s", string(str))
r := rtc.Get()
localSession, err := r.Handshake(string(str))
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, localSession)
}
func WebRTCSettings(c *gin.Context) {
bitrateStr := c.Query("bitrate")
fpsStr := c.Query("fps")
if fpsStr == "" && bitrateStr == "" {
c.String(http.StatusBadGateway, "please specify bitrate or fps parameters")
return
}
ffmpeg := GetFFmpeg()
if len(bitrateStr) > 0 {
bitrate, err := strconv.Atoi(bitrateStr)
if err != nil {
c.String(http.StatusBadRequest, "wrong bitrate value")
return
}
ffmpeg.SetBitrate(bitrate)
}
if len(fpsStr) > 0 {
fps, err := strconv.Atoi(fpsStr)
if err != nil {
c.String(http.StatusBadRequest, "wrong fps value")
return
}
ffmpeg.SetFPS(fps)
}
ffmpeg.Stop()
ffmpeg.Start()
}