package stream import ( "log" "os/exec" "rkkvm/config" "strings" "sync" "syscall" ) var pipedCmd *PipedCmd // PipedCmd struct manages a sequence of piped commands. type PipedCmd struct { cmds []*exec.Cmd mu sync.Mutex running bool finished chan struct{} } // InitPipedCmd initializes a PipedCmd instance with a sequence of commands. func InitPipedCmd(cmds []string) *PipedCmd { pipedCmds := make([]*exec.Cmd, len(cmds)) // Initialize each command in the sequence for i, cmd := range cmds { if len(cmd) == 0 { continue } cmdArgs := strings.Split(cmd, " ") pipedCmds[i] = exec.Command(config.RootFS+"/"+cmdArgs[0], cmdArgs[1:]...) } pipedCmd = &PipedCmd{ cmds: pipedCmds, finished: make(chan struct{}), } return pipedCmd } // Start begins execution of all commands in the piped sequence. func (p *PipedCmd) Start() error { p.mu.Lock() defer p.mu.Unlock() if p.running { log.Println("Process is already running.") return nil } // Pipe each command's output to the next command's input for i := 0; i < len(p.cmds)-1; i++ { stdout, err := p.cmds[i].StdoutPipe() if err != nil { return err } p.cmds[i+1].Stdin = stdout } // Start each command in the sequence for _, cmd := range p.cmds { if err := cmd.Start(); err != nil { p.terminateAll() // Clean up if any command fails to start return err } } p.running = true // Monitor commands in a goroutine to handle failures go p.monitorCommands() return nil } // monitorCommands waits for each command to finish and checks for errors. func (p *PipedCmd) monitorCommands() { var wg sync.WaitGroup wg.Add(len(p.cmds)) for _, cmd := range p.cmds { go func(cmd *exec.Cmd) { defer wg.Done() err := cmd.Wait() if err != nil { log.Printf("Command failed: %v", err) p.terminateAll() // Terminate all if any command fails } }(cmd) } // Wait for all commands to complete or terminate wg.Wait() p.mu.Lock() p.running = false close(p.finished) p.mu.Unlock() } // terminateAll sends a termination signal to all running commands. func (p *PipedCmd) terminateAll() { p.mu.Lock() defer p.mu.Unlock() for _, cmd := range p.cmds { if cmd.Process != nil { _ = cmd.Process.Signal(syscall.SIGTERM) // Send SIGTERM to allow graceful termination } } } // Stop manually stops all commands in the sequence. func (p *PipedCmd) Stop() { p.mu.Lock() defer p.mu.Unlock() if !p.running { log.Println("Process is not running.") return } log.Println("Stopping process...") p.terminateAll() p.running = false close(p.finished) } /* import ( "os" "os/exec" "strings" "sync" log "github.com/sirupsen/logrus" ) type PipedCmd struct { cmds []string cmdsExec []*exec.Cmd mu sync.Mutex running bool stopChan chan struct{} finished chan struct{} } // Init initializes the PipedCmd with a slice of command strings func InitPipedCmd(cmds []string) *PipedCmd { return &PipedCmd{ cmds: cmds, } } // Start initializes and starts the piped commands func (p *PipedCmd) Start() { p.mu.Lock() defer p.mu.Unlock() if p.running { log.Debug("process is already running.") return } p.stopChan = make(chan struct{}) p.finished = make(chan struct{}) log.Debugf("Starting piped commands: <%s>", strings.Join(p.cmds, " | ")) // Create commands and set up pipes for i, cmdStr := range p.cmds { // Split command string into command and arguments cmdParts := strings.Fields(cmdStr) if len(cmdParts) == 0 { log.Errorf("Empty command string at index %d", i) continue } cmd := exec.Command(cmdParts[0], cmdParts[1:]...) // Set up pipes for stdin/stdout if i > 0 { stdin, err := p.cmdsExec[i-1].StdoutPipe() if err != nil { log.Errorf("Couldn't set up stdout pipe for command %s: %v", p.cmdsExec[i-1].Path, err) return } cmd.Stdin = stdin } cmd.Stderr = os.Stderr // Log stderr to standard error output p.cmdsExec = append(p.cmdsExec, cmd) } // Start the first command if err := p.cmdsExec[0].Start(); err != nil { log.Errorf("Failed to start command: %v", err) return } p.running = true // Start remaining commands for _, cmd := range p.cmdsExec[1:] { if err := cmd.Start(); err != nil { log.Errorf("Failed to start command: %v", err) return } } go func() { // Wait for the last command to finish err := p.cmdsExec[len(p.cmdsExec)-1].Wait() p.running = false log.Errorf("process exited with error: %v", err) // Signal that the process has finished close(p.finished) close(p.stopChan) }() } // Stop terminates the piped commands gracefully func (p *PipedCmd) Stop() { p.mu.Lock() defer p.mu.Unlock() if !p.running { log.Debug("process is not running.") return } log.Warn("Stopping process...") for _, cmd := range p.cmdsExec { if err := cmd.Process.Kill(); err != nil { log.Errorf("Failed to kill process: %v", err) } } select { case <-p.finished: log.Info("stopped as expected") case <-p.stopChan: log.Info("was killed") } p.running = false } // ChangeArgs updates the command arguments func (p *PipedCmd) ChangeArgs(newCmds []string) { p.mu.Lock() defer p.mu.Unlock() p.cmds = newCmds log.Printf("Updated process commands: %v", p.cmds) } // Watch monitors the process and restarts if it stops unexpectedly func (p *PipedCmd) Watch() { for { select { case <-p.stopChan: log.Errorf("process stopped unexpectedly. Restarting...") p.Start() } } } */