put client locking into client itself instead of outside

This commit is contained in:
cryptonote-social 2020-08-11 21:54:39 -07:00
parent e1332018d3
commit 3498d7c1bc
2 changed files with 111 additions and 64 deletions

View File

@ -9,7 +9,7 @@ import (
"bytes"
"encoding/hex"
"runtime"
"sync"
// "sync"
"sync/atomic"
"time"
)
@ -35,9 +35,8 @@ const (
)
var (
cl *client.Client
clMutex sync.Mutex
clientAlive bool // true when the stratum client is connected and healthy
plArgs *PoolLoginArgs
cl client.Client
screenIdle int32 // only mine when this is > 0
batteryPower int32 // only mine when this is > 0
@ -97,21 +96,16 @@ type PoolLoginResponse struct {
func PoolLogin(args *PoolLoginArgs) *PoolLoginResponse {
r := &PoolLoginResponse{}
cl = client.NewClient("cryptonote.social:5555", args.Agent)
screenIdle = 0
batteryPower = 0
manualMinerToggle = 0
clMutex.Lock()
defer clMutex.Unlock()
clientAlive = false
loginName := args.Username
if args.Wallet != "" {
loginName = args.Wallet + "." + args.Username
}
err, code, message := cl.Connect(loginName, args.Config, args.RigID, false /*useTLS*/)
err, code, message := cl.Connect("cryptonote.social:5555", false /*useTLS*/, args.Agent, loginName, args.Config, args.RigID)
if err != nil {
if code != 0 {
crylog.Error("Pool server did not allow login due to error:")
@ -139,8 +133,8 @@ func PoolLogin(args *PoolLoginArgs) *PoolLoginResponse {
r.Message = message
}
// login successful
clientAlive = true
r.Code = 1
plArgs = args
return r
}
@ -180,25 +174,20 @@ func StartMiner(args *StartMinerArgs) *StartMinerResponse {
return r
}
// Make sure connection was established
clMutex.Lock()
alive := clientAlive
firstJob := cl.FirstJob
clMutex.Unlock()
if !alive {
if !cl.IsAlive() {
r.Code = -1
r.Message = "StartMiner cannot be called until you first make a successful call to PoolLogin"
return r
}
newSeed, err := hex.DecodeString(firstJob.SeedHash)
jobChan, seedHashStr := cl.StartDispatching()
seed, err := hex.DecodeString(seedHashStr)
if err != nil {
// shouldn't happen?
crylog.Error("Invalid seed hash:", firstJob.SeedHash)
r.Code = -2
r.Message = "Invalid seed hash from pool server"
r.Message = "Could not decode initial RandomX seed hash from pool"
return r
}
code := rx.InitRX(newSeed, args.Threads, runtime.GOMAXPROCS(0))
code := rx.InitRX(seed, args.Threads, runtime.GOMAXPROCS(0))
if code < 0 {
crylog.Error("Failed to initialize RandomX")
r.Code = -3
@ -210,41 +199,46 @@ func StartMiner(args *StartMinerArgs) *StartMinerResponse {
} else {
r.Code = 1
}
threads = args.Threads
go MiningLoop(newSeed)
startTime = time.Now()
threads = args.Threads
go MiningLoop(jobChan, seed)
return r
}
func startJobDispatcher() chan *client.MultiChannelJob {
go func() {
err := cl.DispatchJobs()
func reconnectClient(args *PoolLoginArgs) <-chan *client.MultiClientJob {
loginName := args.Username
if args.Wallet != "" {
loginName = args.Wallet + "." + args.Username
}
sleepSec := 3 * time.Second // time to sleep if connection attempt fails
for {
err, code, message := cl.Connect("cryptonote.social:5555", false /*useTLS*/, args.Agent, loginName, args.Config, args.RigID)
if err != nil {
crylog.Error("Job dispatcher exitted with error:", err)
os.Exit(1)
if code != 0 {
crylog.Fatal("Pool server did not allow login due to error:", message)
panic("can't handle pool login error during reconnect")
}
crylog.Error("Couldn't connect to pool server:", err)
time.Sleep(sleepSec)
sleepSec += time.Second
}
clMutex.Lock()
if clientAlive {
clientAlive = false
cl.Close()
}
clMutex.Unlock()
}()
jobChan, _ := cl.StartDispatching()
return jobChan
}
}
func MiningLoop(lastSeed []byte) {
func MiningLoop(jobChan <-chan *client.MultiClientJob, lastSeed []byte) {
crylog.Info("Mining loop started")
jobChannel := startJobDispatcher()
resetRecentStats()
var job *client.MultiClientJob
for {
select {
case job = <-jobChannel:
case job = <-jobChan:
if job == nil {
crylog.Warn("stratum client died, reconnecting")
// TODO reconnectClient()
crylog.Warn("stratum client died")
jobChan = reconnectClient(plArgs)
continue
}
}
crylog.Info("Current job:", job.JobID, "Difficulty:", blockchain.TargetToDifficulty(job.Target))

View File

@ -14,6 +14,7 @@ import (
"github.com/cryptonote-social/csminer/crylog"
"io"
"net"
"sync"
"time"
)
@ -56,38 +57,49 @@ type MultiClientJob struct {
}
type Client struct {
JobChannel chan *MultiClientJob
FirstJob *MultiClientJob
address string
agent string
conn net.Conn
responseChannel chan *SubmitWorkResponse
jobChannel chan *MultiClientJob
firstJob *MultiClientJob
mutex sync.Mutex
alive bool // true when the stratum client is connected and healthy
}
func NewClient(address string, agent string) *Client {
return &Client{
address: address,
agent: agent,
}
func (cl *Client) IsAlive() bool {
cl.mutex.Lock()
defer cl.mutex.Unlock()
return cl.alive
}
// Connect to the stratum server port with the given login info. Returns error if connection could
// not be established, or if the stratum server itself returned an error. In the latter case,
// code and message will also be specified. If the stratum server returned just a warning, then
// error will be nil, but code & message will be specified.
func (cl *Client) Connect(uname, pw, rigid string, useTLS bool) (err error, code int, message string) {
//
// After calling Connect, if it returns successfully, the client is guaranteed to be in the alive
// state. Call StartDispatching() to begin job dispatching.
func (cl *Client) Connect(
address string, useTLS bool, agent string,
uname, pw, rigid string) (err error, code int, message string) {
cl.mutex.Lock()
defer cl.mutex.Unlock()
if cl.alive {
return errors.New("client already connected. call close first if you wish to reconnect"), 0, ""
}
if !useTLS {
cl.conn, err = net.DialTimeout("tcp", cl.address, time.Second*30)
cl.conn, err = net.DialTimeout("tcp", address, time.Second*30)
} else {
cl.conn, err = tls.Dial("tcp", cl.address, nil /*Config*/)
cl.conn, err = tls.Dial("tcp", address, nil /*Config*/)
}
if err != nil {
crylog.Error("Dial failed:", err, cl)
return err, 0, ""
}
cl.responseChannel = make(chan *SubmitWorkResponse)
cl.JobChannel = make(chan *MultiClientJob)
cl.jobChannel = make(chan *MultiClientJob)
// send login
loginRequest := &struct {
ID uint64 `json:"id"`
@ -152,13 +164,25 @@ func (cl *Client) Connect(uname, pw, rigid string, useTLS bool) (err error, code
crylog.Error("Didn't get job result from login response:", response.Error)
return errors.New("stratum server error"), response.Error.Code, response.Error.Message
}
cl.FirstJob = response.Result.Job
cl.alive = true
cl.firstJob = response.Result.Job
if response.Warning != nil {
return nil, response.Warning.Code, response.Warning.Message
}
return nil, 0, ""
}
func (cl *Client) StartDispatching() (jobChan <-chan *MultiClientJob, seedHash string) {
cl.mutex.Lock()
defer cl.mutex.Unlock()
if !cl.alive {
crylog.Fatal("must call connect successfully first")
return nil, ""
}
go cl.dispatchJobs()
return cl.jobChannel, cl.firstJob.SeedHash
}
func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce string, connNonce []byte, jobid string, targetDifficulty int64) (*SubmitWorkResponse, error) {
submitRequest := &struct {
ID uint64 `json:"id"`
@ -184,6 +208,8 @@ func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce str
}
func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse, error) {
cl.mutex.Lock()
defer cl.mutex.Unlock()
data, err := json.Marshal(submitRequest)
if err != nil {
crylog.Error("json marshalling failed:", err, "for client:", cl)
@ -237,12 +263,21 @@ func (cl *Client) SubmitWork(nonce string, jobid string) (*SubmitWorkResponse, e
}
func (cl *Client) String() string {
cl.mutex.Lock()
defer cl.mutex.Unlock()
return "stratum_client:" + cl.address
}
func (cl *Client) Close() {
cl.mutex.Lock()
defer cl.mutex.Unlock()
if !cl.alive {
crylog.Warn("tried to close dead client")
return
}
cl.alive = false
cl.conn.Close()
close(cl.JobChannel)
close(cl.jobChannel)
close(cl.responseChannel)
}
@ -257,21 +292,32 @@ type SubmitWorkResponse struct {
// DispatchJobs will forward incoming jobs to the JobChannel until error is received or the
// connection is closed.
func (cl *Client) DispatchJobs() error {
cl.JobChannel <- cl.FirstJob
cl.FirstJob = nil
reader := bufio.NewReaderSize(cl.conn, MAX_REQUEST_SIZE)
func (cl *Client) dispatchJobs() {
cl.mutex.Lock()
cl.jobChannel <- cl.firstJob
cl.firstJob = nil
conn := cl.conn
cl.mutex.Unlock()
reader := bufio.NewReaderSize(conn, MAX_REQUEST_SIZE)
for {
response := &SubmitWorkResponse{}
cl.conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
err := readJSON(response, reader)
if err != nil {
crylog.Error("readJSON failed:", err)
return err
crylog.Error("readJSON failed, closing client and exiting dispatch:", err)
cl.Close()
return
}
if response.Method != "job" {
if response.ID == SUBMIT_WORK_JSON_ID {
cl.mutex.Lock()
if !cl.alive {
cl.mutex.Unlock()
crylog.Info("exiting dead client dispatch loop")
return
}
cl.responseChannel <- response
cl.mutex.Unlock()
continue
}
crylog.Warn("Unexpected response:", *response)
@ -279,9 +325,16 @@ func (cl *Client) DispatchJobs() error {
}
if response.Job == nil {
crylog.Error("Didn't get job:", *response)
return errors.New("didn't get job as expected")
return
}
cl.JobChannel <- response.Job
cl.mutex.Lock()
if !cl.alive {
cl.mutex.Unlock()
crylog.Info("exiting dead client dispatch loop")
return
}
cl.jobChannel <- response.Job
cl.mutex.Unlock()
}
}