Allow client to tail process logs

This commit is contained in:
Berger Eugene 2023-02-27 01:20:05 +02:00
parent 6bc7fb3fec
commit 9ed6dbd1a8
21 changed files with 319 additions and 31 deletions

View File

@ -18,7 +18,7 @@ buildGoModule rec {
nativeBuildInputs = [ installShellFiles ];
vendorSha256 = "g82JRmfbKH/XEZx2aLZOcyen23vOxQXR7VyeAYxCSi4=";
vendorSha256 = "iiGn0dYHNEp5Bs54X44sHbsG3HD92Xs4oah4iZXqqvQ=";
#vendorSha256 = lib.fakeSha256;
postInstall = ''

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/fatih/color v1.14.1
github.com/gdamore/tcell/v2 v2.5.4
github.com/gin-gonic/gin v1.8.2
github.com/gorilla/websocket v1.5.0
github.com/imdario/mergo v0.3.13
github.com/joho/godotenv v1.5.1
github.com/rivo/tview v0.0.0-20230208211350-7dfff1ce7854

2
go.sum
View File

@ -84,6 +84,8 @@ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=

View File

@ -119,7 +119,7 @@ processes:
command: "kcalc"
disabled: true
pc_log:
__pc_log:
command: "tail -f -n100 process-compose-${USER}.log"
working_dir: "/tmp"
environment:
@ -128,6 +128,10 @@ processes:
process0:
condition: process_completed
__pc_log_client:
command: "tail -f -n100 process-compose-${USER}-client.log"
working_dir: "/tmp"
bat_config:
command: "batcat -f process-compose.yaml"

View File

@ -5,9 +5,8 @@ import (
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/f1bonacc1/process-compose/src/app"
"github.com/gin-gonic/gin"
)
// @Schemes

View File

@ -45,5 +45,8 @@ func InitRoutes(useLogger bool) *gin.Engine {
r.POST("/process/start/:name", StartProcess)
r.POST("/process/restart/:name", RestartProcess)
//websocket
r.GET("/process/logs/ws/:name/:endOffset/:follow", HandleLogsStream)
return r
}

6
src/api/types.go Normal file
View File

@ -0,0 +1,6 @@
package api
type LogMessage struct {
Message string `json:"message"`
ProcessName string `json:"process_name"`
}

91
src/api/ws_api.go Normal file
View File

@ -0,0 +1,91 @@
package api
import (
"github.com/f1bonacc1/process-compose/src/app"
"github.com/f1bonacc1/process-compose/src/pclog"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"net/http"
"strconv"
)
var upgrader = websocket.Upgrader{}
func HandleLogsStream(c *gin.Context) {
procName := c.Param("name")
follow := c.Param("follow") == "true"
endOffset, err := strconv.Atoi(c.Param("endOffset"))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
done := make(chan struct{})
logChan := make(chan LogMessage, 256)
connector := pclog.NewConnector(func(messages []string) {
for _, message := range messages {
msg := LogMessage{
Message: message,
ProcessName: procName,
}
logChan <- msg
}
if !follow {
close(logChan)
}
},
func(message string) {
msg := LogMessage{
Message: message,
ProcessName: procName,
}
logChan <- msg
},
endOffset)
go handleLog(ws, procName, connector, logChan, done)
if follow {
go handleIncoming(ws, done)
}
app.PROJ.GetLogsAndSubscribe(procName, connector)
}
func handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) {
defer app.PROJ.UnSubscribeLogger(procName, connector)
defer ws.Close()
for {
select {
case msg, open := <-logChan:
if err := ws.WriteJSON(&msg); err != nil {
log.Err(err).Msg("Failed to write to socket")
return
}
if !open {
return
}
case <-done:
log.Warn().Msg("Socket closed remotely")
return
}
}
}
func handleIncoming(ws *websocket.Conn, done chan struct{}) {
defer close(done)
for {
msgType, _, err := ws.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return
}
log.Err(err).Msgf("Failed to read from socket %d", msgType)
return
}
}
}

View File

