package stream import ( "os" "os/exec" "strings" "sync" "syscall" log "github.com/sirupsen/logrus" ) var pipedCmd *PipedCmd var mu sync.Mutex // PipedCmd struct manages a sequence of piped commands. type PipedCmd struct { cmds []*exec.Cmd running bool } // InitPipedCmd initializes a PipedCmd instance with a sequence of commands. func InitPipedCmd(cmd string) *PipedCmd { pipedCmd = &PipedCmd{} pipedCmd.ChangeCmd(cmd) return pipedCmd } func (p *PipedCmd) ChangeCmd(cmd string) { cmds := strings.Split(cmd, "|") for i, c := range cmds { cmds[i] = strings.TrimSpace(c) } log.Debugf("Cmds: %+v", cmds) pipedCmds := make([]*exec.Cmd, len(cmds)) // Initialize each command in the sequence for i, cmd := range cmds { if len(cmd) == 0 { continue } cmdArgs := strings.Split(cmd, " ") pipedCmds[i] = exec.Command(cmdArgs[0], cmdArgs[1:]...) } p.cmds = pipedCmds } // Start begins execution of all commands in the piped sequence. func (p *PipedCmd) Start() error { mu.Lock() defer mu.Unlock() if p.running { log.Debugf("Process is already running.") return nil } // Pipe each command's output to the next command's input for i := 0; i < len(p.cmds)-1; i++ { stdout, err := p.cmds[i].StdoutPipe() if err != nil { return err } p.cmds[i+1].Stdin = stdout } // pipe stdout and stderr of the last command into os p.cmds[len(p.cmds)-1].Stdout = os.Stdout p.cmds[len(p.cmds)-1].Stderr = os.Stderr // Start each command in the sequence for _, cmd := range p.cmds { if err := cmd.Start(); err != nil { p.terminateAll() return err } else { log.Debugf("Started process: (%d) %v", cmd.Process.Pid, cmd.Args) } } p.running = true // Monitor commands in a goroutine to handle failures go p.monitorCommands() return nil } // monitorCommands waits for each command to finish and checks for errors. func (p *PipedCmd) monitorCommands() { var wg sync.WaitGroup wg.Add(len(p.cmds)) for _, cmd := range p.cmds { go func(cmd *exec.Cmd) { defer wg.Done() err := cmd.Wait() if err != nil && err.Error() != "signal: terminated" { log.Debugf("Command failed: %v", err) p.terminateAll() // Terminate all if any command fails } }(cmd) } // Wait for all commands to complete or terminate wg.Wait() mu.Lock() defer mu.Unlock() p.running = false } // terminateAll sends a termination signal to all running commands. func (p *PipedCmd) terminateAll() { for _, cmd := range p.cmds { if cmd.Process != nil { log.Debugf("Sending SIGTERM to: (%d) %v", cmd.Process.Pid, cmd.Args) _ = cmd.Process.Signal(syscall.SIGTERM) // Send SIGTERM to allow graceful termination } } p.running = false } // Stop manually stops all commands in the sequence. func (p *PipedCmd) Stop() { mu.Lock() defer mu.Unlock() if !p.running { log.Debug("Process is not running.") return } log.Debug("Stopping process...") p.terminateAll() }