feat: shutdown in reverse dependency order

address code review

fix: unused variable

refactor: smaller functions

test: shutdown in order

Add shutdown order test

fix: tests
This commit is contained in:
Antonio Nuno Monteiro 2024-03-02 00:00:06 -08:00 committed by Eugene Berger
parent 82b51bae9d
commit df422e3253
7 changed files with 294 additions and 73 deletions

View File

@ -0,0 +1,29 @@
version: "0.5"
log_level: debug
log_length: 1000
processes:
procA:
command: |
trap 'echo "A: exit"' SIGTERM
echo "A: starting"
sleep 3
procB:
command: |
trap 'echo "B: exit"' SIGTERM
echo "B: starting"
sleep 3
depends_on:
procA:
condition: process_started
procC:
command: |
trap 'echo "C: exit"' SIGTERM
echo "C: starting"
sleep 3
depends_on:
procB:
condition: process_started

View File

@ -3,12 +3,13 @@ package app
import "github.com/f1bonacc1/process-compose/src/types"
type ProjectOpts struct {
project *types.Project
processesToRun []string
noDeps bool
mainProcess string
mainProcessArgs []string
isTuiOn bool
project *types.Project
processesToRun []string
noDeps bool
mainProcess string
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
}
func (p *ProjectOpts) WithProject(project *types.Project) *ProjectOpts {
@ -39,3 +40,8 @@ func (p *ProjectOpts) WithIsTuiOn(isTuiOn bool) *ProjectOpts {
p.isTuiOn = isTuiOn
return p
}
func (p *ProjectOpts) WithOrderedShutDown(isOrderedShutDown bool) *ProjectOpts {
p.isOrderedShutDown = isOrderedShutDown
return p
}

View File

@ -8,6 +8,7 @@ import (
"os"
"os/user"
"runtime"
"slices"
"sync"
"time"
@ -15,21 +16,22 @@ import (
)
type ProjectRunner struct {
procConfMutex sync.Mutex
project *types.Project
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
projectState *types.ProjectState
mainProcess string
mainProcessArgs []string
isTuiOn bool
procConfMutex sync.Mutex
project *types.Project
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
projectState *types.ProjectState
mainProcess string
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
}
func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) {
@ -322,27 +324,111 @@ func (p *ProjectRunner) GetProcessPorts(name string) (*types.ProcessPorts, error
return ports, nil
}
func (p *ProjectRunner) runningProcessesReverseDependencies() map[string]map[string]*Process {
reverseDependencies := make(map[string]map[string]*Process)
// `p.runProcMutex` lock is assumed to have been acquired when calling
// this function. It is currently called by `ShutDownProject()`.
for _, process := range p.runningProcesses {
for k := range process.procConf.DependsOn {
if runningProc, ok := p.runningProcesses[k]; ok {
if _, ok := reverseDependencies[runningProc.getName()]; !ok {
dep := make(map[string]*Process)
dep[process.getName()] = process
reverseDependencies[runningProc.getName()] = dep
}
} else {
continue
}
}
}
return reverseDependencies
}
func (p *ProjectRunner) shutDownInOrder(wg *sync.WaitGroup, shutdownOrder []*Process) {
reverseDependencies := p.runningProcessesReverseDependencies()
for _, process := range shutdownOrder {
wg.Add(1)
go func(proc *Process) {
defer wg.Done()
waitForDepsWg := sync.WaitGroup{}
if revDeps, ok := reverseDependencies[proc.getName()]; ok {
for _, runningProc := range revDeps {
waitForDepsWg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
waitForDepsWg.Done()
}(runningProc)
}
}
waitForDepsWg.Wait()
log.Debug().Msgf("[%s]: waited for all dependencies to shut down", proc.getName())
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
return
}
proc.waitForCompletion()
}(process)
}
}
func (p *ProjectRunner) shutDownAndWait(shutdownOrder []*Process) {
wg := sync.WaitGroup{}
if p.isOrderedShutDown {
p.shutDownInOrder(&wg, shutdownOrder)
} else {
for _, proc := range shutdownOrder {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
}
wg.Wait()
}
func (p *ProjectRunner) ShutDownProject() error {
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
runProc := p.runningProcesses
for _, proc := range runProc {
shutdownOrder := []*Process{}
if p.isOrderedShutDown {
err := p.project.WithProcesses([]string{}, func(process types.ProcessConfig) error {
if runningProc, ok := p.runningProcesses[process.ReplicaName]; ok {
shutdownOrder = append(shutdownOrder, runningProc)
}
return nil
})
if err != nil {
log.Error().Msgf("Failed to build project run order: %s", err.Error())
}
slices.Reverse(shutdownOrder)
} else {
for _, proc := range p.runningProcesses {
shutdownOrder = append(shutdownOrder, proc)
}
}
var nameOrder []string
for _, v := range shutdownOrder {
nameOrder = append(nameOrder, v.getName())
}
log.Debug().Msgf("Shutting down %d processes. Order: %q", len(shutdownOrder), nameOrder)
for _, proc := range shutdownOrder {
proc.prepareForShutDown()
}
wg := sync.WaitGroup{}
for _, proc := range runProc {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
wg.Wait()
p.shutDownAndWait(shutdownOrder)
return nil
}
@ -644,10 +730,11 @@ func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) {
username = current.Username
}
runner := &ProjectRunner{
project: opts.project,
mainProcess: opts.mainProcess,
mainProcessArgs: opts.mainProcessArgs,
isTuiOn: opts.isTuiOn,
project: opts.project,
mainProcess: opts.mainProcess,
mainProcessArgs: opts.mainProcessArgs,
isTuiOn: opts.isTuiOn,
isOrderedShutDown: opts.isOrderedShutDown,
projectState: &types.ProjectState{
FileNames: opts.project.FileNames,
StartTime: time.Now(),

View File

@ -1,10 +1,14 @@
package app
import (
"bufio"
"github.com/f1bonacc1/process-compose/src/loader"
"github.com/f1bonacc1/process-compose/src/types"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"testing"
"time"
)
@ -406,3 +410,94 @@ func TestSystem_TestProcListToRun(t *testing.T) {
}
})
}
func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
fixture1 := filepath.Join("..", "..", "fixtures-code", "process-compose-shutdown-inorder.yaml")
t.Run("Single Proc with deps", func(t *testing.T) {
project, err := loader.Load(&loader.LoaderOptions{
FileNames: []string{fixture1},
})
if err != nil {
t.Errorf(err.Error())
return
}
numProc := len(project.Processes)
runner, err := NewProjectRunner(&ProjectOpts{
project: project,
processesToRun: []string{},
mainProcessArgs: []string{},
isOrderedShutDown: true,
})
if err != nil {
t.Errorf(err.Error())
return
}
if len(runner.project.Processes) != numProc {
t.Errorf("should have %d processes", numProc)
}
for name, proc := range runner.project.Processes {
if proc.Disabled {
t.Errorf("process %s is disabled", name)
}
}
file, err := os.CreateTemp("/tmp", "pc_log.*.log")
defer os.Remove(file.Name())
project.LogLocation = file.Name()
project.LoggerConfig = &types.LoggerConfig{
FieldsOrder: []string{"message"},
DisableJSON: true,
TimestampFormat: "",
NoMetadata: true,
FlushEachLine: true,
NoColor: true,
}
go runner.Run()
time.Sleep(10 * time.Millisecond)
states, err := runner.GetProcessesState()
if err != nil {
t.Errorf(err.Error())
return
}
want := 3
if len(states.States) != want {
t.Errorf("len(states.States) = %d, want %d", len(states.States), want)
}
time.Sleep(10 * time.Millisecond)
err = runner.ShutDownProject()
if err != nil {
t.Errorf(err.Error())
return
}
states, err = runner.GetProcessesState()
if err != nil {
t.Errorf(err.Error())
return
}
runningProcesses := 0
for _, processState := range states.States {
if processState.IsRunning {
runningProcesses++
}
}
want = 0
if runningProcesses != want {
t.Errorf("runningProcesses = %d, want %d", runningProcesses, want)
}
//read file and validate the shutdown order
scanner := bufio.NewScanner(file)
order := make([]string, 0)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "exit") {
order = append(order, line)
}
}
wantOrder := []string{"C: exit", "B: exit", "A: exit"}
if !slices.Equal(order, wantOrder) {
t.Errorf("content = %v, want %v", order, wantOrder)
return
}
})
}

