moved to done channels for syncing dispatch loops

This commit is contained in:
cryptonote-social 2020-08-15 17:33:49 -07:00
parent af379def2c
commit 81e428de5f
2 changed files with 49 additions and 40 deletions

View File

@ -51,11 +51,12 @@ const (
var (
// miner config
configMutex sync.Mutex
plArgs *PoolLoginArgs
threads int
plCount int // incremented by PoolLogin
lastSeed []byte
configMutex sync.Mutex
plArgs *PoolLoginArgs
threads int
lastSeed []byte
stopMiningLoop uint32 // for stopping any existing mining loop
miningLoopDoneChan chan bool // for waiting on an existing mining loop to finish
batteryPower bool
screenIdle bool
@ -140,11 +141,17 @@ func getMiningActivityState() int {
func PoolLogin(args *PoolLoginArgs) *PoolLoginResponse {
configMutex.Lock()
defer configMutex.Unlock()
plCount++ // signals previous mining loop, if any, to exit
crylog.Info("PoolLogin:", plCount)
// close out any previous client connection
if cl.IsAlive() {
cl.Close()
}
// wait for any previous mining loop to terminate
if miningLoopDoneChan != nil {
atomic.StoreUint32(&stopMiningLoop, 1)
<-miningLoopDoneChan
miningLoopDoneChan = nil
atomic.StoreUint32(&stopMiningLoop, 0)
}
loginName := args.Username
if args.Wallet != "" {
loginName = args.Wallet + "." + args.Username
@ -186,7 +193,8 @@ func PoolLogin(args *PoolLoginArgs) *PoolLoginResponse {
plArgs = args
r.Code = 1
go stats.RefreshPoolStats(plArgs.Username)
go MiningLoop(jc, plCount)
miningLoopDoneChan = make(chan bool, 1)
go MiningLoop(jc)
return r
}
@ -244,18 +252,15 @@ func InitMiner(args *InitMinerArgs) *InitMinerResponse {
}
// Returns nil if connection could not be established because of failed call to PoolLogin.
func reconnectClient(loginNumber int) <-chan *client.MultiClientJob {
crylog.Info("Reconnecting loop:", loginNumber)
func reconnectClient() <-chan *client.MultiClientJob {
sleepSec := 3 * time.Second // time to sleep if connection attempt fails
for {
configMutex.Lock()
if loginNumber != plCount {
if atomic.LoadUint32(&stopMiningLoop) == 1 {
// PoolLogin was called since this loop was started, so this mining loop should
// terminate instead of reconnect.
configMutex.Unlock()
return nil
}
configMutex.Lock()
loginName := plArgs.Username
if plArgs.Wallet != "" {
loginName = plArgs.Wallet + "." + plArgs.Username
@ -264,6 +269,7 @@ func reconnectClient(loginNumber int) <-chan *client.MultiClientJob {
config := plArgs.Config
rigid := plArgs.RigID
crylog.Info("Attempting to reconnect...")
err, code, message, jc := cl.Connect("cryptonote.social:5555", false /*useTLS*/, agent, loginName, config, rigid)
configMutex.Unlock()
if err == nil {
@ -281,11 +287,14 @@ func reconnectClient(loginNumber int) <-chan *client.MultiClientJob {
}
// Called by PoolLogin after succesful login.
func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
crylog.Info("Mining loop", loginNumber, "started")
stopWorkers() // a previous mining loop may still have active threads, so we must stop them
// before this loop starts up.
func MiningLoop(jobChan <-chan *client.MultiClientJob) {
defer func() { miningLoopDoneChan <- true }()
crylog.Info("Mining loop started")
// Set up fresh stats ....
stopWorkers()
stats.ResetRecent()
wasJustMining := false
var job *client.MultiClientJob
for {
@ -293,13 +302,14 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
case job = <-jobChan:
if job == nil {
crylog.Info("stratum client closed")
stopWorkers()
// See if we need to exit this loop or reconnect
jobChan = reconnectClient(loginNumber)
jobChan = reconnectClient()
if jobChan == nil {
crylog.Info("Mining loop", loginNumber, "terminating")
crylog.Info("Mining loop terminating")
return
}
// Set up fresh stats for new connection
stopWorkers()
stats.ResetRecent()
continue
}
@ -404,14 +414,6 @@ func handlePoke(wasMining bool, poke int) int {
if poke == STATE_CHANGE_POKE {
return USE_CACHED
}
/*
isMiningNow := miningActive()
if wasMining != isMiningNow {
// mining state was toggled so fall through using last received job which will
// appropriately halt or restart any mining threads and/or print stats.
return USE_CACHED
}
*/
return HANDLED
}

View File

@ -63,9 +63,11 @@ type Client struct {
jobChannel chan *MultiClientJob
firstJob *MultiClientJob
mutex sync.Mutex
closeCount int // incremented with each call to close to signal old dispatch loops to exit
alive bool // true when the stratum client is connected. Set to false upon call to Close(), or when Connect() is called but
mutex sync.Mutex
stopDispatch bool // for stopping any existing dispatch loop
dispatchLoopDoneChan chan bool // for waiting on an existing dispatch loop to finish
alive bool // true when the stratum client is connected. Set to false upon call to Close(), or when Connect() is called but
// a new connection is yet to be established.
}
@ -88,6 +90,11 @@ func (cl *Client) Connect(
crylog.Error("client already connected!")
return errors.New("client already connected"), 0, "", nil
}
if cl.dispatchLoopDoneChan != nil {
<-cl.dispatchLoopDoneChan
cl.dispatchLoopDoneChan = nil
}
cl.address = address
if !useTLS {
cl.conn, err = net.DialTimeout("tcp", address, time.Second*30)
@ -166,7 +173,8 @@ func (cl *Client) Connect(
}
cl.alive = true
cl.firstJob = response.Result.Job
go cl.dispatchJobs(cl.closeCount)
cl.dispatchLoopDoneChan = make(chan bool, 1)
go cl.dispatchJobs()
if response.Warning != nil {
return nil, response.Warning.Code, response.Warning.Message, cl.jobChannel
}
@ -270,7 +278,6 @@ func (cl *Client) close() {
crylog.Warn("tried to close dead client")
return
}
cl.closeCount++
cl.alive = false
cl.conn.Close()
close(cl.jobChannel)
@ -288,10 +295,11 @@ type SubmitWorkResponse struct {
// DispatchJobs will forward incoming jobs to the JobChannel until error is received or the
// connection is closed. Client will be in not-alive state on return.
func (cl *Client) dispatchJobs(closeID int) {
func (cl *Client) dispatchJobs() {
defer func() { cl.dispatchLoopDoneChan <- true }()
cl.mutex.Lock()
if closeID != cl.closeCount {
crylog.Info("exiting obsolete dispatch loop:", closeID, cl.closeCount)
if !cl.alive {
crylog.Info("exiting dispatch loop")
cl.mutex.Unlock()
return
}
@ -305,13 +313,12 @@ func (cl *Client) dispatchJobs(closeID int) {
conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
err := readJSON(response, reader)
cl.mutex.Lock()
if closeID != cl.closeCount {
crylog.Info("exiting obsolete dispatch loop:", closeID, cl.closeCount)
if !cl.alive {
cl.mutex.Unlock()
return
}
if err != nil {
crylog.Error("readJSON failed, closing client and exiting dispatch", closeID, ":", err)
crylog.Error("readJSON failed, closing client and exiting dispatch:", err)
cl.close()
cl.mutex.Unlock()
return