Scaling Support

This commit is contained in:
Berger Eugene 2023-07-08 00:21:42 +03:00
parent 328fecdc1b
commit e9b02c5fab
31 changed files with 951 additions and 171 deletions

20
go.sum
View File

@ -4,8 +4,6 @@ github.com/InVisionApp/go-logger v1.0.1 h1:WFL19PViM1mHUmUWfsv5zMo379KSWj2MRmBlz
github.com/InVisionApp/go-logger v1.0.1/go.mod h1:+cGTDSn+P8105aZkeOfIhdd7vFO5X1afUHcjvanY0L8=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls=
github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E=
@ -59,7 +57,6 @@ github.com/go-openapi/spec v0.20.9 h1:xnlYNQAwKd2VQRRfwTEI0DcK+2cbuvI/0c7jx3gA8/
github.com/go-openapi/spec v0.20.9/go.mod h1:2OpW+JddWPrpXSCIX8eOx7lZ5iyuWj3RYR6VaaBKcWA=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogBU=
github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
@ -68,8 +65,6 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js=
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k=
github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
@ -86,8 +81,6 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@ -101,8 +94,6 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
@ -147,8 +138,6 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/tview v0.0.0-20230511053024-822bd067b165 h1:YMycYmUdmLI7ZTn86HUEDM8E8fCMz7twtysBW3SlG0c=
github.com/rivo/tview v0.0.0-20230511053024-822bd067b165/go.mod h1:nVwGv4MP47T0jvlk7KuTTjjuSmrGO4JF0iaiNt4bufE=
github.com/rivo/tview v0.0.0-20230530133550-8bd761dda819 h1:qRMCGgwKl66uWe7Hnzl5bCvZlfrLNIxOx7K00j5XeNc=
github.com/rivo/tview v0.0.0-20230530133550-8bd761dda819/go.mod h1:nVwGv4MP47T0jvlk7KuTTjjuSmrGO4JF0iaiNt4bufE=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
@ -223,35 +212,26 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.9.0 h1:GRRCnKYhdQrD8kfRAdQ6Zcw1P0OcELxGLKJvtjVMZ28=
golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM=
golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -34,7 +34,7 @@ processes:
_process2:
command: "./test_loop.bash process2"
log_location: ./pc.proc2.log
log_location: ./pc.proc2.{PC_LOG}.log
availability:
restart: "on_failure"
# depends_on:
@ -59,7 +59,7 @@ processes:
failure_threshold: 3
process3:
command: "./test_loop.bash process3"
command: "./test_loop.bash $PROC4"
availability:
restart: "always"
backoff_seconds: 2
@ -70,6 +70,8 @@ processes:
process4:
command: "./test_loop.bash process4-override"
namespace: test
replicas: 2
disable_ansi_colors: true
# availability:
# restart: on_failure

View File

@ -163,6 +163,30 @@ func (api *PcApi) RestartProcess(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"name": name})
}
// @Schemes
// @Description Scale a process
// @Tags Process
// @Summary Scale a process to a given replicas count
// @Produce json
// @Param name path string true "Process Name"
// @Success 200 {string} string "Scaled Process Name"
// @Router /process/scale/{name}/{scale} [post]
func (api *PcApi) ScaleProcess(c *gin.Context) {
name := c.Param("name")
scale, err := strconv.Atoi(c.Param("scale"))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
err = api.project.ScaleProcess(name, scale)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"name": name})
}
// @Schemes
// @Description Check if server is responding
// @Tags Liveness

View File

