restructure client locking strategy to avoid sending on closed channel panic

This commit is contained in:
cryptonote-social 2020-07-31 14:09:26 -07:00
parent 64378a3397
commit e4c78caa61
2 changed files with 59 additions and 42 deletions

View File

@ -101,31 +101,7 @@ func Mine(s ScreenStater, threads int, uname, rigid string, saver bool, excludeH
wasJustMining := false
for {
clMutex.Lock()
clientAlive = false
clMutex.Unlock()
sleepSec := 3 * time.Second
for {
if startDiff > 0 {
if len(config) > 0 {
config += ";"
}
config += "start_diff=" + strconv.Itoa(startDiff)
}
err := cl.Connect(uname, config, rigid, useTLS)
if err != nil {
crylog.Warn("Client failed to connect:", err)
time.Sleep(sleepSec)
sleepSec += time.Second
continue
}
break
}
clMutex.Lock()
clientAlive = true
clMutex.Unlock()
crylog.Info("Connected")
connectClient(cl, uname, rigid, startDiff, config, useTLS)
var cachedJob *client.MultiClientJob
for {
onoff := getActivityMessage(excludeHrStart, excludeHrEnd, threads)
@ -191,6 +167,47 @@ func Mine(s ScreenStater, threads int, uname, rigid string, saver bool, excludeH
}
}
// connectClient will try to connect to the stratum server and won't return until successful. It will
// also start job dispatching loop.
func connectClient(cl *client.Client, uname, rigid string, startDiff int, config string, useTLS bool) {
clMutex.Lock()
clientAlive = false
clMutex.Unlock()
sleepSec := 3 * time.Second
for {
if startDiff > 0 {
if len(config) > 0 {
config += ";"
}
config += "start_diff=" + strconv.Itoa(startDiff)
}
err := cl.Connect(uname, config, rigid, useTLS)
if err != nil {
crylog.Warn("Client failed to connect:", err)
time.Sleep(sleepSec)
sleepSec += time.Second
continue
}
break
}
clMutex.Lock()
clientAlive = true
clMutex.Unlock()
go func() {
err := cl.DispatchJobs()
if err != nil {
crylog.Error("Job dispatcher exitted with error:", err)
}
clMutex.Lock()
clientAlive = false
clMutex.Unlock()
cl.Close()
}()
crylog.Info("Connected")
}
func timeExcluded(startHr, endHr int) bool {
currHr := time.Now().Hour()
if startHr < endHr {

View File

@ -54,10 +54,12 @@ type MultiClientJob struct {
}
type Client struct {
JobChannel chan *MultiClientJob
address string
agent string
conn net.Conn
JobChannel chan *MultiClientJob
firstJob *MultiClientJob
responseChannel chan *SubmitWorkResponse
}
@ -134,10 +136,7 @@ func (cl *Client) Connect(uname, pw, rigid string, useTLS bool) error {
crylog.Error("Didn't get job result from login response:", response.Error)
return errors.New("no job")
}
// We pass in the connection+channels explicitly instead of just the client object itself to
// avoid race conditions should the client be closed & reconnected (which generates new
// conn+channels)
go dispatchJobs(cl.conn, cl.JobChannel, cl.responseChannel, response.Result.Job)
cl.firstJob = response.Result.Job
return nil
}
@ -230,6 +229,8 @@ func (cl *Client) String() string {
func (cl *Client) Close() {
cl.conn.Close()
close(cl.JobChannel)
close(cl.responseChannel)
}
type SubmitWorkResponse struct {
@ -241,21 +242,23 @@ type SubmitWorkResponse struct {
Error map[string]interface{} `json:"error"`
}
func dispatchJobs(conn net.Conn, jobChannel chan *MultiClientJob, responseChannel chan *SubmitWorkResponse, job *MultiClientJob) {
jobChannel <- job
reader := bufio.NewReaderSize(conn, MAX_REQUEST_SIZE)
// DispatchJobs will forward incoming jobs to the JobChannel until error is received or the
// connection is closed.
func (cl *Client) DispatchJobs() error {
cl.JobChannel <- cl.firstJob
cl.firstJob = nil
reader := bufio.NewReaderSize(cl.conn, MAX_REQUEST_SIZE)
for {
response := &SubmitWorkResponse{}
conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
cl.conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
err := readJSON(response, reader)
if err != nil {
crylog.Error("readJSON failed:", err)
break
return err
}
if response.Method != "job" {
if response.ID == SUBMIT_WORK_JSON_ID {
responseChannel <- response
cl.responseChannel <- response
continue
}
crylog.Warn("Unexpected response:", *response)
@ -263,13 +266,10 @@ func dispatchJobs(conn net.Conn, jobChannel chan *MultiClientJob, responseChanne
}
if response.Job == nil {
crylog.Error("Didn't get job:", *response)
break
return errors.New("didn't get job as expected")
}
jobChannel <- response.Job
cl.JobChannel <- response.Job
}
conn.Close()
close(jobChannel)
close(responseChannel)
}
func readJSON(response interface{}, reader *bufio.Reader) error {