View File

@ -29,6 +29,7 @@ func getProjectRunner(process []string, noDeps bool, mainProcess string, mainPro
WithMainProcessArgs(mainProcessArgs).
WithProject(project).
WithProcessesToRun(process).
WithOrderedShutDown(*pcFlags.IsOrderedShutDown).
WithNoDeps(noDeps),
)
if err != nil {

View File

@ -63,6 +63,7 @@ func init() {
rootCmd.Flags().BoolVarP(pcFlags.Headless, "tui", "t", *pcFlags.Headless, "enable TUI (-t=false) (env: "+config.EnvVarNameTui+")")
rootCmd.PersistentFlags().BoolVar(pcFlags.KeepTuiOn, "keep-tui", *pcFlags.KeepTuiOn, "keep TUI running even after all processes exit")
rootCmd.PersistentFlags().BoolVar(pcFlags.NoServer, "no-server", *pcFlags.NoServer, "disable HTTP server (env: "+config.EnvVarNameNoServer+")")
rootCmd.PersistentFlags().BoolVar(pcFlags.IsOrderedShutDown, "ordered-shutdown", *pcFlags.IsOrderedShutDown, "shut down processes in reverse dependency order")
rootCmd.Flags().BoolVarP(pcFlags.HideDisabled, "hide-disabled", "d", *pcFlags.HideDisabled, "hide disabled processes")
rootCmd.Flags().IntVarP(pcFlags.RefreshRate, "ref-rate", "r", *pcFlags.RefreshRate, "TUI refresh rate in seconds")
rootCmd.PersistentFlags().IntVarP(pcFlags.PortNum, "port", "p", *pcFlags.PortNum, "port number (env: "+config.EnvVarNamePort+")")

View File

@ -33,43 +33,45 @@ const (
// Flags represents PC configuration flags.
type Flags struct {
RefreshRate *int
PortNum *int
Address *string
LogLevel *string
LogFile *string
LogLength *int
LogFollow *bool
LogTailLength *int
Headless *bool
Command *string
Write *bool
NoDependencies *bool
HideDisabled *bool
SortColumn *string
IsReverseSort *bool
NoServer *bool
KeepTuiOn *bool
RefreshRate *int
PortNum *int
Address *string
LogLevel *string
LogFile *string
LogLength *int
LogFollow *bool
LogTailLength *int
Headless *bool
Command *string
Write *bool
NoDependencies *bool
HideDisabled *bool
SortColumn *string
IsReverseSort *bool
NoServer *bool
KeepTuiOn *bool
IsOrderedShutDown *bool
}
// NewFlags returns new configuration flags.
func NewFlags() *Flags {
return &Flags{
RefreshRate: toPtr(DefaultRefreshRate),
Headless: toPtr(getTuiDefault()),
PortNum: toPtr(getPortDefault()),
Address: toPtr(DefaultAddress),
LogLength: toPtr(DefaultLogLength),
LogLevel: toPtr(DefaultLogLevel),
LogFile: toPtr(GetLogFilePath()),
LogFollow: toPtr(false),
LogTailLength: toPtr(math.MaxInt),
NoDependencies: toPtr(false),
HideDisabled: toPtr(false),
SortColumn: toPtr(DefaultSortColumn),
IsReverseSort: toPtr(false),
NoServer: toPtr(getNoServerDefault()),
KeepTuiOn: toPtr(false),
RefreshRate: toPtr(DefaultRefreshRate),
Headless: toPtr(getTuiDefault()),
PortNum: toPtr(getPortDefault()),
Address: toPtr(DefaultAddress),
LogLength: toPtr(DefaultLogLength),
LogLevel: toPtr(DefaultLogLevel),
LogFile: toPtr(GetLogFilePath()),
LogFollow: toPtr(false),
LogTailLength: toPtr(math.MaxInt),
NoDependencies: toPtr(false),
HideDisabled: toPtr(false),
SortColumn: toPtr(DefaultSortColumn),
IsReverseSort: toPtr(false),
NoServer: toPtr(getNoServerDefault()),
KeepTuiOn: toPtr(false),
IsOrderedShutDown: toPtr(false),
}
}