@ -289,8 +289,7 @@ func (p *ProjectRunner) GetProcessLogLength(name string) int {
return logs.GetLogLength()
}
func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.PcLogObserver) {
func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.LogObserver) {
logs, err := p.getProcessLog(name)
if err != nil {
return
@ -298,12 +297,12 @@ func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.PcLogObs
logs.GetLogsAndSubscribe(observer)
}
func (p *ProjectRunner) UnSubscribeLogger(name string) {
func (p *ProjectRunner) UnSubscribeLogger(name string, observer pclog.LogObserver) {
logs, err := p.getProcessLog(name)
if err != nil {
return
}
logs.UnSubscribe()
logs.UnSubscribe(observer)
}
func (p *ProjectRunner) selectRunningProcesses(procList []string) error {

69
src/client/logs.go Normal file
View File

@ -0,0 +1,69 @@
package client
import (
"fmt"
"github.com/f1bonacc1/process-compose/src/api"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"os"
"os/signal"
"time"
)
func ReadProcessLogs(address string, port int, name string, offset int, follow bool) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
url := fmt.Sprintf("ws://%s:%d/process/logs/ws/%s/%d/%v", address, port, name, offset, follow)
log.Info().Msgf("Connecting to %s", url)
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Error().Msgf("failed to dial to %s error: %v", url, err)
return err
}
defer ws.Close()
done := make(chan struct{})
go readLogs(done, ws, follow)
for {
select {
case <-done:
return nil
case <-interrupt:
fmt.Println("interrupt")
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
fmt.Println("write close:", err)
return nil
}
select {
case <-done:
case <-time.After(time.Second):
}
return nil
}
}
}
func readLogs(done chan struct{}, ws *websocket.Conn, follow bool) {
defer close(done)
for {
var message api.LogMessage
if err := ws.ReadJSON(&message); err != nil {
if !follow && websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
return
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return
}
log.Error().Msgf("failed to read message: %v", err)
return
}
if len(message.ProcessName) > 0 {
fmt.Printf("%s:\t%s\n", message.ProcessName, message.Message)
}
}
}

View File

