stop any active worker threads before starting a new mining loop

This commit is contained in:
cryptonote-social 2020-08-15 16:36:18 -07:00
parent fbdcb961e0
commit af379def2c

View File

@ -66,6 +66,10 @@ var (
// used to send messages to main job loop to take various actions
pokeChannel chan int
// Worker thread synchronization vars
wg sync.WaitGroup // used to wait for stopped worker threads to finish
stopper uint32 // atomic int used to signal rxlib worker threads to stop mining
)
type PoolLoginArgs struct {
@ -276,14 +280,12 @@ func reconnectClient(loginNumber int) <-chan *client.MultiClientJob {
}
}
// Called by PoolLogin after succesful login.
func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
crylog.Info("Mining loop", loginNumber, "started")
stopWorkers() // a previous mining loop may still have active threads, so we must stop them
// before this loop starts up.
stats.ResetRecent()
// Synchronization vars
var wg sync.WaitGroup // used to wait for stopped worker threads to finish
var stopper uint32 // atomic int used to signal rxlib worker threads to stop mining
wasJustMining := false
var job *client.MultiClientJob
for {
@ -291,8 +293,7 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
case job = <-jobChan:
if job == nil {
crylog.Info("stratum client closed")
atomic.StoreUint32(&stopper, 1)
wg.Wait()
stopWorkers()
// See if we need to exit this loop or reconnect
jobChan = reconnectClient(loginNumber)
if jobChan == nil {
@ -303,7 +304,7 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
continue
}
case poke := <-pokeChannel:
pokeRes := handlePoke(true /*wasJustMining*/, poke, &stopper, &wg)
pokeRes := handlePoke(true /*wasJustMining*/, poke)
switch pokeRes {
case HANDLED:
continue
@ -319,9 +320,7 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
}
crylog.Info("Current job:", job.JobID, "Difficulty:", blockchain.TargetToDifficulty(job.Target))
// Stop existing mining, if any, and wait for mining threads to finish.
atomic.StoreUint32(&stopper, 1)
wg.Wait()
stopWorkers()
as := getMiningActivityState()
if as < 0 {
@ -357,18 +356,23 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, loginNumber int) {
atomic.StoreUint32(&stopper, 0)
for i := 0; i < threads; i++ {
wg.Add(1)
go goMine(&wg, &stopper, *job, i /*thread*/)
go goMine(*job, i /*thread*/)
}
}
defer crylog.Info("Mining loop terminated")
}
func handlePoke(wasMining bool, poke int, stopper *uint32, wg *sync.WaitGroup) int {
// Stop all active worker threads and wait for them to finish before returning
func stopWorkers() {
atomic.StoreUint32(&stopper, 1)
wg.Wait()
}
func handlePoke(wasMining bool, poke int) int {
crylog.Info("Handling poke:", poke, wasMining)
if poke == INCREASE_THREADS_POKE {
atomic.StoreUint32(stopper, 1)
wg.Wait()
stopWorkers()
configMutex.Lock()
t := rx.AddThread()
if t < 0 {
@ -383,8 +387,7 @@ func handlePoke(wasMining bool, poke int, stopper *uint32, wg *sync.WaitGroup) i
return USE_CACHED
}
if poke == DECREASE_THREADS_POKE {
atomic.StoreUint32(stopper, 1)
wg.Wait()
stopWorkers()
configMutex.Lock()
t := rx.RemoveThread()
if t < 0 {
@ -491,7 +494,7 @@ func printStats(isMining bool) {
crylog.Info("=====================================")
}
func goMine(wg *sync.WaitGroup, stopper *uint32, job client.MultiClientJob, thread int) {
func goMine(job client.MultiClientJob, thread int) {
defer wg.Done()
input, err := hex.DecodeString(job.Blob)
diffTarget := blockchain.TargetToDifficulty(job.Target)
@ -504,7 +507,7 @@ func goMine(wg *sync.WaitGroup, stopper *uint32, job client.MultiClientJob, thre
nonce := make([]byte, 4)
for {
res := rx.HashUntil(input, uint64(diffTarget), thread, hash, nonce, stopper)
res := rx.HashUntil(input, uint64(diffTarget), thread, hash, nonce, &stopper)
if res <= 0 {
stats.TallyHashes(-res)
break