simplify client implementation to fix race condition

This commit is contained in:
cryptonote-social 2020-08-17 21:38:55 -07:00
parent 827030daed
commit 4f49bda761
2 changed files with 72 additions and 90 deletions

View File

@ -61,8 +61,10 @@ const (
var (
// miner config
configMutex sync.Mutex
plArgs *PoolLoginArgs // nil if nobody is currently logged in
configMutex sync.Mutex
// plArgs (pool login args) is nil if nobody is currently logged in, which also implies
// dispatch loop isn't active.
plArgs *PoolLoginArgs
threads int
lastSeed []byte
excludeHourStart, excludeHourEnd int
@ -337,10 +339,8 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, done chan<- bool) {
var job *client.MultiClientJob
sleepSec := 3 * time.Second // time to sleep if connection attempt fails
for {
crylog.Info("selecting")
select {
case poke := <-pokeChannel:
crylog.Info("poke!", poke)
if poke == EXIT_LOOP_POKE {
crylog.Info("Stopping mining loop")
stopWorkers()
@ -353,9 +353,9 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, done chan<- bool) {
}
case job = <-jobChan:
crylog.Info("job!", job)
if job == nil {
crylog.Info("stratum client closed, reconnecting...")
cl.Close()
newChan := reconnectClient()
if newChan == nil {
crylog.Info("reconnect failed. sleeping for", sleepSec, "seconds before trying again")
@ -373,7 +373,6 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, done chan<- bool) {
crylog.Info("Current job:", job.JobID, "Difficulty:", blockchain.TargetToDifficulty(job.Target))
case <-time.After(15 * time.Second):
crylog.Info("select timeout!")
break
}
@ -392,7 +391,6 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, done chan<- bool) {
stats.ResetRecent()
}
crylog.Info("Hi")
as := getMiningActivityState()
crylog.Info("Activity state:", as)
if as < 0 {
@ -416,7 +414,7 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, done chan<- bool) {
}
}
defer crylog.Info("Mining loop terminated")
crylog.Info("Mining loop terminated")
}
// Stop all active worker threads and wait for them to finish before returning. Should
@ -428,7 +426,6 @@ func stopWorkers() {
}
func handlePoke(poke int) {
crylog.Info("Handling poke:", poke)
switch poke {
case INCREASE_THREADS_POKE:
stopWorkers()
@ -479,6 +476,12 @@ type GetMiningStateResponse struct {
// poke the job dispatcher to refresh recent stats. result may not be immediate but should happen
// quickly.
func RequestRecentStatsUpdate() {
configMutex.Lock()
defer configMutex.Unlock()
if plArgs == nil {
// dispatch loop inactive so there are no stats to update
return
}
go pokeJobDispatcher(UPDATE_STATS_POKE) // own gorouting so as not to block
}
@ -520,11 +523,37 @@ func updatePoolStats(isMining bool) {
}
func IncreaseThreads() {
go pokeJobDispatcher(INCREASE_THREADS_POKE)
configMutex.Lock()
defer configMutex.Unlock()
if plArgs != nil {
go pokeJobDispatcher(INCREASE_THREADS_POKE)
return
}
// dispatch loop isn't active so just handle this here
t := rx.AddThread()
if t < 0 {
configMutex.Unlock()
crylog.Error("Failed to add another thread")
return
}
threads = t
}
func DecreaseThreads() {
go pokeJobDispatcher(DECREASE_THREADS_POKE)
configMutex.Lock()
defer configMutex.Unlock()
if plArgs != nil {
go pokeJobDispatcher(DECREASE_THREADS_POKE)
return
}
// dispatch loop isn't active so just handle this here
t := rx.RemoveThread()
if t < 0 {
configMutex.Unlock()
crylog.Error("Failed to decrease threads")
return
}
threads = t
}
// Poke the job dispatcher. Though it should be unlikely, this method may block if the channel is
@ -606,6 +635,7 @@ func goMine(job client.MultiClientJob, thread int) {
}
resp, err := cl.SubmitWork(fnonce, jobid)
if err != nil {
cl.Close()
crylog.Warn("Submit work client failure:", jobid, err)
return
}
@ -633,7 +663,9 @@ func OverrideMiningActivityState(mine bool) {
}
crylog.Info("New mining override state:", newState)
miningOverride = newState
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
if plArgs != nil {
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
}
}
func RemoveMiningActivityOverride() {
@ -644,7 +676,9 @@ func RemoveMiningActivityOverride() {
}
crylog.Info("Removing mining override")
miningOverride = 0
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
if plArgs != nil {
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
}
}
func ReportIdleScreenState(isIdle bool) {
@ -655,7 +689,9 @@ func ReportIdleScreenState(isIdle bool) {
}
crylog.Info("Screen idle state changed to:", isIdle)
screenIdle = isIdle
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
if plArgs != nil {
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
}
}
func ReportPowerState(battery bool) {
@ -666,7 +702,9 @@ func ReportPowerState(battery bool) {
}
crylog.Info("Battery state changed to:", battery)
batteryPower = battery
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
if plArgs != nil {
go pokeJobDispatcher(STATE_CHANGE_POKE) // call in own goroutine in case it blocks
}
}
// configMutex should be locked before calling

View File

@ -60,14 +60,9 @@ type Client struct {
address string
conn net.Conn
responseChannel chan *SubmitWorkResponse
jobChannel chan *MultiClientJob
firstJob *MultiClientJob
mutex sync.Mutex
doneChanMutex sync.Mutex
dispatchLoopDoneChan chan bool // for waiting on an existing dispatch loop to finish
alive bool // true when the stratum client is connected. Set to false upon call to Close(), or when Connect() is called but
// a new connection is yet to be established.
}
@ -89,30 +84,12 @@ func (cl *Client) IsAlive() bool {
// code and message will also be specified. If the stratum server returned just a warning, then
// error will be nil, but code & message will be specified.
func (cl *Client) Connect(
address string, useTLS bool, agent string,
uname, pw, rigid string) (err error, code int, message string, jobChan <-chan *MultiClientJob) {
cl.doneChanMutex.Lock()
defer cl.doneChanMutex.Unlock()
if cl.dispatchLoopDoneChan != nil {
cl.Close() // just in case caller forgot to call Close before trying a new connection
// wait until previous dispatch loop completes
crylog.Info("Waiting on dispatch loop to terminate")
<-cl.dispatchLoopDoneChan
crylog.Info("Dispatch loop done")
cl.dispatchLoopDoneChan = nil
}
return cl.connect(address, useTLS, agent, uname, pw, rigid)
}
func (cl *Client) connect(
address string, useTLS bool, agent string,
uname, pw, rigid string) (err error, code int, message string, jobChan <-chan *MultiClientJob) {
crylog.Info("in connect")
cl.Close() // just in case caller forgot to call close before trying a new connection
cl.mutex.Lock()
defer cl.mutex.Unlock()
crylog.Info("got by lock")
cl.alive = false
cl.address = address
if !useTLS {
@ -124,8 +101,6 @@ func (cl *Client) connect(
crylog.Error("Dial failed:", err, cl)
return err, 0, "", nil
}
cl.responseChannel = make(chan *SubmitWorkResponse)
cl.jobChannel = make(chan *MultiClientJob)
// send login
loginRequest := &struct {
ID uint64 `json:"id"`
@ -159,8 +134,6 @@ func (cl *Client) connect(
return err, 0, "", nil
}
crylog.Info("got by write")
// Now read the login response
response := &struct {
ID uint64 `json:"id"`
@ -192,15 +165,16 @@ func (cl *Client) connect(
crylog.Error("Didn't get job result from login response:", response.Error)
return errors.New("stratum server error"), response.Error.Code, response.Error.Message, nil
}
cl.alive = true
cl.firstJob = response.Result.Job
cl.dispatchLoopDoneChan = make(chan bool, 1)
go cl.dispatchJobs(cl.dispatchLoopDoneChan)
crylog.Info("Connect successful")
cl.responseChannel = make(chan *SubmitWorkResponse)
cl.alive = true
jc := make(chan *MultiClientJob)
go dispatchJobs(cl.conn, jc, response.Result.Job, cl.responseChannel)
if response.Warning != nil {
return nil, response.Warning.Code, response.Warning.Message, cl.jobChannel
return nil, response.Warning.Code, response.Warning.Message, jc
}
return nil, 0, "", cl.jobChannel
return nil, 0, "", jc
}
// if error is returned then client will be closed and put in not-alive state
@ -238,7 +212,6 @@ func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse,
data, err := json.Marshal(submitRequest)
if err != nil {
crylog.Error("json marshalling failed:", err, "for client")
cl.close()
cl.mutex.Unlock()
return nil, err
}
@ -246,7 +219,6 @@ func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse,
data = append(data, '\n')
if _, err = cl.conn.Write(data); err != nil {
crylog.Error("writing request failed:", err, "for client")
cl.close()
cl.mutex.Unlock()
return nil, err
}
@ -256,12 +228,11 @@ func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse,
// await the response
response := <-respChan
if response == nil {
crylog.Error("got nil response, client closed?", cl.IsAlive())
crylog.Error("got nil response, closing")
return nil, fmt.Errorf("submit work failure: nil response")
}
if response.ID != SUBMIT_WORK_JSON_ID {
crylog.Error("got unexpected response:", response.ID, "Closing connection.")
cl.Close()
return nil, fmt.Errorf("submit work failure: unexpected response")
}
return response, nil
@ -289,19 +260,12 @@ func (cl *Client) SubmitWork(nonce string, jobid string) (*SubmitWorkResponse, e
func (cl *Client) Close() {
cl.mutex.Lock()
defer cl.mutex.Unlock()
cl.close()
}
// client must be locked before calling
func (cl *Client) close() {
if !cl.alive {
crylog.Warn("tried to close dead client")
return
}
cl.alive = false
cl.conn.Close()
close(cl.jobChannel)
close(cl.responseChannel)
}
type SubmitWorkResponse struct {
@ -315,57 +279,37 @@ type SubmitWorkResponse struct {
// DispatchJobs will forward incoming jobs to the JobChannel until error is received or the
// connection is closed. Client will be in not-alive state on return.
func (cl *Client) dispatchJobs(done chan<- bool) {
func dispatchJobs(conn net.Conn, jobChan chan<- *MultiClientJob, firstJob *MultiClientJob, responseChan chan<- *SubmitWorkResponse) {
defer func() {
crylog.Info("Sending true on dispatch done channel")
done <- true
crylog.Info("done sent")
close(jobChan)
close(responseChan)
}()
cl.mutex.Lock()
if !cl.alive {
crylog.Info("exiting dispatch loop")
cl.mutex.Unlock()
return
}
cl.jobChannel <- cl.firstJob
cl.firstJob = nil
conn := cl.conn
cl.mutex.Unlock()
crylog.Info("starting dispatch loop")
jobChan <- firstJob
reader := bufio.NewReaderSize(conn, MAX_REQUEST_SIZE)
for {
response := &SubmitWorkResponse{}
conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
err := readJSON(response, reader)
cl.mutex.Lock()
if !cl.alive {
cl.mutex.Unlock()
return
}
if err != nil {
crylog.Error("readJSON failed, closing client and exiting dispatch:", err)
cl.close()
cl.mutex.Unlock()
return
break
}
if response.Method != "job" {
if response.ID == SUBMIT_WORK_JSON_ID {
cl.responseChannel <- response
cl.mutex.Unlock()
responseChan <- response
continue
}
crylog.Warn("Unexpected response:", *response)
cl.mutex.Unlock()
continue
}
if response.Job == nil {
crylog.Error("Didn't get job as expected:", *response)
cl.close()
cl.mutex.Unlock()
return
break
}
cl.jobChannel <- response.Job
cl.mutex.Unlock()
jobChan <- response.Job
}
crylog.Info("dispatch loop done")
}
func readJSON(response interface{}, reader *bufio.Reader) error {