@ -47,6 +47,7 @@ func InitRoutes(useLogger bool, handler *PcApi) *gin.Engine {
r.PATCH("/process/stop/:name", handler.StopProcess)
r.POST("/process/start/:name", handler.StartProcess)
r.POST("/process/restart/:name", handler.RestartProcess)
r.PATCH("/process/scale/:name/:scale", handler.ScaleProcess)
//websocket
r.GET("/process/logs/ws", handler.HandleLogsStream)

View File

@ -0,0 +1,18 @@
package app
import (
"io"
)
type Commander interface {
Start() error
Wait() error
ExitCode() int
Pid() int
SetEnv([]string)
SetDir(string)
StdoutPipe() (io.ReadCloser, error)
StderrPipe() (io.ReadCloser, error)
Stop(int) error
SetCmdArgs()
}

View File

@ -8,8 +8,8 @@ import (
"io"
"math/rand"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"
@ -29,9 +29,8 @@ const (
type Process struct {
sync.Mutex
globalEnv []string
procConf types.ProcessConfig
procConf *types.ProcessConfig
procState *types.ProcessState
stateMtx sync.Mutex
procCond sync.Cond
@ -44,35 +43,26 @@ type Process struct {
redColor func(a ...interface{}) string
logBuffer *pclog.ProcessLogBuffer
logger pclog.PcLogger
command *exec.Cmd
command Commander
done bool
replica int
startTime time.Time
liveProber *health.Prober
readyProber *health.Prober
shellConfig command.ShellConfig
}
func NewProcess(
globalEnv []string,
logger pclog.PcLogger,
procConf types.ProcessConfig,
procState *types.ProcessState,
procLog *pclog.ProcessLogBuffer,
replica int,
shellConfig command.ShellConfig) *Process {
func NewProcess(globalEnv []string, logger pclog.PcLogger, procConf *types.ProcessConfig, processState *types.ProcessState, procLog *pclog.ProcessLogBuffer, shellConfig command.ShellConfig) *Process {
colNumeric := rand.Intn(int(color.FgHiWhite)-int(color.FgHiBlack)) + int(color.FgHiBlack)
proc := &Process{
globalEnv: globalEnv,
procConf: procConf,
procState: processState,
procColor: color.New(color.Attribute(colNumeric), color.Bold).SprintFunc(),
redColor: color.New(color.FgHiRed).SprintFunc(),
noColor: color.New(color.Reset).SprintFunc(),
logger: logger,
procState: procState,
done: false,
replica: replica,
logBuffer: procLog,
shellConfig: shellConfig,
procStateChan: make(chan string, 1),
@ -107,7 +97,7 @@ func (p *Process) run() int {
}
p.startTime = time.Now()
p.procState.Pid = p.command.Process.Pid
p.procState.Pid = p.command.Pid()
log.Info().Msgf("%s started", p.getName())
p.startProbes()
@ -118,7 +108,7 @@ func (p *Process) run() int {
time.Sleep(50 * time.Millisecond)
_ = p.command.Wait()
p.Lock()
p.procState.ExitCode = p.command.ProcessState.ExitCode()
p.procState.ExitCode = p.command.ExitCode()
p.Unlock()
log.Info().Msgf("%s exited with status %d", p.getName(), p.procState.ExitCode)
@ -133,7 +123,7 @@ func (p *Process) run() int {
p.setState(types.ProcessStateRestarting)
p.procState.Restarts += 1
log.Info().Msgf("Restarting %s in %v second(s)... Restarts: %d",
p.procConf.Name, p.getBackoff().Seconds(), p.procState.Restarts)
p.getName(), p.getBackoff().Seconds(), p.procState.Restarts)
time.Sleep(p.getBackoff())
}
@ -144,9 +134,9 @@ func (p *Process) run() int {
func (p *Process) getProcessStarter() func() error {
return func() error {
p.command = command.BuildCommandShellArg(p.shellConfig, p.getCommand())
p.command.Env = p.getProcessEnvironment()
p.command.Dir = p.procConf.WorkingDir
p.setProcArgs()
p.command.SetEnv(p.getProcessEnvironment())
p.command.SetDir(p.procConf.WorkingDir)
p.command.SetCmdArgs()
stdout, _ := p.command.StdoutPipe()
stderr, _ := p.command.StderrPipe()
go p.handleOutput(stdout, p.handleInfo)
@ -168,7 +158,7 @@ func (p *Process) getBackoff() time.Duration {
func (p *Process) getProcessEnvironment() []string {
env := []string{
"PC_PROC_NAME=" + p.getName(),
"PC_REPLICA_NUM=" + strconv.Itoa(p.replica),
"PC_REPLICA_NUM=" + strconv.Itoa(p.procConf.ReplicaNum),
}
env = append(env, os.Environ()...)
env = append(env, p.globalEnv...)
@ -235,6 +225,12 @@ func (p *Process) wontRun() {
}
// perform graceful process shutdown if defined in configuration
func (p *Process) shutDownNoRestart() error {
p.prepareForShutDown()
return p.shutDown()
}
// perform graceful process shutdown if defined in configuration
func (p *Process) shutDown() error {
if !p.isRunning() {
@ -249,7 +245,7 @@ func (p *Process) shutDown() error {
return p.doConfiguredStop(p.procConf.ShutDownParams)
}
return p.stop(p.procConf.ShutDownParams.Signal, p.procConf.ShutDownParams.ParentOnly)
return p.command.Stop(p.procConf.ShutDownParams.Signal, p.procConf.ShutDownParams.ParentOnly)
}
func (p *Process) doConfiguredStop(params types.ShutDownParams) error {
@ -268,7 +264,7 @@ func (p *Process) doConfiguredStop(params types.ShutDownParams) error {
if err := cmd.Run(); err != nil {
// the process termination timedout and it will be killed
log.Error().Msgf("terminating %s with timeout %d failed - %s", p.getName(), timeout, err.Error())
return p.stop(int(syscall.SIGKILL), false)
return p.command.Stop(int(syscall.SIGKILL), false)
}
return nil
}
@ -278,13 +274,13 @@ func (p *Process) isRunning() bool {
}
func (p *Process) prepareForShutDown() {
// prevent restart during global shutdown
// prevent restart during global shutdown or scale down
p.procConf.RestartPolicy.Restart = types.RestartPolicyNo
}
func (p *Process) onProcessStart() {
if isStringDefined(p.procConf.LogLocation) {
p.logger.Open(p.procConf.LogLocation)
p.logger.Open(p.getLogPath())
}
}
@ -301,12 +297,32 @@ func (p *Process) onProcessEnd(state string) {
p.procCond.Broadcast()
}
func (p *Process) getName() string {
return p.procConf.Name
func (p *Process) getLogPath() string {
logLocation := p.procConf.LogLocation
if strings.Contains(logLocation, "{PC_LOG}") {
replicaStr := strconv.Itoa(p.procConf.ReplicaNum)
logLocation = strings.Replace(logLocation, "{PC_LOG}", replicaStr, -1)
} else if p.procConf.Replicas > 1 {
logLocation = fmt.Sprintf("%s.%d", logLocation, p.procConf.ReplicaNum)
}
return logLocation
}
func (p *Process) getNameWithReplica() string {
return fmt.Sprintf("%s_%d", p.procConf.Name, p.replica)
func (p *Process) getName() string {
return p.procConf.ReplicaName
}
func (p *Process) setName(replicaName string) {
p.procConf.ReplicaName = replicaName
}
func (p *Process) getNameWithSmartReplica() string {
if p.procConf.Replicas > 1 {
return p.getName()
}
return p.procConf.Name
}
func (p *Process) getCommand() string {
@ -319,6 +335,7 @@ func (p *Process) updateProcState() {
p.procState.SystemTime = durationToString(dur)
p.procState.Age = dur
p.procState.IsRunning = true
p.procState.Name = p.getName()
}
}
@ -336,21 +353,25 @@ func (p *Process) handleInput(pipe io.WriteCloser) {
func (p *Process) handleOutput(pipe io.ReadCloser, handler func(message string)) {
outscanner := bufio.NewScanner(pipe)
//outscanner.Buffer(make([]byte, 0, 1024), 1024*1024*10)
outscanner.Split(bufio.ScanLines)
for outscanner.Scan() {
handler(outscanner.Text())
}
if err := outscanner.Err(); err != nil {
log.Error().Msgf("error reading from stdout - %s", err.Error())
}
}
func (p *Process) handleInfo(message string) {
p.logger.Info(message, p.getName(), p.replica)
fmt.Printf("[%s\t] %s\n", p.procColor(p.getNameWithReplica()), message)
p.logger.Info(message, p.getName(), p.procConf.ReplicaNum)
fmt.Printf("[%s\t] %s\n", p.procColor(p.getName()), message)
p.logBuffer.Write(message)
}
func (p *Process) handleError(message string) {
p.logger.Error(message, p.getName(), p.replica)
fmt.Printf("[%s\t] %s\n", p.procColor(p.getNameWithReplica()), p.redColor(message))
p.logger.Error(message, p.getName(), p.procConf.ReplicaNum)
fmt.Printf("[%s\t] %s\n", p.procColor(p.getName()), p.redColor(message))
p.logBuffer.Write(message)
}
@ -376,7 +397,13 @@ func (p *Process) setState(state string) {
defer p.stateMtx.Unlock()
p.procState.Status = state
p.onStateChange(state)
}
func (p *Process) getState() *types.ProcessState {
p.updateProcState()
p.stateMtx.Lock()
defer p.stateMtx.Unlock()
return p.procState
}
func (p *Process) setStateAndRun(state string, runnable func() error) error {

View File

@ -24,4 +24,5 @@ type IProject interface {
StopProcess(name string) error
StartProcess(name string) error
RestartProcess(name string) error
ScaleProcess(name string, scale int) error
}

View File

@ -14,11 +14,14 @@ import (
//var PROJ *ProjectRunner
type ProjectRunner struct {
procConfMutex sync.Mutex
project *types.Project
runningProcesses map[string]*Process
processStates map[string]*types.ProcessState
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
mapMutex sync.Mutex
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
@ -46,7 +49,7 @@ func (p *ProjectRunner) Run() int {
})
var nameOrder []string
for _, v := range runOrder {
nameOrder = append(nameOrder, v.Name)
nameOrder = append(nameOrder, v.ReplicaName)
}
p.logger = pclog.NewNilLogger()
if isStringDefined(p.project.LogLocation) {
@ -57,30 +60,31 @@ func (p *ProjectRunner) Run() int {
//zerolog.SetGlobalLevel(zerolog.PanicLevel)
log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder)
for _, proc := range runOrder {
p.runProcess(proc)
newConf := proc
p.runProcess(&newConf)
}
p.waitGroup.Wait()
log.Info().Msg("Project completed")
return p.exitCode
}
func (p *ProjectRunner) runProcess(proc types.ProcessConfig) {
func (p *ProjectRunner) runProcess(config *types.ProcessConfig) {
procLogger := p.logger
if isStringDefined(proc.LogLocation) {
if isStringDefined(config.LogLocation) {
procLogger = pclog.NewLogger()
}
procLog, err := p.getProcessLog(proc.Name)
procLog, err := p.getProcessLog(config.ReplicaName)
if err != nil {
// we shouldn't get here
log.Error().Msgf("Error: Can't get log: %s using empty buffer", err.Error())
procLog = pclog.NewLogBuffer(0)
}
procState, _ := p.GetProcessState(proc.Name)
process := NewProcess(p.project.Environment, procLogger, proc, procState, procLog, 1, *p.project.ShellConfig)
procState, _ := p.GetProcessState(config.ReplicaName)
process := NewProcess(p.project.Environment, procLogger, config, procState, procLog, *p.project.ShellConfig)
p.addRunningProcess(process)
p.waitGroup.Add(1)
go func() {
defer p.removeRunningProcess(process.getName())
defer p.removeRunningProcess(process)
defer p.waitGroup.Done()
if err := p.waitIfNeeded(process.procConf); err != nil {
log.Error().Msgf("Error: %s", err.Error())
@ -93,7 +97,7 @@ func (p *ProjectRunner) runProcess(proc types.ProcessConfig) {
}()
}
func (p *ProjectRunner) waitIfNeeded(process types.ProcessConfig) error {
func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error {
for k := range process.DependsOn {
if runningProc := p.getRunningProcess(k); runningProc != nil {
@ -101,17 +105,17 @@ func (p *ProjectRunner) waitIfNeeded(process types.ProcessConfig) error {
case types.ProcessConditionCompleted:
runningProc.waitForCompletion()
case types.ProcessConditionCompletedSuccessfully:
log.Info().Msgf("%s is waiting for %s to complete successfully", process.Name, k)
log.Info().Msgf("%s is waiting for %s to complete successfully", process.ReplicaName, k)
exitCode := runningProc.waitForCompletion()
if exitCode != 0 {
return fmt.Errorf("process %s depended on %s to complete successfully, but it exited with status %d",
process.Name, k, exitCode)
process.ReplicaName, k, exitCode)
}
case types.ProcessConditionHealthy:
log.Info().Msgf("%s is waiting for %s to be healthy", process.Name, k)
log.Info().Msgf("%s is waiting for %s to be healthy", process.ReplicaName, k)
ready := runningProc.waitUntilReady()
if !ready {
return fmt.Errorf("process %s depended on %s to become ready, but it was terminated", process.Name, k)
return fmt.Errorf("process %s depended on %s to become ready, but it was terminated", process.ReplicaName, k)
}
}
@ -120,7 +124,7 @@ func (p *ProjectRunner) waitIfNeeded(process types.ProcessConfig) error {
return nil
}
func (p *ProjectRunner) onProcessEnd(exitCode int, procConf types.ProcessConfig) {
func (p *ProjectRunner) onProcessEnd(exitCode int, procConf *types.ProcessConfig) {
if exitCode != 0 && procConf.RestartPolicy.Restart == types.RestartPolicyExitOnFailure {
p.ShutDownProject()
p.exitCode = exitCode
@ -128,83 +132,75 @@ func (p *ProjectRunner) onProcessEnd(exitCode int, procConf types.ProcessConfig)
}
func (p *ProjectRunner) initProcessStates() {
p.statesMutex.Lock()
defer p.statesMutex.Unlock()
p.processStates = make(map[string]*types.ProcessState)
for key, proc := range p.project.Processes {
p.processStates[key] = &types.ProcessState{
Name: key,
Namespace: proc.Namespace,
Status: types.ProcessStatePending,
SystemTime: "",
Health: types.ProcessHealthUnknown,
Restarts: 0,
ExitCode: 0,
Pid: 0,
}
if proc.Disabled {
p.processStates[key].Status = types.ProcessStateDisabled
}
for name, proc := range p.project.Processes {
p.processStates[name] = types.NewProcessState(&proc)
}
}
func (p *ProjectRunner) initProcessLogs() {
p.processLogs = make(map[string]*pclog.ProcessLogBuffer)
for key := range p.project.Processes {
p.processLogs[key] = pclog.NewLogBuffer(p.project.LogLength)
for _, proc := range p.project.Processes {
p.initProcessLog(proc.ReplicaName)
}
}
func (p *ProjectRunner) GetProcessState(name string) (*types.ProcessState, error) {
if procState, ok := p.processStates[name]; ok {
proc := p.getRunningProcess(name)
if proc != nil {
proc.updateProcState()
} else {
procState.Pid = 0
procState.SystemTime = ""
procState.Age = time.Duration(0)
procState.Health = types.ProcessHealthUnknown
procState.IsRunning = false
}
return procState, nil
}
func (p *ProjectRunner) initProcessLog(name string) {
p.processLogs[name] = pclog.NewLogBuffer(p.project.LogLength)
}
log.Error().Msgf("Error: process %s doesn't exist", name)
return nil, fmt.Errorf("no such process: %s", name)
func (p *ProjectRunner) GetProcessState(name string) (*types.ProcessState, error) {
proc := p.getRunningProcess(name)
if proc != nil {
return proc.getState(), nil
} else {
p.statesMutex.Lock()
defer p.statesMutex.Unlock()
state, ok := p.processStates[name]
if !ok {
log.Error().Msgf("Error: process %s doesn't exist", name)
return nil, fmt.Errorf("can't get state of process %s: no such process", name)
}
return state, nil
}
}
func (p *ProjectRunner) GetProcessesState() (*types.ProcessesState, error) {
states := &types.ProcessesState{
States: make([]types.ProcessState, 0),
}
for name, _ := range p.processStates {
for name, _ := range p.project.Processes {
state, err := p.GetProcessState(name)
if err != nil {
continue
return nil, err
}
states.States = append(states.States, *state)
}
return states, nil
}
func (p *ProjectRunner) addRunningProcess(process *Process) {
p.mapMutex.Lock()
p.runProcMutex.Lock()
p.runningProcesses[process.getName()] = process
p.mapMutex.Unlock()
p.runProcMutex.Unlock()
}
func (p *ProjectRunner) getRunningProcess(name string) *Process {
p.mapMutex.Lock()
defer p.mapMutex.Unlock()
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
if runningProc, ok := p.runningProcesses[name]; ok {
return runningProc
}
return nil
}
func (p *ProjectRunner) removeRunningProcess(name string) {
p.mapMutex.Lock()
delete(p.runningProcesses, name)
p.mapMutex.Unlock()
func (p *ProjectRunner) removeRunningProcess(process *Process) {
p.runProcMutex.Lock()
delete(p.runningProcesses, process.getName())
p.runProcMutex.Unlock()
}
func (p *ProjectRunner) StartProcess(name string) error {
@ -214,8 +210,7 @@ func (p *ProjectRunner) StartProcess(name string) error {
return fmt.Errorf("process %s is already running", name)
}
if processConfig, ok := p.project.Processes[name]; ok {
processConfig.Name = name
p.runProcess(processConfig)
p.runProcess(&processConfig)
} else {
return fmt.Errorf("no such process: %s", name)
}
@ -224,12 +219,16 @@ func (p *ProjectRunner) StartProcess(name string) error {
}
func (p *ProjectRunner) StopProcess(name string) error {
log.Info().Msgf("Stopping %s", name)
proc := p.getRunningProcess(name)
if proc == nil {
log.Error().Msgf("Process %s is not running", name)
return fmt.Errorf("process %s is not running", name)
}
_ = proc.shutDown()
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to stop process %s", name)
}
return nil
}
@ -244,8 +243,7 @@ func (p *ProjectRunner) RestartProcess(name string) error {
}
if processConfig, ok := p.project.Processes[name]; ok {
processConfig.Name = name
p.runProcess(processConfig)
p.runProcess(&processConfig)
} else {
return fmt.Errorf("no such process: %s", name)
}
@ -253,8 +251,9 @@ func (p *ProjectRunner) RestartProcess(name string) error {
}
func (p *ProjectRunner) GetProcessInfo(name string) (*types.ProcessConfig, error) {
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
if processConfig, ok := p.project.Processes[name]; ok {
processConfig.Name = name
return &processConfig, nil
} else {
return nil, fmt.Errorf("no such process: %s", name)
@ -262,8 +261,8 @@ func (p *ProjectRunner) GetProcessInfo(name string) (*types.ProcessConfig, error
}
func (p *ProjectRunner) ShutDownProject() {
p.mapMutex.Lock()
defer p.mapMutex.Unlock()
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
runProc := p.runningProcesses
for _, proc := range runProc {
proc.prepareForShutDown()
@ -300,7 +299,7 @@ func (p *ProjectRunner) getProcessLog(name string) (*pclog.ProcessLogBuffer, err
if procLogs, ok := p.processLogs[name]; ok {
return procLogs, nil
}
log.Error().Msgf("Error: process %s doesn't exist", name)
log.Error().Msgf("process %s doesn't exist", name)
return nil, fmt.Errorf("process %s doesn't exist", name)
}
@ -331,6 +330,7 @@ func (p *ProjectRunner) GetProcessLogLength(name string) int {
func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.LogObserver) error {
logs, err := p.getProcessLog(name)
if err != nil {
log.Err(err).Msgf("can't subscribe to process %s", name)
return err
}
logs.GetLogsAndSubscribe(observer)
@ -346,13 +346,152 @@ func (p *ProjectRunner) UnSubscribeLogger(name string, observer pclog.LogObserve
return nil
}
func (p *ProjectRunner) ScaleProcess(name string, scale int) error {
if scale < 1 {
err := fmt.Errorf("cannot scale process %s to a negative or zero value %d", name, scale)
log.Err(err).Msg("scale failed")
return err
}
if processConfig, ok := p.project.Processes[name]; ok {
scaleDelta := scale - processConfig.Replicas
if scaleDelta < 0 {
log.Info().Msgf("scaling down %s by %d", name, scaleDelta*-1)
p.scaleDownProcess(processConfig.Name, scale)
} else if scaleDelta > 0 {
log.Info().Msgf("scaling up %s by %d", name, scaleDelta)
p.scaleUpProcess(processConfig, scaleDelta, scale)
} else {
log.Info().Msgf("no change in scale of %s", name)
return nil
}
p.updateReplicaCount(processConfig.Name, scale)
} else {
return fmt.Errorf("no such process: %s", name)
}
return nil
}
func (p *ProjectRunner) scaleUpProcess(proc types.ProcessConfig, toAdd, scale int) {
origScale := proc.Replicas
for i := 0; i < toAdd; i++ {
proc.ReplicaNum = origScale + i
proc.Replicas = scale
proc.ReplicaName = proc.CalculateReplicaName()
p.addProcessAndRun(proc)
}
}
func (p *ProjectRunner) scaleDownProcess(name string, scale int) {
toRemove := []string{}
p.procConfMutex.Lock()
for _, proc := range p.project.Processes {
if proc.Name == name {
if proc.ReplicaNum >= scale {
toRemove = append(toRemove, proc.ReplicaName)
} else {
proc.Replicas = scale
p.project.Processes[proc.ReplicaName] = proc
}
}
}
p.procConfMutex.Unlock()
wg := sync.WaitGroup{}
for _, name := range toRemove {
wg.Add(1)
go func(name string) {
defer wg.Done()
if err := p.removeProcess(name); err != nil {
log.Err(err).Msgf("failed to scale down process %s", name)
}
}(name)
}
wg.Wait()
}
func (p *ProjectRunner) updateReplicaCount(name string, scale int) {
for _, proc := range p.project.Processes {
if proc.Name == name {
proc.Replicas = scale
p.project.Processes[proc.ReplicaName] = proc
if proc.ReplicaName != proc.CalculateReplicaName() {
p.renameProcess(proc.ReplicaName, proc.CalculateReplicaName())
}
}
}
}
func (p *ProjectRunner) renameProcess(name string, newName string) {
process := p.getRunningProcess(name)
if process != nil {
p.removeRunningProcess(process)
process.setName(newName)
p.addRunningProcess(process)
}
logs := p.removeProcessLogs(name)
if logs != nil {
p.processLogs[newName] = logs
}
state, err := p.GetProcessState(name)
if err == nil {
p.statesMutex.Lock()
defer p.statesMutex.Unlock()
delete(p.processStates, name)
state.Name = newName
p.processStates[newName] = state
}
config, ok := p.project.Processes[name]
if ok {
delete(p.project.Processes, name)
config.ReplicaName = newName
p.project.Processes[newName] = config
}
}
func (p *ProjectRunner) removeProcessLogs(name string) *pclog.ProcessLogBuffer {
p.logsMutex.Lock()
defer p.logsMutex.Unlock()
logs, ok := p.processLogs[name]
if ok {
logs.Close()
delete(p.processLogs, name)
}
return logs
}
func (p *ProjectRunner) removeProcess(name string) error {
p.removeProcessLogs(name)
p.procConfMutex.Lock()
delete(p.project.Processes, name)
p.procConfMutex.Unlock()
running := p.getRunningProcess(name)
if running != nil {
err := running.shutDownNoRestart()
if err != nil {
log.Err(err).Msgf("failed to remove process %s", name)
return err
} else {
running.waitForCompletion()
}
}
return nil
}
func (p *ProjectRunner) addProcessAndRun(proc types.ProcessConfig) {
p.statesMutex.Lock()
p.processStates[proc.ReplicaName] = types.NewProcessState(&proc)
p.statesMutex.Unlock()
p.project.Processes[proc.ReplicaName] = proc
p.initProcessLog(proc.ReplicaName)
p.runProcess(&proc)
}
func (p *ProjectRunner) selectRunningProcesses(procList []string) error {
if len(procList) == 0 {
return nil
}
newProcMap := types.Processes{}
err := p.project.WithProcesses(procList, func(process types.ProcessConfig) error {
newProcMap[process.Name] = process
newProcMap[process.ReplicaName] = process
return nil
})
if err != nil {
@ -413,3 +552,14 @@ func NewProjectRunner(project *types.Project, processesToRun []string, noDeps bo
runner.init()
return runner, nil
}
//func getProcessName(process *Process) string {
// return process.getNameWithSmartReplica()
//}
//
//func getProcessNameFromConf(process types.ProcessConfig, replica int) string {
// if process.Replicas > 1 {
// return fmt.Sprintf("%s-%d", process.Name, replica)
// }
// return process.Name
//}

View File

@ -17,7 +17,7 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
tests := []struct {
name string
fields fields
want []string
want [][]string
wantErr bool
}{
{
@ -25,29 +25,35 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
fields: fields{
Processes: map[string]types.ProcessConfig{
"Process1": {
Name: "Process1",
Name: "Process1",
ReplicaName: "Process1",
DependsOn: types.DependsOnConfig{
"Process2": {},
},
},
"Process2": {
Name: "Process2",
Name: "Process2",
ReplicaName: "Process2",
DependsOn: types.DependsOnConfig{
"Process3": {},
},
},
"Process3": {
Name: "Process3",
Name: "Process3",
ReplicaName: "Process3",
DependsOn: types.DependsOnConfig{
"Process4": {},
},
},
"Process4": {
Name: "Process4",
Name: "Process4",
ReplicaName: "Process4",
},
},
},
want: []string{"Process4", "Process3", "Process2", "Process1"},
want: [][]string{
{"Process4", "Process3", "Process2", "Process1"},
},
wantErr: false,
},
{
@ -55,20 +61,22 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
fields: fields{
Processes: map[string]types.ProcessConfig{
"Process1": {
Name: "Process1",
Name: "Process1",
ReplicaName: "Process1",
DependsOn: types.DependsOnConfig{
"Process2": {},
},
},
"Process2": {
Name: "Process2",
Name: "Process2",
ReplicaName: "Process2",
DependsOn: types.DependsOnConfig{
"Process4": {},
},
},
},
},
want: []string{},
want: [][]string{},
wantErr: true,
},
{
@ -76,7 +84,8 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
fields: fields{
Processes: map[string]types.ProcessConfig{
"Process1": {
Name: "Process1",
Name: "Process1",
ReplicaName: "Process1",
DependsOn: types.DependsOnConfig{
"Process2": {},
},
@ -87,7 +96,7 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
},
},
},
want: []string{"Process1"},
want: [][]string{{"Process1"}},
wantErr: false,
},
{
@ -102,14 +111,118 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
},
},
"Process2": {
Name: "Process2",
Name: "Process2",
ReplicaName: "Process2",
},
},
},
want: []string{"Process2"},
want: [][]string{{"Process2"}},
wantErr: false,
},
{
name: "WithReplicaDependees",
fields: fields{
Processes: map[string]types.ProcessConfig{
"Process1": {
Name: "Process1",
ReplicaName: "Process1",
DependsOn: types.DependsOnConfig{
"Process2": {},
},
},
"Process2-0": {
Name: "Process2",
ReplicaName: "Process2-0",
Replicas: 2,
},
"Process2-1": {
Name: "Process2",
ReplicaName: "Process2-1",
Replicas: 2,
},
},
},
want: [][]string{
{"Process2-0", "Process2-1", "Process1"},
{"Process2-1", "Process2-0", "Process1"},
},
wantErr: false,
},
{
name: "WithReplicas",
fields: fields{
Processes: map[string]types.ProcessConfig{
"Process1": {
Name: "Process1",
ReplicaName: "Process1",
},
"Process2-0": {
Name: "Process2",
ReplicaName: "Process2-0",
Replicas: 2,
DependsOn: types.DependsOnConfig{
"Process1": {},
},
},
"Process2-1": {
Name: "Process2",
ReplicaName: "Process2-1",
Replicas: 2,
DependsOn: types.DependsOnConfig{
"Process1": {},
},
},
},
},
want: [][]string{
{"Process1", "Process2-1", "Process2-0"},
{"Process1", "Process2-0", "Process2-1"},
},
wantErr: false,
},
{
name: "WithReplicasBoth",
fields: fields{
Processes: map[string]types.ProcessConfig{
"Process1-0": {
Name: "Process1",
ReplicaName: "Process1-0",
Replicas: 2,
},
"Process1-1": {
Name: "Process1",
ReplicaName: "Process1-1",
Replicas: 2,
},
"Process2-0": {
Name: "Process2",
ReplicaName: "Process2-0",
Replicas: 2,
DependsOn: types.DependsOnConfig{
"Process1": {},
},
},
"Process2-1": {
Name: "Process2",
ReplicaName: "Process2-1",
Replicas: 2,
DependsOn: types.DependsOnConfig{
"Process1": {},
},
},
},
},
want: [][]string{
{"Process1-0", "Process1-1", "Process2-0", "Process2-1"},
{"Process1-0", "Process1-1", "Process2-1", "Process2-0"},
{"Process1-1", "Process1-0", "Process2-0", "Process2-1"},
{"Process1-1", "Process1-0", "Process2-1", "Process2-0"},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &types.Project{
@ -123,8 +236,18 @@ func TestProject_GetDependenciesOrderNames(t *testing.T) {
t.Errorf("Project.GetDependenciesOrderNames() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Project.GetDependenciesOrderNames() = %v, want %v", got, tt.want)
//if !reflect.DeepEqual(got, tt.want) && (tt.wantOr != nil && !reflect.DeepEqual(got, tt.wantOr)) {
// t.Errorf("Project.GetDependenciesOrderNames() = %v, want %v", got, tt.want)
//}
found := false
for _, want := range tt.want {
if reflect.DeepEqual(got, want) {
found = true
break
}
}
if !found && !tt.wantErr {
t.Errorf("Project.GetDependenciesOrderNames() = %v, want one of %v", got, tt.want)
}
})
}

View File

@ -97,6 +97,10 @@ func (p *PcClient) RestartProcess(name string) error {
return RestartProcesses(p.address, p.port, name)
}
func (p *PcClient) ScaleProcess(name string, scale int) error {
return ScaleProcess(p.address, p.port, name, scale)
}
func (p *PcClient) IsAlive() error {
return p.logError(isAlive(p.address, p.port))
}

View File

@ -0,0 +1,31 @@
package client
import (
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"net/http"
)
func ScaleProcess(address string, port int, name string, scale int) error {
url := fmt.Sprintf("http://%s:%d/process/scale/%s/%d", address, port, name, scale)
client := &http.Client{}
req, err := http.NewRequest(http.MethodPatch, url, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
return nil
}
defer resp.Body.Close()
var respErr pcError
if err = json.NewDecoder(resp.Body).Decode(&respErr); err != nil {
log.Error().Msgf("failed to decode scale process %s response: %v", name, err)
return err
}
return fmt.Errorf(respErr.Error)
}

49
src/cmd/scale.go Normal file
View File

@ -0,0 +1,49 @@
/*
Copyright © 2023 NAME HERE <EMAIL ADDRESS>
*/
package cmd
import (
"fmt"
"github.com/f1bonacc1/process-compose/src/client"
"github.com/rs/zerolog/log"
"strconv"
"github.com/spf13/cobra"
)
// scaleCmd represents the scale command
var scaleCmd = &cobra.Command{
Use: "scale [PROCESS] [COUNT]",
Short: "Scale a process to a given count",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
name := args[0]
count, err := strconv.Atoi(args[1])
if err != nil {
log.Error().Msgf("second argument must be an integer: %v", err)
return
}
err = client.ScaleProcess(pcAddress, port, name, count)
if err != nil {
log.Error().Msgf("Failed to scale processes %s: %v", name, err)
fmt.Println(err.Error())
return
}
log.Info().Msgf("Process %s scaled to %s", name, args[1])
},
}
func init() {
processCmd.AddCommand(scaleCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// scaleCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// scaleCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}

View File

@ -12,8 +12,11 @@ func BuildCommand(shellCmd string) *exec.Cmd {
return exec.Command(getRunnerShell(), getRunnerArg(), shellCmd)
}
func BuildCommandShellArg(shell ShellConfig, cmd string) *exec.Cmd {
return exec.Command(shell.ShellCommand, shell.ShellArgument, cmd)
func BuildCommandShellArg(shell ShellConfig, cmd string) *CmdWrapper {
return &CmdWrapper{
Cmd: exec.Command(shell.ShellCommand, shell.ShellArgument, cmd),
}
//return NewMockCommand()
}
func BuildCommandContext(ctx context.Context, shellCmd string) *exec.Cmd {

View File

@ -0,0 +1,42 @@
package command
import (
"io"
"os/exec"
)
type CmdWrapper struct {
Cmd *exec.Cmd
}
func (c *CmdWrapper) Start() error {
return c.Cmd.Start()
}
func (c *CmdWrapper) Wait() error {
return c.Cmd.Wait()
}
func (c *CmdWrapper) ExitCode() int {
return c.Cmd.ProcessState.ExitCode()
}
func (c *CmdWrapper) Pid() int {
return c.Cmd.Process.Pid
}
func (c *CmdWrapper) StdoutPipe() (io.ReadCloser, error) {
return c.Cmd.StdoutPipe()
}
func (c *CmdWrapper) StderrPipe() (io.ReadCloser, error) {
return c.Cmd.StderrPipe()
}
func (c *CmdWrapper) SetEnv(env []string) {
c.Cmd.Env = env
}
func (c *CmdWrapper) SetDir(dir string) {
c.Cmd.Dir = dir
}

View File

@ -0,0 +1,68 @@
package command
import (
"context"
"io"
)
type MockCommand struct {
dir string
env []string
cancel context.CancelFunc
stopChan chan struct{}
infoNoiseMaker *noiseMaker
errNoiseMaker *noiseMaker
}
func NewMockCommand() *MockCommand {
return &MockCommand{
stopChan: make(chan struct{}),
infoNoiseMaker: newNoiseMaker("info noise"),
errNoiseMaker: newNoiseMaker("error noise"),
}
}
func (c *MockCommand) Start() error {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
go c.infoNoiseMaker.Run(ctx)
return nil
}
func (c *MockCommand) Stop(i int) error {
c.stopChan <- struct{}{}
c.cancel()
return nil
}
func (c *MockCommand) SetCmdArgs() {
}
func (c *MockCommand) Wait() error {
<-c.stopChan
return nil
}
func (c *MockCommand) ExitCode() int {
return 0
}
func (c *MockCommand) Pid() int {
return 123456
}
func (c *MockCommand) StdoutPipe() (io.ReadCloser, error) {
return c.infoNoiseMaker, nil
}
func (c *MockCommand) StderrPipe() (io.ReadCloser, error) {
return c.errNoiseMaker, nil
}
func (c *MockCommand) SetEnv(env []string) {
c.env = env
}
func (c *MockCommand) SetDir(dir string) {
c.dir = dir
}

View File

@ -0,0 +1,48 @@
package command
import (
"context"
"io"
"time"
)
type noiseMaker struct {
ticker *time.Ticker
noiseChan chan []byte
noiseData []byte
}
func newNoiseMaker(noiseData string) *noiseMaker {
return &noiseMaker{
ticker: time.NewTicker(time.Second),
noiseChan: make(chan []byte, 10),
noiseData: []byte(noiseData),
}
}
func (n *noiseMaker) Run(ctx context.Context) {
for {
select {
case t := <-n.ticker.C:
data := append(n.noiseData, " "+t.String()+"\n"...)
n.noiseChan <- data
case <-ctx.Done():
break
}
}
}
func (n *noiseMaker) Read(p []byte) (size int, err error) {
data, ok := <-n.noiseChan
if !ok {
return 0, io.EOF
}
copy(p, data)
return len(p), nil
}
func (n *noiseMaker) Close() error {
n.ticker.Stop()
close(n.noiseChan)
return nil
}

View File

@ -1,6 +1,6 @@
//go:build !windows
package app
package command
import (
"github.com/rs/zerolog/log"
@ -12,8 +12,8 @@ const (
max_sig = 31
)
func (p *Process) stop(sig int, parentOnly bool) error {
if p.command == nil {
func (c *CmdWrapper) Stop(sig int, parentOnly bool) error {
if c.cmd == nil {
return nil
}
if sig < min_sig || sig > max_sig {
@ -22,16 +22,16 @@ func (p *Process) stop(sig int, parentOnly bool) error {
log.
Debug().
Int("pid", p.command.Process.Pid).
Int("pid", c.Pid()).
Int("signal", sig).
Bool("parentOnly", parentOnly).
Msg("Stop Unix process.")
if parentOnly {
return syscall.Kill(p.command.Process.Pid, syscall.Signal(sig))
return syscall.Kill(c.Pid(), syscall.Signal(sig))
}
pgid, err := syscall.Getpgid(p.command.Process.Pid)
pgid, err := syscall.Getpgid(c.Pid())
if err == nil {
return syscall.Kill(-pgid, syscall.Signal(sig))
}
@ -39,6 +39,6 @@ func (p *Process) stop(sig int, parentOnly bool) error {
return err
}
func (p *Process) setProcArgs() {
p.command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
func (c *CmdWrapper) SetCmdArgs() {
c.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
}

View File

@ -1,16 +1,16 @@
package app
package command
import (
"os/exec"
"strconv"
)
func (p *Process) stop(sig int, _parentOnly bool) error {
func (c *CmdWrapper) Stop(sig int, _parentOnly bool) error {
//p.command.Process.Kill()
kill := exec.Command("TASKKILL", "/T", "/F", "/PID", strconv.Itoa(p.command.Process.Pid))
kill := exec.Command("TASKKILL", "/T", "/F", "/PID", strconv.Itoa(c.Pid()))
return kill.Run()
}
func (p *Process) setProcArgs() {
func (c *CmdWrapper) SetCmdArgs() {
//empty for windows
}

View File

@ -151,6 +151,35 @@ const docTemplate = `{
}
}
},
"/process/scale/{name}/{scale}": {
"post": {
"description": "Scale a process",
"produces": [
"application/json"
],
"tags": [
"Process"
],
"summary": "Scale a process to a given replicas count",
"parameters": [
{
"type": "string",
"description": "Process Name",
"name": "name",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "Scaled Process Name",
"schema": {
"type": "string"
}
}
}
}
},
"/process/start/{name}": {
"post": {
"description": "Starts the process if the state is not 'running' or 'pending'",

View File

@ -139,6 +139,35 @@
}
}
},
"/process/scale/{name}/{scale}": {
"post": {
"description": "Scale a process",
"produces": [
"application/json"
],
"tags": [
"Process"
],
"summary": "Scale a process to a given replicas count",
"parameters": [
{
"type": "string",
"description": "Process Name",
"name": "name",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "Scaled Process Name",
"schema": {
"type": "string"
}
}
}
}
},
"/process/start/{name}": {
"post": {
"description": "Starts the process if the state is not 'running' or 'pending'",

View File

@ -109,6 +109,25 @@ paths:
summary: Restart a process
tags:
- Process
/process/scale/{name}/{scale}:
post:
description: Scale a process
parameters:
- description: Process Name
in: path
name: name
required: true
type: string
produces:
- application/json
responses:
"200":
description: Scaled Process Name
schema:
type: string
summary: Scale a process to a given replicas count
tags:
- Process
/process/start/{name}:
post:
description: Starts the process if the state is not 'running' or 'pending'

View File

@ -26,7 +26,9 @@ func Load(opts *LoaderOptions) (*types.Project, error) {
p := mustLoadProjectFromFile(file)
opts.projects = append(opts.projects, p)
}
return merge(opts)
mergedProject, err := merge(opts)
mergedProject.ValidateAfterMerge()
return mergedProject, err
}

View File

@ -100,3 +100,7 @@ func (b *ProcessLogBuffer) UnSubscribe(observer LogObserver) {
defer b.mx.Unlock()
delete(b.observers, observer.GetUniqueID())
}
func (b *ProcessLogBuffer) Close() {
b.observers = map[string]LogObserver{}
}

View File

@ -18,6 +18,7 @@ const (
ActionWrapLog = ActionName("log_wrap")
ActionLogSelection = ActionName("log_select")
ActionProcessStart = ActionName("process_start")
ActionProcessScale = ActionName("process_scale")
ActionProcessInfo = ActionName("process_info")
ActionProcessStop = ActionName("process_stop")
ActionProcessRestart = ActionName("process_restart")
@ -34,6 +35,7 @@ var defaultShortcuts = map[ActionName]tcell.Key{
ActionFollowLog: tcell.KeyF5,
ActionWrapLog: tcell.KeyF6,
ActionLogSelection: tcell.KeyCtrlS,
ActionProcessScale: tcell.KeyF2,
ActionProcessInfo: tcell.KeyF3,
ActionProcessStart: tcell.KeyF7,
ActionProcessStop: tcell.KeyF9,
@ -178,6 +180,9 @@ func getDefaultActions() ShortCuts {
false: "Select Off",
},
},
ActionProcessScale: {
Description: "Scale",
},
ActionProcessInfo: {
Description: "Info",
},

View File

@ -4,8 +4,8 @@ import (
"github.com/rivo/tview"
)
func (pv *pcView) showDialog(primitive tview.Primitive) {
pv.pages.AddPage(PageDialog, createDialogPage(primitive, 0, 0), true, true)
func (pv *pcView) showDialog(primitive tview.Primitive, width, height int) {
pv.pages.AddPage(PageDialog, createDialogPage(primitive, width, height), true, true)
pv.appView.SetFocus(primitive)
}

View File

@ -1,6 +1,7 @@
package tui
import (
"fmt"
"github.com/f1bonacc1/process-compose/src/types"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
@ -21,6 +22,7 @@ func (pv *pcView) createProcInfoForm(info *types.ProcessConfig) *tview.Form {
addStringIfNotEmpty("Command:", info.Command, f)
addStringIfNotEmpty("Working Directory:", info.WorkingDir, f)
addStringIfNotEmpty("Log Location:", info.LogLocation, f)
f.AddInputField("Replica:", fmt.Sprintf("%d/%d", info.ReplicaNum+1, info.Replicas), 0, nil, nil)
addSliceIfNotEmpty("Environment:", info.Environment, f)
addSliceIfNotEmpty("Depends On:", mapKeysToSlice(info.DependsOn), f)
f.AddCheckbox("Is Disabled:", info.Disabled, nil)

View File

@ -38,6 +38,12 @@ func (pv *pcView) fillTableData() {
runningProcCount += 1
}
}
// remove unnecessary rows, don't forget the title row (-1)
if pv.procTable.GetRowCount()-1 > len(states.States) {
for i := len(states.States); i < pv.procTable.GetRowCount()-1; i++ {
pv.procTable.RemoveRow(i)
}
}
if pv.procCountCell != nil {
pv.procCountCell.SetText(fmt.Sprintf("%d/%d", runningProcCount, len(pv.procNames)))
}

View File

@ -5,11 +5,13 @@ import (
"github.com/f1bonacc1/process-compose/src/client"
"github.com/f1bonacc1/process-compose/src/config"
"github.com/f1bonacc1/process-compose/src/updater"
"github.com/gdamore/tcell/v2"
"github.com/rs/zerolog/log"
"strconv"
"sync"
"time"
"github.com/f1bonacc1/process-compose/src/app"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
)
@ -146,6 +148,8 @@ func (pv *pcView) onAppKey(event *tcell.EventKey) *tcell.EventKey {
pv.updateHelpTextView()
case tcell.KeyCtrlC:
pv.terminateAppView()
case pv.shortcuts.ShortCutKeys[ActionProcessScale].key:
pv.showScale()
case pv.shortcuts.ShortCutKeys[ActionProcessInfo].key:
pv.showInfo()
case pv.shortcuts.ShortCutKeys[ActionLogFind].key:
@ -203,6 +207,38 @@ func (pv *pcView) showError(errMessage string) {
pv.pages.AddPage(PageDialog, createDialogPage(m, 50, 50), true, true)
}
func (pv *pcView) showScale() {
f := tview.NewForm()
f.SetCancelFunc(func() {
pv.pages.RemovePage(PageDialog)
})
f.SetItemPadding(3)
f.SetBorder(true)
f.SetFieldBackgroundColor(tcell.ColorBlack)
f.SetFieldTextColor(tcell.ColorLightSkyBlue)
name := pv.getSelectedProcName()
f.SetTitle("Scale " + name + " Process")
f.AddInputField("Replicas:", "1", 0, nil, nil)
f.AddButton("Scale", func() {
scale, err := strconv.Atoi(f.GetFormItem(0).(*tview.InputField).GetText())
if err != nil {
pv.showError("Invalid Scale: " + err.Error())
return
}
log.Info().Msgf("Scaling %s to %d", name, scale)
err = pv.project.ScaleProcess(name, scale)
if err != nil {
pv.showError("Invalid Scale: " + err.Error())
}
pv.pages.RemovePage(PageDialog)
})
f.AddButton("Cancel", func() {
pv.pages.RemovePage(PageDialog)
})
f.SetButtonsAlign(tview.AlignCenter)
pv.showDialog(f, 60, 10)
}
func (pv *pcView) showInfo() {
name := pv.getSelectedProcName()
info, err := pv.project.GetProcessInfo(name)
@ -211,7 +247,7 @@ func (pv *pcView) showInfo() {
return
}
form := pv.createProcInfoForm(info)
pv.showDialog(form)
pv.showDialog(form, 0, 0)
}
func (pv *pcView) handleShutDown() {
@ -251,7 +287,7 @@ func (pv *pcView) getSelectedProcName() string {
return ""
}
row, _ := pv.procTable.GetSelection()
if row > 0 && row <= len(pv.procNames) {
if row > 0 {
return pv.procTable.GetCell(row, 1).Text
}
return ""
@ -282,6 +318,7 @@ func (pv *pcView) updateHelpTextView() {
pv.shortcuts.ShortCutKeys[ActionLogSelection].writeToggleButton(pv.helpText, !pv.logSelect)
pv.shortcuts.ShortCutKeys[ActionLogFind].writeButton(pv.helpText)
fmt.Fprintf(pv.helpText, "%s ", "[lightskyblue::b]PROCESS:[-:-:-]")
pv.shortcuts.ShortCutKeys[ActionProcessScale].writeButton(pv.helpText)
pv.shortcuts.ShortCutKeys[ActionProcessInfo].writeButton(pv.helpText)
pv.shortcuts.ShortCutKeys[ActionProcessStart].writeButton(pv.helpText)
pv.shortcuts.ShortCutKeys[ActionProcessScreen].writeToggleButton(pv.helpText, procScrBool)

View File

@ -1,7 +1,9 @@
package types
import (
"fmt"
"github.com/f1bonacc1/process-compose/src/health"
"math"
"time"
)
@ -24,10 +26,13 @@ type ProcessConfig struct {
DisableAnsiColors bool `yaml:"disable_ansi_colors,omitempty"`
WorkingDir string `yaml:"working_dir"`
Namespace string `yaml:"namespace"`
Replicas int `yaml:"replicas"`
Extensions map[string]interface{} `yaml:",inline"`
ReplicaNum int
ReplicaName string
}
func (p ProcessConfig) GetDependencies() []string {
func (p *ProcessConfig) GetDependencies() []string {
dependencies := make([]string, len(p.DependsOn))
i := 0
@ -38,6 +43,33 @@ func (p ProcessConfig) GetDependencies() []string {
return dependencies
}
func (p *ProcessConfig) CalculateReplicaName() string {
if p.Replicas <= 1 {
return p.Name
}
myWidth := 1 + int(math.Log10(float64(p.Replicas)))
return fmt.Sprintf("%s-%0*d", p.Name, myWidth, p.ReplicaNum)
}
func NewProcessState(proc *ProcessConfig) *ProcessState {
state := &ProcessState{
Name: proc.ReplicaName,
Namespace: proc.Namespace,
Status: ProcessStatePending,
SystemTime: "",
Age: time.Duration(0),
IsRunning: false,
Health: ProcessHealthUnknown,
Restarts: 0,
ExitCode: 0,
Pid: 0,
}
if proc.Disabled {
state.Status = ProcessStateDisabled
}
return state
}
type ProcessState struct {
Name string `json:"name"`
Namespace string `json:"namespace"`

View File

@ -27,7 +27,7 @@ func (p *Project) GetDependenciesOrderNames() ([]string, error) {
order := []string{}
err := p.WithProcesses([]string{}, func(process ProcessConfig) error {
order = append(order, process.Name)
order = append(order, process.ReplicaName)
return nil
})
return order, err
@ -46,11 +46,10 @@ func (p *Project) GetLexicographicProcessNames() ([]string, error) {
func (p *Project) getProcesses(names ...string) ([]ProcessConfig, error) {
processes := []ProcessConfig{}
if len(names) == 0 {
for name, proc := range p.Processes {
for _, proc := range p.Processes {
if proc.Disabled {
continue
}
proc.Name = name
processes = append(processes, proc)
}
return processes, nil
@ -60,10 +59,21 @@ func (p *Project) getProcesses(names ...string) ([]ProcessConfig, error) {
if proc.Disabled {
continue
}
proc.Name = name
processes = append(processes, proc)
} else {
return processes, fmt.Errorf("no such process: %s", name)
found := false
for _, proc := range p.Processes {
if proc.Name == name {
found = true
if proc.Disabled {
continue
}
processes = append(processes, proc)
}
}
if !found {
return processes, fmt.Errorf("no such process: %s", name)
}
}
}
@ -76,10 +86,10 @@ func (p *Project) withProcesses(names []string, fn ProcessFunc, done map[string]
return err
}
for _, process := range processes {
if done[process.Name] {
if done[process.ReplicaName] {
continue
}
done[process.Name] = true
done[process.ReplicaName] = true
dependencies := process.GetDependencies()
if len(dependencies) > 0 {

View File

@ -12,7 +12,11 @@ func (p *Project) Validate() {
p.setConfigDefaults()
p.deprecationCheck()
p.validateProcessConfig()
p.assignDefaultNamespace()
}
func (p *Project) ValidateAfterMerge() {
p.assignDefaultProcessValues()
p.cloneReplicas()
}
func (p *Project) validateLogLevel() {
@ -58,11 +62,41 @@ func (p *Project) validateProcessConfig() {
}
}
func (p *Project) assignDefaultNamespace() {
func (p *Project) assignDefaultProcessValues() {
for name, proc := range p.Processes {
if proc.Namespace == "" {
proc.Namespace = DefaultNamespace
p.Processes[name] = proc
}
if proc.Replicas == 0 {
proc.Replicas = 1
}
proc.Name = name
p.Processes[name] = proc
}
}
func (p *Project) cloneReplicas() {
procsToAdd := make([]ProcessConfig, 0)
procsToDel := make([]string, 0)
for name, proc := range p.Processes {
if proc.Replicas > 1 {
procsToDel = append(procsToDel, name)
}
for replica := 0; replica < proc.Replicas; replica++ {
proc.ReplicaNum = replica
repName := proc.CalculateReplicaName()
proc.ReplicaName = repName
if proc.Replicas == 1 {
p.Processes[repName] = proc
} else {
procsToAdd = append(procsToAdd, proc)
}
}
}
for _, name := range procsToDel {
delete(p.Processes, name)
}
for _, proc := range procsToAdd {
p.Processes[proc.ReplicaName] = proc
}
}