From bdabbed6ead469324994590b315f6611d6b98a52 Mon Sep 17 00:00:00 2001 From: cryptonote-social Date: Wed, 12 Aug 2020 12:01:50 -0700 Subject: [PATCH] yuuge progress on minerlib refactoring --- capi/capi.go | 8 +- capi/niceapi.h | 22 ++--- capi/test.c | 108 +++++++++++++--------- minerlib/minerlib.go | 191 ++++++++++++++++++++++++++++----------- rx/rx.go | 27 ++++-- stratum/client/client.go | 51 ++++++----- 6 files changed, 264 insertions(+), 143 deletions(-) diff --git a/capi/capi.go b/capi/capi.go index eec143b..147c848 100644 --- a/capi/capi.go +++ b/capi/capi.go @@ -26,14 +26,14 @@ func PoolLogin( return resp.Code, C.CString(resp.Message) } -//export StartMiner -func StartMiner(threads int, excludeHrStart, excludeHrEnd int) (code int, message *C.char) { - args := &minerlib.StartMinerArgs{ +//export InitMiner +func InitMiner(threads int, excludeHrStart, excludeHrEnd int) (code int, message *C.char) { + args := &minerlib.InitMinerArgs{ Threads: threads, ExcludeHourStart: excludeHrStart, ExcludeHourEnd: excludeHrEnd, } - resp := minerlib.StartMiner(args) + resp := minerlib.InitMiner(args) return resp.Code, C.CString(resp.Message) } diff --git a/capi/niceapi.h b/capi/niceapi.h index 4da74d6..346e8bb 100644 --- a/capi/niceapi.h +++ b/capi/niceapi.h @@ -54,7 +54,7 @@ pool_login_response pool_login(const pool_login_args *args) { } -typedef struct start_miner_args { +typedef struct init_miner_args { // threads specifies the initial # of threads to mine with. Must be >=1 int threads; @@ -62,28 +62,28 @@ typedef struct start_miner_args { // to 0 if there is no excluded range. int exclude_hour_start; int exclude_hour_end; -} start_miner_args; +} init_miner_args; -typedef struct start_miner_response { - // code == 1: miner started successfully. +typedef struct init_miner_response { + // code == 1: miner init successful. // - // code == 2: miner started successfully but hugepages could not be enabled, so mining may be + // code == 2: miner init successful but hugepages could not be enabled, so mining may be // slow. You can suggest to the user that a machine restart might help resolve this. // - // code > 2: miner failed to start due to bad config, see details in message. For example, an + // code > 2: miner init failed due to bad config, see details in message. For example, an // invalid number of threads or invalid hour range may have been specified. // // code < 0: non-recoverable error, message will provide details. program should exit after // showing message. int code; const char* message; // must be freed by the caller -} start_miner_response; +} init_miner_response; // call only after successful pool_login. This should only be called once! -start_miner_response start_miner(const start_miner_args *args) { - struct StartMiner_return r = - StartMiner((GoInt)args->threads, (GoInt)args->exclude_hour_start, (GoInt)args->exclude_hour_end); - start_miner_response response; +init_miner_response init_miner(const init_miner_args *args) { + struct InitMiner_return r = + InitMiner((GoInt)args->threads, (GoInt)args->exclude_hour_start, (GoInt)args->exclude_hour_end); + init_miner_response response; response.code = (int)r.r0; if (strlen(r.r1) > 0) { response.message = r.r1; diff --git a/capi/test.c b/capi/test.c index 543d7fc..372c4e1 100644 --- a/capi/test.c +++ b/capi/test.c @@ -5,49 +5,13 @@ #include "niceapi.h" int main(int argc, char* argv[]) { - // Pool login... - pool_login_args pl_args; - pl_args.username = "cryptonote-social"; - if (argc > 1) { - printf("using arg for username: %s\n", argv[1]); - pl_args.username = argv[1]; - } - pl_args.rigid = NULL; - - pl_args.wallet = NULL; - if (argc > 2) { - printf("using arg for wallet: %s\n", argv[2]); - pl_args.wallet = argv[2]; - } - - pl_args.agent = "Super Power Ultimate Miner (S.P.U.M.) v0.6.9"; - - pl_args.config = NULL; - - pool_login_response pl_resp = pool_login(&pl_args); - if (pl_resp.code < 0) { - printf("Oh no, login failed: %s\n", pl_resp.message); - free((void*)pl_resp.message); - return 1; - } - if (pl_resp.code > 1) { - printf("Pool server didn't like login info: %s\n", pl_resp.message); - free((void*)pl_resp.message); - return 2; - } - printf("Successful login.\n"); - if (pl_resp.message) { - printf(" Pool returned warning: %s\n", pl_resp.message); - free((void*)pl_resp.message); - } - - // Starting the miner.... - start_miner_args sm_args; - sm_args.threads = 1; + // Miner initialization + init_miner_args sm_args; + sm_args.threads = 2; sm_args.exclude_hour_start = 0; sm_args.exclude_hour_end = 0; - start_miner_response sm_resp = start_miner(&sm_args); + init_miner_response sm_resp = init_miner(&sm_args); if (sm_resp.code > 2) { printf("Bad config options specified: %s\n", sm_resp.message); free((void*)sm_resp.message); @@ -61,9 +25,67 @@ int main(int argc, char* argv[]) { if (sm_resp.code == 2) { printf("Huge Pages could not be enabled -- mining may be slow. Consider restarting your machine and trying again.\n"); } - printf("Miner started.\n"); + printf("Miner initialized.\n"); - printf("Sleeping indefinitely\n"); - sleep(99999); + pool_login_args pl_args; + pl_args.agent = "Super Power Ultimate Miner (S.P.U.M.) v0.6.9"; + pl_args.rigid = NULL; + pl_args.wallet = NULL; + pl_args.config = NULL; + + // Login loop. Alternate between 2 accounts every minute to make sure account switching works. + while (true) { + pl_args.username = "cryptonote-social"; + if (argc > 1) { + printf("using arg for username: %s\n", argv[1]); + pl_args.username = argv[1]; + } + if (argc > 2) { + printf("using arg for wallet: %s\n", argv[2]); + pl_args.wallet = argv[2]; + } + printf("Logging in with user: %s\n", pl_args.username); + pool_login_response pl_resp = pool_login(&pl_args); + if (pl_resp.code < 0) { + printf("Oh no, login failed: %s\n", pl_resp.message); + free((void*)pl_resp.message); + return 1; + } + if (pl_resp.code > 1) { + printf("Pool server didn't like login info: %s\n", pl_resp.message); + free((void*)pl_resp.message); + return 2; + } + printf("Successful login #1.\n"); + if (pl_resp.message) { + printf(" Pool returned warning: %s\n", pl_resp.message); + free((void*)pl_resp.message); + } + + printf("Sleeping for a minute before trying another login.\n"); + sleep(60); + + printf("Trying to login with a new user (donate-getmonero-org).\n"); + pl_args.username = "donate-getmonero-org"; + pl_resp = pool_login(&pl_args); + if (pl_resp.code < 0) { + printf("Oh no, login 2 failed: %s\n", pl_resp.message); + free((void*)pl_resp.message); + return 1; + } + if (pl_resp.code > 1) { + printf("Pool server didn't like login 2 info: %s\n", pl_resp.message); + free((void*)pl_resp.message); + return 2; + } + printf("Successful login #2.\n"); + if (pl_resp.message) { + printf(" Pool returned warning: %s\n", pl_resp.message); + free((void*)pl_resp.message); + } + + printf("Sleeping for a minute before looping again.\n"); + sleep(60); + } return 0; } diff --git a/minerlib/minerlib.go b/minerlib/minerlib.go index 203a1fc..55653bd 100644 --- a/minerlib/minerlib.go +++ b/minerlib/minerlib.go @@ -9,7 +9,7 @@ import ( "bytes" "encoding/hex" "runtime" - // "sync" + "sync" "sync/atomic" "time" ) @@ -35,13 +35,6 @@ const ( ) var ( - plArgs *PoolLoginArgs - cl client.Client - - screenIdle int32 // only mine when this is > 0 - batteryPower int32 // only mine when this is > 0 - manualMinerToggle int32 // whether paused mining has been manually overridden - // miner client stats sharesAccepted int64 sharesRejected int64 @@ -52,9 +45,20 @@ var ( lastStatsUpdateTime time.Time // miner config - threads int + configMutex sync.Mutex + plArgs *PoolLoginArgs + threads int + loggedIn bool + + // stratum client + cl client.Client ) +func tallyHashes(hashes int64) { + atomic.AddInt64(&clientSideHashes, hashes) + atomic.AddInt64(&recentHashes, hashes) +} + func resetRecentStats() { atomic.StoreInt64(&recentHashes, 0) lastStatsResetTime = time.Now() @@ -95,17 +99,19 @@ type PoolLoginResponse struct { } func PoolLogin(args *PoolLoginArgs) *PoolLoginResponse { - r := &PoolLoginResponse{} - - screenIdle = 0 - batteryPower = 0 - manualMinerToggle = 0 - + configMutex.Lock() + defer configMutex.Unlock() loginName := args.Username if args.Wallet != "" { loginName = args.Wallet + "." + args.Username } - err, code, message := cl.Connect("cryptonote.social:5555", false /*useTLS*/, args.Agent, loginName, args.Config, args.RigID) + agent := args.Agent + config := args.Config + rigid := args.RigID + loggedIn = false + + r := &PoolLoginResponse{} + err, code, message := cl.Connect("cryptonote.social:5555", false /*useTLS*/, agent, loginName, config, rigid) if err != nil { if code != 0 { crylog.Error("Pool server did not allow login due to error:") @@ -132,13 +138,15 @@ func PoolLogin(args *PoolLoginArgs) *PoolLoginResponse { crylog.Warn(":::::::::::::::::::::::::::::::::::::::::::::::::::::::::\n") r.Message = message } + // login successful - r.Code = 1 + loggedIn = true plArgs = args + r.Code = 1 return r } -type StartMinerArgs struct { +type InitMinerArgs struct { // threads specifies the initial # of threads to mine with. Must be >=1 Threads int @@ -147,13 +155,13 @@ type StartMinerArgs struct { ExcludeHourStart, ExcludeHourEnd int } -type StartMinerResponse struct { - // code == 1: miner started successfully. +type InitMinerResponse struct { + // code == 1: miner init successful // - // code == 2: miner started successfully but hugepages could not be enabled, so mining may be + // code == 2: miner init successful but hugepages could not be enabled, so mining may be // slow. You can suggest to the user that a machine restart might help resolve this. // - // code > 2: miner failed to start due to bad config, see details in message. For example, an + // code > 2: miner init failed due to bad config, see details in message. For example, an // invalid number of threads or invalid hour range may have been specified. // // code < 0: non-recoverable error, message will provide details. program should exit after @@ -162,10 +170,10 @@ type StartMinerResponse struct { Message string } -// StartMiner configures the miner and must be called only after a call to PoolLogin was -// successful. You should only call this method once. -func StartMiner(args *StartMinerArgs) *StartMinerResponse { - r := &StartMinerResponse{} +// InitMiner configures the miner and must be called exactly once before any other method +// is called. +func InitMiner(args *InitMinerArgs) *InitMinerResponse { + r := &InitMinerResponse{} hr1 := args.ExcludeHourStart hr2 := args.ExcludeHourEnd if hr1 > 24 || hr1 < 0 || hr2 > 24 || hr1 < 0 { @@ -173,21 +181,7 @@ func StartMiner(args *StartMinerArgs) *StartMinerResponse { r.Message = "exclude_hour_start and exclude_hour_end must each be between 0 and 24" return r } - // Make sure connection was established - if !cl.IsAlive() { - r.Code = -1 - r.Message = "StartMiner cannot be called until you first make a successful call to PoolLogin" - return r - } - jobChan, seedHashStr := cl.StartDispatching() - seed, err := hex.DecodeString(seedHashStr) - if err != nil { - // shouldn't happen? - r.Code = -2 - r.Message = "Could not decode initial RandomX seed hash from pool" - return r - } - code := rx.InitRX(seed, args.Threads, runtime.GOMAXPROCS(0)) + code := rx.InitRX(args.Threads) if code < 0 { crylog.Error("Failed to initialize RandomX") r.Code = -3 @@ -201,50 +195,87 @@ func StartMiner(args *StartMinerArgs) *StartMinerResponse { } startTime = time.Now() threads = args.Threads - go MiningLoop(jobChan, seed) + go func() { + MiningLoop(awaitLogin()) + }() return r } -func reconnectClient(args *PoolLoginArgs) <-chan *client.MultiClientJob { - loginName := args.Username - if args.Wallet != "" { - loginName = args.Wallet + "." + args.Username +func awaitLogin() <-chan *client.MultiClientJob { + crylog.Info("Awaiting login") + for { + configMutex.Lock() + li := loggedIn + configMutex.Unlock() + if li { + crylog.Info("Logged in!") + return cl.StartDispatching() + } + time.Sleep(time.Second) } +} + +func reconnectClient() <-chan *client.MultiClientJob { sleepSec := 3 * time.Second // time to sleep if connection attempt fails for { + configMutex.Lock() + if !loggedIn { + // Client is being reconnected by the user, await until successful. + configMutex.Unlock() + return awaitLogin() + } + loginName := plArgs.Username + if plArgs.Wallet != "" { + loginName = plArgs.Wallet + "." + plArgs.Username + } + agent := plArgs.Agent + config := plArgs.Config + rigid := plArgs.RigID + crylog.Info("Reconnecting...") - err, code, message := cl.Connect("cryptonote.social:5555", false /*useTLS*/, args.Agent, loginName, args.Config, args.RigID) + err, code, message := cl.Connect("cryptonote.social:5555", false /*useTLS*/, agent, loginName, config, rigid) + configMutex.Unlock() if err == nil { - crylog.Info("Reconnected.") - jobChan, _ := cl.StartDispatching() - return jobChan + return awaitLogin() } if code != 0 { crylog.Fatal("Pool server did not allow login due to error:", message) panic("can't handle pool login error during reconnect") } crylog.Error("Couldn't connect to pool server:", err) + crylog.Info("Sleeping for", sleepSec, "seconds before trying again.") time.Sleep(sleepSec) sleepSec += time.Second } } -func MiningLoop(jobChan <-chan *client.MultiClientJob, lastSeed []byte) { +func MiningLoop(jobChan <-chan *client.MultiClientJob) { crylog.Info("Mining loop started") + lastSeed := []byte{} + + // Synchronization vars + var wg sync.WaitGroup // used to wait for stopped worker threads to finish + var stopper uint32 // atomic int used to signal rxlib worker threads to stop mining resetRecentStats() - var job *client.MultiClientJob + //wasJustMining := false + for { + var job *client.MultiClientJob select { case job = <-jobChan: if job == nil { crylog.Warn("stratum client died") - jobChan = reconnectClient(plArgs) + jobChan = reconnectClient() continue } } crylog.Info("Current job:", job.JobID, "Difficulty:", blockchain.TargetToDifficulty(job.Target)) + // Stop existing mining, if any, and wait for mining threads to finish. + atomic.StoreUint32(&stopper, 1) + wg.Wait() + // Check if we need to reinitialize rx dataset newSeed, err := hex.DecodeString(job.SeedHash) if err != nil { @@ -253,11 +284,63 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, lastSeed []byte) { } if bytes.Compare(newSeed, lastSeed) != 0 { crylog.Info("New seed:", job.SeedHash) - rx.InitRX(newSeed, threads, runtime.GOMAXPROCS(0)) + rx.SeedRX(newSeed, runtime.GOMAXPROCS(0)) lastSeed = newSeed resetRecentStats() } + + atomic.StoreUint32(&stopper, 0) + for i := 0; i < threads; i++ { + wg.Add(1) + go goMine(&wg, &stopper, *job, i /*thread*/) + } } defer crylog.Info("Mining loop terminated") } + +func goMine(wg *sync.WaitGroup, stopper *uint32, job client.MultiClientJob, thread int) { + defer wg.Done() + input, err := hex.DecodeString(job.Blob) + diffTarget := blockchain.TargetToDifficulty(job.Target) + if err != nil { + crylog.Error("invalid blob:", job.Blob) + return + } + + hash := make([]byte, 32) + nonce := make([]byte, 4) + + for { + res := rx.HashUntil(input, uint64(diffTarget), thread, hash, nonce, stopper) + if res <= 0 { + tallyHashes(-res) + break + } + tallyHashes(res) + crylog.Info("Share found by thread", thread, "w/ target:", blockchain.HashDifficulty(hash)) + fnonce := hex.EncodeToString(nonce) + // If the client is alive, submit the share in a separate thread so we can resume hashing + // immediately, otherwise wait until it's alive. + for { + if cl.IsAlive() { + break + } + time.Sleep(time.Second) + } + go func(fnonce, jobid string) { + resp, err := cl.SubmitWork(fnonce, jobid) + if err != nil { + crylog.Warn("Submit work client failure:", jobid, err) + return + } + if len(resp.Error) > 0 { + atomic.AddInt64(&sharesRejected, 1) + crylog.Warn("Submit work server error:", jobid, resp.Error) + return + } + atomic.AddInt64(&sharesAccepted, 1) + atomic.AddInt64(&poolSideHashes, diffTarget) + }(fnonce, job.JobID) + } +} diff --git a/rx/rx.go b/rx/rx.go index 457081e..989e6af 100644 --- a/rx/rx.go +++ b/rx/rx.go @@ -19,20 +19,27 @@ import ( ) // Call this every time the seed hash provided by the daemon changes before performing any hashing. +// Only call when all existing threads are stopped. Returns false if an unrecoverable error +// occurred. +func SeedRX(seedHash []byte, initThreads int) bool { + if len(seedHash) == 0 { + crylog.Error("Bad seed hash:", seedHash) + return false + } + b := C.seed_rxlib( + (*C.char)(unsafe.Pointer(&seedHash[0])), + (C.uint32_t)(len(seedHash)), + (C.int)(initThreads)) + return bool(b) +} + +// Call this once. // return values: // 1: success // 2: success, but no huge pages. // -1: unexpected failure -func InitRX(seedHash []byte, threads int, initThreads int) int { - if len(seedHash) == 0 { - crylog.Error("Bad seed hash:", seedHash) - return -1 - } - i := C.init_rxlib( - (*C.char)(unsafe.Pointer(&seedHash[0])), - (C.uint32_t)(len(seedHash)), - (C.int)(threads), - (C.int)(initThreads)) +func InitRX(threads int) int { + i := C.init_rxlib((C.int)(threads)) return int(i) } diff --git a/stratum/client/client.go b/stratum/client/client.go index 042d0bc..1722eba 100644 --- a/stratum/client/client.go +++ b/stratum/client/client.go @@ -86,7 +86,8 @@ func (cl *Client) Connect( cl.mutex.Lock() defer cl.mutex.Unlock() if cl.alive { - return errors.New("client already connected. call close first if you wish to reconnect"), 0, "" + crylog.Warn("client already connected. closing then proceeding to reconnect") + cl.close() } cl.address = address if !useTLS { @@ -172,17 +173,18 @@ func (cl *Client) Connect( return nil, 0, "" } -func (cl *Client) StartDispatching() (jobChan <-chan *MultiClientJob, seedHash string) { +func (cl *Client) StartDispatching() (jobChan <-chan *MultiClientJob) { cl.mutex.Lock() defer cl.mutex.Unlock() if !cl.alive { crylog.Fatal("must call connect successfully first") - return nil, "" + return nil } go cl.dispatchJobs() - return cl.jobChannel, cl.firstJob.SeedHash + return cl.jobChannel } +// if error is returned then client will be closed and put in not-alive state func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce string, connNonce []byte, jobid string, targetDifficulty int64) (*SubmitWorkResponse, error) { submitRequest := &struct { ID uint64 `json:"id"` @@ -207,43 +209,45 @@ func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce str return cl.submitRequest(submitRequest) } +// if error is returned then client will be closed and put in not-alive state func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse, error) { cl.mutex.Lock() - defer cl.mutex.Unlock() + if !cl.alive { + return nil, errors.New("client not alive") + } data, err := json.Marshal(submitRequest) if err != nil { crylog.Error("json marshalling failed:", err, "for client:", cl) + cl.close() + cl.mutex.Unlock() return nil, err } cl.conn.SetWriteDeadline(time.Now().Add(60 * time.Second)) data = append(data, '\n') if _, err = cl.conn.Write(data); err != nil { crylog.Error("writing request failed:", err, "for client:", cl) + cl.close() + cl.mutex.Unlock() return nil, err } - timeout := make(chan bool) - go func() { - time.Sleep(30 * time.Second) - timeout <- true - }() - var response *SubmitWorkResponse - select { - case response = <-cl.responseChannel: - case <-timeout: - crylog.Error("response timeout") - return nil, fmt.Errorf("submit work failure: response timeout") - } + respChan := cl.responseChannel + cl.mutex.Unlock() + + // await the response + response := <-respChan if response == nil { - crylog.Error("got nil response") + crylog.Error("got nil response, client closed?", cl.IsAlive()) return nil, fmt.Errorf("submit work failure: nil response") } if response.ID != SUBMIT_WORK_JSON_ID { - crylog.Error("got unexpected response:", response.ID) + crylog.Error("got unexpected response:", response.ID, "Closing connection.") + cl.Close() return nil, fmt.Errorf("submit work failure: unexpected response") } return response, nil } +// if error is returned then client will be closed and put in not-alive state func (cl *Client) SubmitWork(nonce string, jobid string) (*SubmitWorkResponse, error) { submitRequest := &struct { ID uint64 `json:"id"` @@ -269,6 +273,10 @@ func (cl *Client) String() string { func (cl *Client) Close() { cl.mutex.Lock() defer cl.mutex.Unlock() + cl.close() +} + +func (cl *Client) close() { if !cl.alive { crylog.Warn("tried to close dead client") return @@ -289,7 +297,7 @@ type SubmitWorkResponse struct { } // DispatchJobs will forward incoming jobs to the JobChannel until error is received or the -// connection is closed. +// connection is closed. Client will be in not-alive state on return. func (cl *Client) dispatchJobs() { cl.mutex.Lock() cl.jobChannel <- cl.firstJob @@ -322,7 +330,8 @@ func (cl *Client) dispatchJobs() { continue } if response.Job == nil { - crylog.Error("Didn't get job:", *response) + crylog.Error("Didn't get job as expected:", *response) + cl.Close() return } cl.mutex.Lock()