@ -21,7 +21,7 @@ var infoCmd = &cobra.Command{
func printInfo() {
format := "%-15s %s\n"
fmt.Println("Process Compose")
fmt.Printf(format, "Logs:", config.LogFilePath)
fmt.Printf(format, "Logs:", config.GetLogFilePath())
path := config.GetShortCutsPath()
if len(path) > 0 {

39
src/cmd/logs.go Normal file
View File

@ -0,0 +1,39 @@
/*
Copyright © 2023 NAME HERE <EMAIL ADDRESS>
*/
package cmd
import (
"github.com/f1bonacc1/process-compose/src/client"
"github.com/rs/zerolog/log"
"math"
"github.com/spf13/cobra"
)
var (
follow bool
tailLength int
)
// logsCmd represents the logs command
var logsCmd = &cobra.Command{
Use: "logs [PROCESS]",
Short: "Fetch the logs of a process",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
name := args[0]
err := client.ReadProcessLogs(pcAddress, port, name, tailLength, follow)
if err != nil {
log.Error().Msgf("Failed to fetch logs for process %s: %v", name, err)
return
}
},
}
func init() {
processCmd.AddCommand(logsCmd)
logsCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow log output")
logsCmd.Flags().IntVarP(&tailLength, "tail", "n", math.MaxInt, "Number of lines to show from the end of the logs (default - all)")
}

View File

@ -49,7 +49,7 @@ func init() {
rootCmd.Flags().BoolVarP(&isTui, "tui", "t", true, "disable tui (-t=false)")
rootCmd.PersistentFlags().IntVarP(&port, "port", "p", getPortDefault(), "port number")
rootCmd.PersistentFlags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load")
rootCmd.Flags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load")
}
func getTuiDefault() bool {

View File

@ -17,6 +17,9 @@ var upCmd = &cobra.Command{
If one or more process names are passed as arguments,
will start them and their dependencies only`,
Run: func(cmd *cobra.Command, args []string) {
if !cmd.Flags().Changed("tui") {
isTui = getTuiDefault()
}
api.StartHttpServer(!isTui, port)
runProject(args, noDeps)
},
@ -27,4 +30,6 @@ func init() {
upCmd.Flags().BoolVarP(&isTui, "tui", "t", true, "disable tui (-t=false)")
upCmd.Flags().BoolVarP(&noDeps, "no-deps", "", false, "don't start dependent processes")
upCmd.Flags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load")
}

View File

@ -0,0 +1,35 @@
package pclog
type multiLineHandler func(string []string)
type lineHandler func(string string)
type Connector struct {
LogObserver
logLinesHandler multiLineHandler
logMessageHandler lineHandler
uniqueId string
taiLength int
}
func NewConnector(mlHandler multiLineHandler, slHandler lineHandler, tail int) *Connector {
return &Connector{
logLinesHandler: mlHandler,
logMessageHandler: slHandler,
uniqueId: GenerateUniqueID(10),
taiLength: tail,
}
}
func (c *Connector) AddLine(line string) {
c.logMessageHandler(line)
}
func (c *Connector) SetLines(lines []string) {
c.logLinesHandler(lines)
}
func (c *Connector) GetUniqueID() string {
return c.uniqueId
}
func (c *Connector) GetTailLength() int {
return c.taiLength
}

View File

@ -1,6 +1,8 @@
package pclog
type PcLogObserver interface {
type LogObserver interface {
AddLine(line string)
SetLines(lines []string)
GetTailLength() int
GetUniqueID() string
}

View File

@ -11,14 +11,14 @@ func NewNilLogger() *PcNilLog {
func (l *PcNilLog) Sync() {
}
func (l PcNilLog) Info(message string, process string, replica int) {
func (l *PcNilLog) Info(message string, process string, replica int) {
}
func (l PcNilLog) Error(message string, process string, replica int) {
func (l *PcNilLog) Error(message string, process string, replica int) {
}
func (l PcNilLog) Close() {
func (l *PcNilLog) Close() {
}

View File

@ -9,17 +9,17 @@ const (
)
type ProcessLogBuffer struct {
buffer []string
size int
observer PcLogObserver
mx sync.Mutex
buffer []string
size int
observers map[string]LogObserver
mx sync.Mutex
}
func NewLogBuffer(size int) *ProcessLogBuffer {
return &ProcessLogBuffer{
size: size,
buffer: make([]string, 0, size+slack),
observer: nil,
size: size,
buffer: make([]string, 0, size+slack),
observers: map[string]LogObserver{},
}
}
@ -30,8 +30,8 @@ func (b *ProcessLogBuffer) Write(message string) {
if len(b.buffer) > b.size+slack {
b.buffer = b.buffer[slack:]
}
if b.observer != nil {
b.observer.AddLine(message)
for _, observer := range b.observers {
observer.AddLine(message)
}
}
@ -82,15 +82,21 @@ func (b *ProcessLogBuffer) GetLogLength() int {
return len(b.buffer)
}
func (b *ProcessLogBuffer) GetLogsAndSubscribe(observer PcLogObserver) {
func (b *ProcessLogBuffer) GetLogsAndSubscribe(observer LogObserver) {
b.mx.Lock()
defer b.mx.Unlock()
observer.SetLines(b.buffer)
b.observer = observer
observer.SetLines(b.GetLogRange(observer.GetTailLength(), 0))
b.observers[observer.GetUniqueID()] = observer
}
func (b *ProcessLogBuffer) UnSubscribe() {
func (b *ProcessLogBuffer) Subscribe(observer LogObserver) {
b.mx.Lock()
defer b.mx.Unlock()
b.observer = nil
b.observers[observer.GetUniqueID()] = observer
}
func (b *ProcessLogBuffer) UnSubscribe(observer LogObserver) {
b.mx.Lock()
defer b.mx.Unlock()
delete(b.observers, observer.GetUniqueID())
}

15
src/pclog/unique_id.go Normal file
View File

@ -0,0 +1,15 @@
package pclog
import (
"crypto/rand"
"encoding/hex"
)
func GenerateUniqueID(length int) string {
if length%2 != 0 {
length += 1
}
b := make([]byte, length/2) //each byte is 2 chars
rand.Read(b)
return hex.EncodeToString(b)
}

View File

@ -56,7 +56,7 @@ func (pv *pcView) followLog(name string) {
func (pv *pcView) unFollowLog() {
if pv.loggedProc != "" {
app.PROJ.UnSubscribeLogger(pv.loggedProc)
app.PROJ.UnSubscribeLogger(pv.loggedProc, pv.logsText)
}
pv.logsText.Flush()
}

View File

@ -2,7 +2,9 @@ package tui
import (
"fmt"
"github.com/f1bonacc1/process-compose/src/pclog"
"io"
"math"
"strings"
"sync"
@ -16,6 +18,7 @@ type LogView struct {
ansiWriter io.Writer
mx sync.Mutex
useAnsi bool
uniqueId string
}
func NewLogView(maxLines int) *LogView {
@ -26,8 +29,9 @@ func NewLogView(maxLines int) *LogView {
SetDynamicColors(true).
SetScrollable(true).
SetMaxLines(maxLines),
buffer: &strings.Builder{},
useAnsi: false,
buffer: &strings.Builder{},
useAnsi: false,
uniqueId: pclog.GenerateUniqueID(10),
}
l.ansiWriter = tview.ANSIWriter(l)
l.SetBorder(true)
@ -59,6 +63,14 @@ func (l *LogView) SetLines(lines []string) {
l.AddLines(lines)
}
func (l *LogView) GetUniqueID() string {
return l.uniqueId
}
func (l *LogView) GetTailLength() int {
return math.MaxInt
}
func (l *LogView) ToggleWrap() {
l.isWrapOn = !l.isWrapOn
l.SetWrap(l.isWrapOn)