Run Process

This commit is contained in:
Berger Eugene 2022-04-26 00:07:17 +03:00
parent 37e1ffd48a
commit 296cfd0c71
7 changed files with 78 additions and 43 deletions

View File

@ -64,7 +64,6 @@ processes:
backoff_seconds: 2
log_location: pc_tst.log
environment:
- 'ABC=222'

View File

@ -41,3 +41,19 @@ func StopProcess(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"name": name})
}
// @Summary Start a process
// @Produce json
// @Param name path string true "Process Name"
// @Success 200 {string} string "Started Process Name"
// @Router /process/start/{name} [post]
func StartProcess(c *gin.Context) {
name := c.Param("name")
err := app.PROJ.StartProcess(name)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"name": name})
}

View File

@ -32,6 +32,7 @@ func InitRoutes() *gin.Engine {
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler, url))
r.GET("/processes", GetProcesses)
r.PATCH("/process/stop/:name", StopProcess)
r.POST("/process/start/:name", StartProcess)
return r
}

View File

@ -1,6 +1,10 @@
package app
import "sync"
import (
"sync"
"github.com/f1bonacc1/process-compose/src/pclog"
)
type Project struct {
Version string `yaml:"version"`
@ -12,6 +16,8 @@ type Project struct {
runningProcesses map[string]*Process
processStates map[string]*ProcessState
mapMutex sync.Mutex
logger pclog.PcLogger
wg sync.WaitGroup
}
type Processes map[string]ProcessConfig
@ -51,6 +57,7 @@ const (
)
const (
ProcessStateDisabled = "Disabled"
ProcessStatePending = "Pending"
ProcessStateRunning = "Running"
ProcessStateRestarting = "Restarting"

View File

@ -8,7 +8,6 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"github.com/f1bonacc1/process-compose/src/pclog"
"github.com/joho/godotenv"
@ -23,7 +22,7 @@ func (p *Project) Run() {
p.initProcessStates()
p.runningProcesses = make(map[string]*Process)
runOrder := []ProcessConfig{}
p.WithServices([]string{}, func(process ProcessConfig) error {
p.WithProcesses([]string{}, func(process ProcessConfig) error {
runOrder = append(runOrder, process)
return nil
})
@ -31,38 +30,40 @@ func (p *Project) Run() {
for _, v := range runOrder {
nameOrder = append(nameOrder, v.Name)
}
var logger pclog.PcLogger = pclog.NewNilLogger("")
p.logger = pclog.NewNilLogger()
if isStringDefined(p.LogLocation) {
logger = pclog.NewLogger(p.LogLocation)
defer logger.Close()
p.logger = pclog.NewLogger(p.LogLocation)
defer p.logger.Close()
}
log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder)
var wg sync.WaitGroup
for _, proc := range runOrder {
procLogger := logger
if isStringDefined(proc.LogLocation) {
procLogger = pclog.NewLogger(proc.LogLocation)
}
process := NewProcess(p.Environment, procLogger, proc, p.GetProcessState(proc.Name), 1)
p.addRunningProcess(process)
wg.Add(1)
go func() {
defer p.removeRunningProcess(process.GetName())
defer wg.Done()
if err := p.WaitIfNeeded(process.procConf); err != nil {
log.Error().Msgf("Error: %s", err.Error())
log.Error().Msgf("Error: process %s won't run", process.GetName())
process.WontRun()
} else {
process.Run()
}
}()
p.runProcess(proc)
}
wg.Wait()
p.wg.Wait()
}
func (p *Project) WaitIfNeeded(process ProcessConfig) error {
func (p *Project) runProcess(proc ProcessConfig) {
procLogger := p.logger
if isStringDefined(proc.LogLocation) {
procLogger = pclog.NewLogger(proc.LogLocation)
}
process := NewProcess(p.Environment, procLogger, proc, p.GetProcessState(proc.Name), 1)
p.addRunningProcess(process)
p.wg.Add(1)
go func() {
defer p.removeRunningProcess(process.GetName())
defer p.wg.Done()
if err := p.waitIfNeeded(process.procConf); err != nil {
log.Error().Msgf("Error: %s", err.Error())
log.Error().Msgf("Error: process %s won't run", process.GetName())
process.WontRun()
} else {
process.Run()
}
}()
}
func (p *Project) waitIfNeeded(process ProcessConfig) error {
for k := range process.DependsOn {
if runningProc := p.getRunningProcess(k); runningProc != nil {
@ -85,15 +86,15 @@ func (p *Project) WaitIfNeeded(process ProcessConfig) error {
func (p *Project) initProcessStates() {
p.processStates = make(map[string]*ProcessState)
for key, proc := range p.Processes {
if proc.Disabled {
continue
}
p.processStates[key] = &ProcessState{
Name: key,
Status: ProcessStatePending,
Restarts: 0,
ExitCode: 0,
}
if proc.Disabled {
p.processStates[key].Status = ProcessStateDisabled
}
}
}
@ -126,6 +127,22 @@ func (p *Project) removeRunningProcess(name string) {
p.mapMutex.Unlock()
}
func (p *Project) StartProcess(name string) error {
proc := p.getRunningProcess(name)
if proc != nil {
log.Error().Msgf("Process %s is already running", name)
return fmt.Errorf("process %s is already running", name)
}
if proc, ok := p.Processes[name]; ok {
proc.Name = name
p.runProcess(proc)
} else {
return fmt.Errorf("no such process: %s", name)
}
return nil
}
func (p *Project) StopProcess(name string) error {
proc := p.getRunningProcess(name)
if proc == nil {
@ -136,7 +153,7 @@ func (p *Project) StopProcess(name string) error {
return nil
}
func (p *Project) GetProcesses(names ...string) ([]ProcessConfig, error) {
func (p *Project) getProcesses(names ...string) ([]ProcessConfig, error) {
processes := []ProcessConfig{}
if len(names) == 0 {
for name, proc := range p.Processes {
@ -166,12 +183,12 @@ func (p *Project) GetProcesses(names ...string) ([]ProcessConfig, error) {
type ProcessFunc func(process ProcessConfig) error
// WithProcesses run ProcesseFunc on each Process and dependencies in dependency order
func (p *Project) WithServices(names []string, fn ProcessFunc) error {
func (p *Project) WithProcesses(names []string, fn ProcessFunc) error {
return p.withProcesses(names, fn, map[string]bool{})
}
func (p *Project) withProcesses(names []string, fn ProcessFunc, done map[string]bool) error {
processes, err := p.GetProcesses(names...)
processes, err := p.getProcesses(names...)
if err != nil {
return err
}
@ -198,7 +215,7 @@ func (p *Project) withProcesses(names []string, fn ProcessFunc, done map[string]
func (p *Project) GetDependenciesOrderNames() ([]string, error) {
order := []string{}
err := p.WithServices([]string{}, func(process ProcessConfig) error {
err := p.WithProcesses([]string{}, func(process ProcessConfig) error {
order = append(order, process.Name)
return nil
})
@ -208,10 +225,7 @@ func (p *Project) GetDependenciesOrderNames() ([]string, error) {
func (p *Project) GetLexicographicProcessNames() ([]string, error) {
names := []string{}
for name, proc := range p.Processes {
if proc.Disabled {
continue
}
for name := range p.Processes {
names = append(names, name)
}
sort.Strings(names)

View File

@ -1,7 +1,6 @@
package app
import (
"fmt"
"os"
"path/filepath"
"reflect"
@ -34,7 +33,6 @@ func TestSystem_TestFixtures(t *testing.T) {
}
func TestSystem_TestComposeWithLog(t *testing.T) {
fmt.Println(os.Getwd())
fixture := filepath.Join("..", "..", "fixtures", "process-compose-with-log.yaml")
t.Run(fixture, func(t *testing.T) {
project := CreateProject(fixture)

View File

@ -3,7 +3,7 @@ package pclog
type PcNilLog struct {
}
func NewNilLogger(outputPath string) *PcNilLog {
func NewNilLogger() *PcNilLog {
return &PcNilLog{}
}