package rtc import ( "errors" "fmt" "net" "sync" "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/pion/webrtc/v4" ) var rtc *RTC func Get() *RTC { return rtc } var ErrWebRTC = errors.New("webrtc") var ErrWebRTCParam = func(format string, args ...any) error { return fmt.Errorf("%w: "+format, args...) } var ErrPeerClosedConn = ErrWebRTCParam("peer closed conn") type RTC struct { peers map[string]*webrtc.PeerConnection videoListener *net.UDPConn audioListener *net.UDPConn videoTrack *webrtc.TrackLocalStaticRTP audioTrack *webrtc.TrackLocalStaticRTP m sync.Mutex } func (r *RTC) AddPeer(p *webrtc.PeerConnection, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) { peerID := uuid.New().String() r.m.Lock() r.peers[peerID] = p r.m.Unlock() p.OnConnectionStateChange(func(connState webrtc.PeerConnectionState) { if connState == webrtc.PeerConnectionStateFailed || connState == webrtc.PeerConnectionStateClosed { r.m.Lock() defer r.m.Unlock() delete(r.peers, peerID) p.Close() peers := make([]string, 0, len(r.peers)) for p := range r.peers { peers = append(peers, p) } log.WithField("peers", peers).Infof("Peer %s disconnected and resources cleaned up.", peerID) } }) vSender, err := p.AddTrack(r.videoTrack) if err != nil { return nil, ErrWebRTCParam("failed to add video track: %v", err) } processRTCP(vSender) aSender, err := p.AddTrack(r.audioTrack) if err != nil { return nil, ErrWebRTCParam("failed to add audio track: %v", err) } processRTCP(aSender) if err := p.SetRemoteDescription(offer); err != nil { return nil, ErrWebRTCParam("failed to set remote description: %v", err) } answer, err := p.CreateAnswer(nil) if err != nil { return nil, ErrWebRTCParam("failed to create answer: %v", err) } gatherComplete := webrtc.GatheringCompletePromise(p) if err := p.SetLocalDescription(answer); err != nil { return nil, ErrWebRTCParam("failed to set local description: %v", err) } <-gatherComplete return p.LocalDescription(), nil } func (r *RTC) VideoListenerRead() { listenerRead(r.videoListener, r.videoTrack) } func (r *RTC) AudioListenerRead() { listenerRead(r.audioListener, r.audioTrack) } func (r *RTC) Close() error { r.videoListener.Close() r.audioListener.Close() return nil } func NewPeer() (*webrtc.PeerConnection, error) { peer, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, }) if err == nil { // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peer.OnICEConnectionStateChange(func(connState webrtc.ICEConnectionState) { log.Infof("Connection State has changed %s", connState.String()) if connState == webrtc.ICEConnectionStateFailed { if closeErr := peer.Close(); closeErr != nil { panic(closeErr) } } }) } return peer, err } // Read incoming RTCP packets // Before these packets are retuned they are processed by interceptors. For things // like NACK this needs to be called. func processRTCP(rtpSender *webrtc.RTPSender) { go func() { rtcpBuf := make([]byte, 1500) for { if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { return } } }() }