package rtc import ( "errors" "fmt" "net" "rkkvm/external/mpp" "sync" "time" "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" ) var rtc *RTC func Get() *RTC { return rtc } var ErrWebRTC = errors.New("webrtc") var ErrWebRTCParam = func(format string, args ...any) error { return fmt.Errorf("%w: "+format, args...) } var ErrPeerClosedConn = ErrWebRTCParam("peer closed conn") type RTC struct { peers map[string]*webrtc.PeerConnection audioListener *net.UDPConn videoTrack *webrtc.TrackLocalStaticSample audioTrack *webrtc.TrackLocalStaticRTP m sync.Mutex } func (r *RTC) AddPeer(p *webrtc.PeerConnection, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) { peerID := uuid.New().String() r.m.Lock() r.peers[peerID] = p /*if len(r.peers) == 1 { ffmpeg.GetFFmpeg().Start() log.Info("FFmpeg process started") }*/ r.m.Unlock() p.OnConnectionStateChange(func(connState webrtc.PeerConnectionState) { if connState == webrtc.PeerConnectionStateFailed || connState == webrtc.PeerConnectionStateClosed { r.m.Lock() defer r.m.Unlock() delete(r.peers, peerID) /*if len(r.peers) == 0 { ffmpeg.GetFFmpeg().Stop() log.Info("No clients anymore, stop ffmpeg process") }*/ p.Close() peers := make([]string, 0, len(r.peers)) for p := range r.peers { peers = append(peers, p) } log.WithField("peers", peers).Infof("Peer %s disconnected and resources cleaned up.", peerID) } }) vSender, err := p.AddTrack(r.videoTrack) if err != nil { return nil, ErrWebRTCParam("failed to add video track: %v", err) } processRTCP(vSender) aSender, err := p.AddTrack(r.audioTrack) if err != nil { return nil, ErrWebRTCParam("failed to add audio track: %v", err) } processRTCP(aSender) if err := p.SetRemoteDescription(offer); err != nil { return nil, ErrWebRTCParam("failed to set remote description: %v", err) } answer, err := p.CreateAnswer(nil) if err != nil { return nil, ErrWebRTCParam("failed to create answer: %v", err) } gatherComplete := webrtc.GatheringCompletePromise(p) if err := p.SetLocalDescription(answer); err != nil { return nil, ErrWebRTCParam("failed to set local description: %v", err) } <-gatherComplete return p.LocalDescription(), nil } func (r *RTC) VideoListenerRead() { duration := time.Second / time.Duration(60) ticker := time.NewTicker(duration) defer ticker.Stop() // Retrieve SPS and PPS once at the start sps, err := mpp.GetSPS() if err != nil { log.Fatalf("Failed to retrieve SPS: %v", err) } firstFrame := true for { select { case <-ticker.C: frame, err := mpp.GetInstance().CaptureAndEncode() if err != nil { log.Errorf("failed to capture frame: %v", err) continue } // If this is the first frame or an IDR frame, prepend SPS and PPS if firstFrame || isIDRFrame(frame) { firstFrame = false frame = append(sps, frame...) } sample := media.Sample{ Data: frame, Duration: duration, } err = r.videoTrack.WriteSample(sample) if err != nil { log.Errorf("failed to write sample: %v", err) } } } } func (r *RTC) AudioListenerRead() { listenerRead(r.audioListener, r.audioTrack) } func (r *RTC) Close() error { r.audioListener.Close() return nil } func NewPeer() (*webrtc.PeerConnection, error) { peer, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, }) if err == nil { // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peer.OnICEConnectionStateChange(func(connState webrtc.ICEConnectionState) { log.Infof("Connection State has changed %s", connState.String()) if connState == webrtc.ICEConnectionStateFailed { if closeErr := peer.Close(); closeErr != nil { panic(closeErr) } } }) } return peer, err } // Read incoming RTCP packets // Before these packets are retuned they are processed by interceptors. For things // like NACK this needs to be called. func processRTCP(rtpSender *webrtc.RTPSender) { go func() { rtcpBuf := make([]byte, 1500) for { if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { return } } }() } func isIDRFrame(frame []byte) bool { // Check for NAL unit type 5 (IDR) for i := 0; i < len(frame)-4; i++ { if frame[i] == 0x00 && frame[i+1] == 0x00 && frame[i+2] == 0x00 && frame[i+3] == 0x01 { nalType := frame[i+4] & 0x1F if nalType == 5 { // IDR frame return true } } } return false }