From 9ed6dbd1a863b88b45eb563d440e3a206bfb115e Mon Sep 17 00:00:00 2001 From: Berger Eugene Date: Mon, 27 Feb 2023 01:20:05 +0200 Subject: [PATCH] Allow client to tail process logs --- default.nix | 2 +- go.mod | 1 + go.sum | 2 + process-compose.yaml | 6 +- src/api/pc_api.go | 3 +- src/api/routes.go | 3 + src/api/types.go | 6 ++ src/api/ws_api.go | 91 +++++++++++++++++++++++++++++ src/app/project_runner.go | 7 +-- src/client/logs.go | 69 ++++++++++++++++++++++ src/cmd/info.go | 2 +- src/cmd/logs.go | 39 +++++++++++++ src/cmd/root.go | 2 +- src/cmd/up.go | 5 ++ src/pclog/log_observer_connector.go | 35 +++++++++++ src/pclog/logs_observer.go | 4 +- src/pclog/nil_logger.go | 6 +- src/pclog/process_log_buffer.go | 34 ++++++----- src/pclog/unique_id.go | 15 +++++ src/tui/log-operations.go | 2 +- src/tui/log-viewer.go | 16 ++++- 21 files changed, 319 insertions(+), 31 deletions(-) create mode 100644 src/api/types.go create mode 100644 src/api/ws_api.go create mode 100644 src/client/logs.go create mode 100644 src/cmd/logs.go create mode 100644 src/pclog/log_observer_connector.go create mode 100644 src/pclog/unique_id.go diff --git a/default.nix b/default.nix index 0455dc0..c70744d 100644 --- a/default.nix +++ b/default.nix @@ -18,7 +18,7 @@ buildGoModule rec { nativeBuildInputs = [ installShellFiles ]; - vendorSha256 = "g82JRmfbKH/XEZx2aLZOcyen23vOxQXR7VyeAYxCSi4="; + vendorSha256 = "iiGn0dYHNEp5Bs54X44sHbsG3HD92Xs4oah4iZXqqvQ="; #vendorSha256 = lib.fakeSha256; postInstall = '' diff --git a/go.mod b/go.mod index f4fcb36..cd79c70 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/fatih/color v1.14.1 github.com/gdamore/tcell/v2 v2.5.4 github.com/gin-gonic/gin v1.8.2 + github.com/gorilla/websocket v1.5.0 github.com/imdario/mergo v0.3.13 github.com/joho/godotenv v1.5.1 github.com/rivo/tview v0.0.0-20230208211350-7dfff1ce7854 diff --git a/go.sum b/go.sum index 02c8314..ddf44bb 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= diff --git a/process-compose.yaml b/process-compose.yaml index 358ac6a..638d653 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -119,7 +119,7 @@ processes: command: "kcalc" disabled: true - pc_log: + __pc_log: command: "tail -f -n100 process-compose-${USER}.log" working_dir: "/tmp" environment: @@ -128,6 +128,10 @@ processes: process0: condition: process_completed + __pc_log_client: + command: "tail -f -n100 process-compose-${USER}-client.log" + working_dir: "/tmp" + bat_config: command: "batcat -f process-compose.yaml" diff --git a/src/api/pc_api.go b/src/api/pc_api.go index ac33cca..1d69bea 100644 --- a/src/api/pc_api.go +++ b/src/api/pc_api.go @@ -5,9 +5,8 @@ import ( "net/http" "strconv" - "github.com/gin-gonic/gin" - "github.com/f1bonacc1/process-compose/src/app" + "github.com/gin-gonic/gin" ) // @Schemes diff --git a/src/api/routes.go b/src/api/routes.go index f1d3c05..98e802c 100644 --- a/src/api/routes.go +++ b/src/api/routes.go @@ -45,5 +45,8 @@ func InitRoutes(useLogger bool) *gin.Engine { r.POST("/process/start/:name", StartProcess) r.POST("/process/restart/:name", RestartProcess) + //websocket + r.GET("/process/logs/ws/:name/:endOffset/:follow", HandleLogsStream) + return r } diff --git a/src/api/types.go b/src/api/types.go new file mode 100644 index 0000000..ad8b7d7 --- /dev/null +++ b/src/api/types.go @@ -0,0 +1,6 @@ +package api + +type LogMessage struct { + Message string `json:"message"` + ProcessName string `json:"process_name"` +} diff --git a/src/api/ws_api.go b/src/api/ws_api.go new file mode 100644 index 0000000..eb48493 --- /dev/null +++ b/src/api/ws_api.go @@ -0,0 +1,91 @@ +package api + +import ( + "github.com/f1bonacc1/process-compose/src/app" + "github.com/f1bonacc1/process-compose/src/pclog" + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "github.com/rs/zerolog/log" + "net/http" + "strconv" +) + +var upgrader = websocket.Upgrader{} + +func HandleLogsStream(c *gin.Context) { + procName := c.Param("name") + follow := c.Param("follow") == "true" + endOffset, err := strconv.Atoi(c.Param("endOffset")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + done := make(chan struct{}) + logChan := make(chan LogMessage, 256) + connector := pclog.NewConnector(func(messages []string) { + for _, message := range messages { + msg := LogMessage{ + Message: message, + ProcessName: procName, + } + logChan <- msg + } + if !follow { + close(logChan) + } + }, + func(message string) { + msg := LogMessage{ + Message: message, + ProcessName: procName, + } + logChan <- msg + }, + endOffset) + go handleLog(ws, procName, connector, logChan, done) + if follow { + go handleIncoming(ws, done) + } + app.PROJ.GetLogsAndSubscribe(procName, connector) +} + +func handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) { + defer app.PROJ.UnSubscribeLogger(procName, connector) + defer ws.Close() + for { + select { + case msg, open := <-logChan: + if err := ws.WriteJSON(&msg); err != nil { + log.Err(err).Msg("Failed to write to socket") + return + } + if !open { + return + } + case <-done: + log.Warn().Msg("Socket closed remotely") + return + } + + } +} + +func handleIncoming(ws *websocket.Conn, done chan struct{}) { + defer close(done) + for { + msgType, _, err := ws.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return + } + log.Err(err).Msgf("Failed to read from socket %d", msgType) + return + } + } +} diff --git a/src/app/project_runner.go b/src/app/project_runner.go index c662738..ea9202a 100644 --- a/src/app/project_runner.go +++ b/src/app/project_runner.go @@ -289,8 +289,7 @@ func (p *ProjectRunner) GetProcessLogLength(name string) int { return logs.GetLogLength() } -func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.PcLogObserver) { - +func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.LogObserver) { logs, err := p.getProcessLog(name) if err != nil { return @@ -298,12 +297,12 @@ func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.PcLogObs logs.GetLogsAndSubscribe(observer) } -func (p *ProjectRunner) UnSubscribeLogger(name string) { +func (p *ProjectRunner) UnSubscribeLogger(name string, observer pclog.LogObserver) { logs, err := p.getProcessLog(name) if err != nil { return } - logs.UnSubscribe() + logs.UnSubscribe(observer) } func (p *ProjectRunner) selectRunningProcesses(procList []string) error { diff --git a/src/client/logs.go b/src/client/logs.go new file mode 100644 index 0000000..c5bb8bb --- /dev/null +++ b/src/client/logs.go @@ -0,0 +1,69 @@ +package client + +import ( + "fmt" + "github.com/f1bonacc1/process-compose/src/api" + "github.com/gorilla/websocket" + "github.com/rs/zerolog/log" + "os" + "os/signal" + "time" +) + +func ReadProcessLogs(address string, port int, name string, offset int, follow bool) error { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + url := fmt.Sprintf("ws://%s:%d/process/logs/ws/%s/%d/%v", address, port, name, offset, follow) + log.Info().Msgf("Connecting to %s", url) + ws, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + log.Error().Msgf("failed to dial to %s error: %v", url, err) + return err + } + defer ws.Close() + done := make(chan struct{}) + + go readLogs(done, ws, follow) + + for { + select { + case <-done: + return nil + case <-interrupt: + fmt.Println("interrupt") + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + fmt.Println("write close:", err) + return nil + } + select { + case <-done: + case <-time.After(time.Second): + } + return nil + } + } +} + +func readLogs(done chan struct{}, ws *websocket.Conn, follow bool) { + defer close(done) + for { + var message api.LogMessage + if err := ws.ReadJSON(&message); err != nil { + if !follow && websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { + return + } + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return + } + log.Error().Msgf("failed to read message: %v", err) + return + } + if len(message.ProcessName) > 0 { + fmt.Printf("%s:\t%s\n", message.ProcessName, message.Message) + } + } +} diff --git a/src/cmd/info.go b/src/cmd/info.go index 476793b..5b7f1c3 100644 --- a/src/cmd/info.go +++ b/src/cmd/info.go @@ -21,7 +21,7 @@ var infoCmd = &cobra.Command{ func printInfo() { format := "%-15s %s\n" fmt.Println("Process Compose") - fmt.Printf(format, "Logs:", config.LogFilePath) + fmt.Printf(format, "Logs:", config.GetLogFilePath()) path := config.GetShortCutsPath() if len(path) > 0 { diff --git a/src/cmd/logs.go b/src/cmd/logs.go new file mode 100644 index 0000000..c16598b --- /dev/null +++ b/src/cmd/logs.go @@ -0,0 +1,39 @@ +/* +Copyright © 2023 NAME HERE +*/ +package cmd + +import ( + "github.com/f1bonacc1/process-compose/src/client" + "github.com/rs/zerolog/log" + "math" + + "github.com/spf13/cobra" +) + +var ( + follow bool + tailLength int +) + +// logsCmd represents the logs command +var logsCmd = &cobra.Command{ + Use: "logs [PROCESS]", + Short: "Fetch the logs of a process", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + name := args[0] + err := client.ReadProcessLogs(pcAddress, port, name, tailLength, follow) + if err != nil { + log.Error().Msgf("Failed to fetch logs for process %s: %v", name, err) + return + } + }, +} + +func init() { + processCmd.AddCommand(logsCmd) + + logsCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow log output") + logsCmd.Flags().IntVarP(&tailLength, "tail", "n", math.MaxInt, "Number of lines to show from the end of the logs (default - all)") +} diff --git a/src/cmd/root.go b/src/cmd/root.go index e40afe3..1d04281 100644 --- a/src/cmd/root.go +++ b/src/cmd/root.go @@ -49,7 +49,7 @@ func init() { rootCmd.Flags().BoolVarP(&isTui, "tui", "t", true, "disable tui (-t=false)") rootCmd.PersistentFlags().IntVarP(&port, "port", "p", getPortDefault(), "port number") - rootCmd.PersistentFlags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load") + rootCmd.Flags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load") } func getTuiDefault() bool { diff --git a/src/cmd/up.go b/src/cmd/up.go index 97fa58e..b9b90d8 100644 --- a/src/cmd/up.go +++ b/src/cmd/up.go @@ -17,6 +17,9 @@ var upCmd = &cobra.Command{ If one or more process names are passed as arguments, will start them and their dependencies only`, Run: func(cmd *cobra.Command, args []string) { + if !cmd.Flags().Changed("tui") { + isTui = getTuiDefault() + } api.StartHttpServer(!isTui, port) runProject(args, noDeps) }, @@ -27,4 +30,6 @@ func init() { upCmd.Flags().BoolVarP(&isTui, "tui", "t", true, "disable tui (-t=false)") upCmd.Flags().BoolVarP(&noDeps, "no-deps", "", false, "don't start dependent processes") + upCmd.Flags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load") + } diff --git a/src/pclog/log_observer_connector.go b/src/pclog/log_observer_connector.go new file mode 100644 index 0000000..1a360c0 --- /dev/null +++ b/src/pclog/log_observer_connector.go @@ -0,0 +1,35 @@ +package pclog + +type multiLineHandler func(string []string) +type lineHandler func(string string) + +type Connector struct { + LogObserver + logLinesHandler multiLineHandler + logMessageHandler lineHandler + uniqueId string + taiLength int +} + +func NewConnector(mlHandler multiLineHandler, slHandler lineHandler, tail int) *Connector { + return &Connector{ + logLinesHandler: mlHandler, + logMessageHandler: slHandler, + uniqueId: GenerateUniqueID(10), + taiLength: tail, + } +} + +func (c *Connector) AddLine(line string) { + c.logMessageHandler(line) +} +func (c *Connector) SetLines(lines []string) { + c.logLinesHandler(lines) +} +func (c *Connector) GetUniqueID() string { + return c.uniqueId +} + +func (c *Connector) GetTailLength() int { + return c.taiLength +} diff --git a/src/pclog/logs_observer.go b/src/pclog/logs_observer.go index 9a9fc66..0633669 100644 --- a/src/pclog/logs_observer.go +++ b/src/pclog/logs_observer.go @@ -1,6 +1,8 @@ package pclog -type PcLogObserver interface { +type LogObserver interface { AddLine(line string) SetLines(lines []string) + GetTailLength() int + GetUniqueID() string } diff --git a/src/pclog/nil_logger.go b/src/pclog/nil_logger.go index aa8305f..b8a97d3 100644 --- a/src/pclog/nil_logger.go +++ b/src/pclog/nil_logger.go @@ -11,14 +11,14 @@ func NewNilLogger() *PcNilLog { func (l *PcNilLog) Sync() { } -func (l PcNilLog) Info(message string, process string, replica int) { +func (l *PcNilLog) Info(message string, process string, replica int) { } -func (l PcNilLog) Error(message string, process string, replica int) { +func (l *PcNilLog) Error(message string, process string, replica int) { } -func (l PcNilLog) Close() { +func (l *PcNilLog) Close() { } diff --git a/src/pclog/process_log_buffer.go b/src/pclog/process_log_buffer.go index 8fc2e45..ab3124d 100644 --- a/src/pclog/process_log_buffer.go +++ b/src/pclog/process_log_buffer.go @@ -9,17 +9,17 @@ const ( ) type ProcessLogBuffer struct { - buffer []string - size int - observer PcLogObserver - mx sync.Mutex + buffer []string + size int + observers map[string]LogObserver + mx sync.Mutex } func NewLogBuffer(size int) *ProcessLogBuffer { return &ProcessLogBuffer{ - size: size, - buffer: make([]string, 0, size+slack), - observer: nil, + size: size, + buffer: make([]string, 0, size+slack), + observers: map[string]LogObserver{}, } } @@ -30,8 +30,8 @@ func (b *ProcessLogBuffer) Write(message string) { if len(b.buffer) > b.size+slack { b.buffer = b.buffer[slack:] } - if b.observer != nil { - b.observer.AddLine(message) + for _, observer := range b.observers { + observer.AddLine(message) } } @@ -82,15 +82,21 @@ func (b *ProcessLogBuffer) GetLogLength() int { return len(b.buffer) } -func (b *ProcessLogBuffer) GetLogsAndSubscribe(observer PcLogObserver) { +func (b *ProcessLogBuffer) GetLogsAndSubscribe(observer LogObserver) { b.mx.Lock() defer b.mx.Unlock() - observer.SetLines(b.buffer) - b.observer = observer + observer.SetLines(b.GetLogRange(observer.GetTailLength(), 0)) + b.observers[observer.GetUniqueID()] = observer } -func (b *ProcessLogBuffer) UnSubscribe() { +func (b *ProcessLogBuffer) Subscribe(observer LogObserver) { b.mx.Lock() defer b.mx.Unlock() - b.observer = nil + b.observers[observer.GetUniqueID()] = observer +} + +func (b *ProcessLogBuffer) UnSubscribe(observer LogObserver) { + b.mx.Lock() + defer b.mx.Unlock() + delete(b.observers, observer.GetUniqueID()) } diff --git a/src/pclog/unique_id.go b/src/pclog/unique_id.go new file mode 100644 index 0000000..a376ce3 --- /dev/null +++ b/src/pclog/unique_id.go @@ -0,0 +1,15 @@ +package pclog + +import ( + "crypto/rand" + "encoding/hex" +) + +func GenerateUniqueID(length int) string { + if length%2 != 0 { + length += 1 + } + b := make([]byte, length/2) //each byte is 2 chars + rand.Read(b) + return hex.EncodeToString(b) +} diff --git a/src/tui/log-operations.go b/src/tui/log-operations.go index 44fc6c0..c8d1678 100644 --- a/src/tui/log-operations.go +++ b/src/tui/log-operations.go @@ -56,7 +56,7 @@ func (pv *pcView) followLog(name string) { func (pv *pcView) unFollowLog() { if pv.loggedProc != "" { - app.PROJ.UnSubscribeLogger(pv.loggedProc) + app.PROJ.UnSubscribeLogger(pv.loggedProc, pv.logsText) } pv.logsText.Flush() } diff --git a/src/tui/log-viewer.go b/src/tui/log-viewer.go index 455a258..53ee632 100644 --- a/src/tui/log-viewer.go +++ b/src/tui/log-viewer.go @@ -2,7 +2,9 @@ package tui import ( "fmt" + "github.com/f1bonacc1/process-compose/src/pclog" "io" + "math" "strings" "sync" @@ -16,6 +18,7 @@ type LogView struct { ansiWriter io.Writer mx sync.Mutex useAnsi bool + uniqueId string } func NewLogView(maxLines int) *LogView { @@ -26,8 +29,9 @@ func NewLogView(maxLines int) *LogView { SetDynamicColors(true). SetScrollable(true). SetMaxLines(maxLines), - buffer: &strings.Builder{}, - useAnsi: false, + buffer: &strings.Builder{}, + useAnsi: false, + uniqueId: pclog.GenerateUniqueID(10), } l.ansiWriter = tview.ANSIWriter(l) l.SetBorder(true) @@ -59,6 +63,14 @@ func (l *LogView) SetLines(lines []string) { l.AddLines(lines) } +func (l *LogView) GetUniqueID() string { + return l.uniqueId +} + +func (l *LogView) GetTailLength() int { + return math.MaxInt +} + func (l *LogView) ToggleWrap() { l.isWrapOn = !l.isWrapOn l.SetWrap(l.isWrapOn)