initial client work for chat support

This commit is contained in:
cryptonote-social 2020-12-24 16:49:37 -08:00
parent 29adb10295
commit d908527792
8 changed files with 364 additions and 66 deletions

View File

@ -4,6 +4,7 @@ import "C"
import ( import (
"github.com/cryptonote-social/csminer/minerlib" "github.com/cryptonote-social/csminer/minerlib"
"github.com/cryptonote-social/csminer/minerlib/chat"
) )
//export PoolLogin //export PoolLogin
@ -46,12 +47,31 @@ func GetMinerState() (
secondsOld int, secondsOld int,
lifetimeHashes int64, lifetimeHashes int64,
paid, owed, accumulated float64, paid, owed, accumulated float64,
timeToReward *C.char) { timeToReward *C.char,
chatsAvailable bool) {
resp := minerlib.GetMiningState() resp := minerlib.GetMiningState()
return resp.MiningActivity, resp.Threads, resp.RecentHashrate, return resp.MiningActivity, resp.Threads, resp.RecentHashrate,
C.CString(resp.PoolUsername), resp.SecondsOld, resp.LifetimeHashes, C.CString(resp.PoolUsername), resp.SecondsOld, resp.LifetimeHashes,
resp.Paid, resp.Owed, resp.Accumulated, C.CString(resp.TimeToReward) resp.Paid, resp.Owed, resp.Accumulated, C.CString(resp.TimeToReward),
resp.ChatsAvailable
}
//export NextChat
func NextChat() (
username *C.char,
message *C.char,
timestamp int64) {
nc := chat.NextChatReceived()
if nc == nil {
return C.CString(""), C.CString(""), 0
}
return C.CString(nc.Username), C.CString(nc.Message), nc.Timestamp
}
//export SendChat
func SendChat(message *C.char) {
chat.SendChat(C.GoString(message))
} }
//export IncreaseThreads //export IncreaseThreads

View File

@ -3,6 +3,7 @@
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h>
#include <string.h> #include <string.h>
typedef struct pool_login_args { typedef struct pool_login_args {
@ -114,6 +115,10 @@ typedef struct get_miner_state_response {
// //
// MINING_ACTIVE_USER_OVERRIDE = 2 // MINING_ACTIVE_USER_OVERRIDE = 2
// indicates miner is actively mining, and is in "user forced active mining override" state. // indicates miner is actively mining, and is in "user forced active mining override" state.
//
// MINING_ACTIVE_CHATS_TO_SEND = 3
// indicates miner is actively mining to generate a share so that a chat message can be delivered.
int mining_activity; int mining_activity;
int threads; // number of threads actively mining int threads; // number of threads actively mining
@ -149,6 +154,8 @@ typedef struct get_miner_state_response {
const char* time_to_reward; // An estimate of the time to next reward in a pretty-printable const char* time_to_reward; // An estimate of the time to next reward in a pretty-printable
// format, e.g. "3.5 days". This is just an estimate based on pool // format, e.g. "3.5 days". This is just an estimate based on pool
// hashrate and other dynamic factors // hashrate and other dynamic factors
bool chats_available; // whether there are chat messages available to display (see next_chat)
} get_miner_state_response; } get_miner_state_response;
get_miner_state_response get_miner_state() { get_miner_state_response get_miner_state() {
@ -158,16 +165,42 @@ get_miner_state_response get_miner_state() {
response.threads = (int)r.r1; response.threads = (int)r.r1;
response.recent_hashrate = (float)r.r2; response.recent_hashrate = (float)r.r2;
response.username = r.r3; response.username = r.r3;
response.time_to_reward = r.r9;
response.seconds_old = (int)r.r4; response.seconds_old = (int)r.r4;
response.lifetime_hashes = (long)r.r5; response.lifetime_hashes = (long)r.r5;
response.paid = (float)r.r6; response.paid = (float)r.r6;
response.owed = (float)r.r7; response.owed = (float)r.r7;
response.accumulated = (float)r.r8; response.accumulated = (float)r.r8;
response.time_to_reward = r.r9;
response.chats_available = (bool)r.r10;
return response; return response;
} }
typedef struct next_chat_response {
// NOTE: you must free() each const char*
const char* username; // username of the user who sent the chat (ascii)
const char* message; // the chat message (unicode)
int64_t timestamp; // unix timestamp of when the chat was received by chat server
} next_chat_response;
// Return the next available chat message. If there are no chat messages left to return,
// the chat response will have empty username/message
next_chat_response next_chat() {
struct NextChat_return r = NextChat();
next_chat_response response;
response.username = r.r0;
response.message = r.r1;
response.timestamp = (int64_t)r.r2;
return response;
}
// Queue a chat message for sending. Returns a code indicating if successful (0) or not. Right now
// only success is returned. Message might not be sent immediately, e.g. miner may wait to send it
// with the next mined share.
int send_chat(char *message) {
SendChat(message);
return 0;
}
// Increase the number of threads by 1. This may fail. get_miner_state will // Increase the number of threads by 1. This may fail. get_miner_state will
// always report the true number of current threads. // always report the true number of current threads.
void increase_threads() { void increase_threads() {

View File

@ -30,13 +30,30 @@ int main(int argc, char* argv[]) {
report_lock_screen_state(true); // pretend screen is locked so we will mine report_lock_screen_state(true); // pretend screen is locked so we will mine
pool_login_args pl_args; pool_login_args pl_args;
pl_args.agent = "Super Power Ultimate Miner (S.P.U.M.) v0.6.9"; pl_args.agent = "csminer / minerlib test script";
pl_args.rigid = NULL; pl_args.rigid = NULL;
pl_args.wallet = NULL; pl_args.wallet = NULL;
pl_args.config = NULL; pl_args.config = NULL;
get_miner_state_response ms_resp;
// Login loop. Alternate between 2 accounts every minute to make sure account switching works. // Login loop. Alternate between 2 accounts every minute to make sure account switching works.
while (true) { while (true) {
printf("Entering get_miner_state polling loop, 30 polls with 1 second sleep inbetween\n");
for (int i = 0; i < 30; ++i) {
ms_resp = get_miner_state();
printf("Hashrate was: %f\n", ms_resp.recent_hashrate);
printf("Threads active: %d\n", ms_resp.threads);
printf("Mining activity state: %d\n", ms_resp.mining_activity);
free((void*)ms_resp.username);
free((void*)ms_resp.time_to_reward);
increase_threads();
sleep(.5);
decrease_threads();
sleep(.5);
}
pl_args.username = "cryptonote-social"; pl_args.username = "cryptonote-social";
if (argc > 1) { if (argc > 1) {
printf("using arg for username: %s\n", argv[1]); printf("using arg for username: %s\n", argv[1]);
@ -61,6 +78,28 @@ int main(int argc, char* argv[]) {
} }
} }
free((void*)pl_resp.message); free((void*)pl_resp.message);
send_chat("testing chat sending this is the chat message");
printf("Entering get_miner_state polling loop, 60 polls with 1 second sleep inbetween\n");
for (int i = 0; i < 60; ++i) {
ms_resp = get_miner_state();
printf("Hashrate was: %f\n", ms_resp.recent_hashrate);
printf("Threads active: %d\n", ms_resp.threads);
printf("Mining activity state: %d\n", ms_resp.mining_activity);
printf("Chats available: %s\n", ms_resp.chats_available ? "yes" : "no");
if (ms_resp.chats_available) {
next_chat_response nc_resp;
nc_resp = next_chat();
printf("Got chat message: [ %s ] %s (%ld)\n", nc_resp.username, nc_resp.message, nc_resp.timestamp);
free((void*)nc_resp.username);
free((void*)nc_resp.message);
}
free((void*)ms_resp.username);
free((void*)ms_resp.time_to_reward);
sleep(1);
}
sleep(10); sleep(10);
printf("Setting screen state to active\n"); printf("Setting screen state to active\n");
@ -76,8 +115,8 @@ int main(int argc, char* argv[]) {
report_power_state(false); report_power_state(false);
printf("Sleeping for 2 minutes before trying another login.\n"); printf("Sleeping for 2 minutes before trying another login.\n");
sleep(60); sleep(30);
get_miner_state_response ms_resp = get_miner_state(); ms_resp = get_miner_state();
printf("Hashrate was: %f\n", ms_resp.recent_hashrate); printf("Hashrate was: %f\n", ms_resp.recent_hashrate);
printf("Threads active: %d\n", ms_resp.threads); printf("Threads active: %d\n", ms_resp.threads);
printf("Mining activity state: %d\n", ms_resp.mining_activity); printf("Mining activity state: %d\n", ms_resp.mining_activity);
@ -93,6 +132,14 @@ int main(int argc, char* argv[]) {
printf("Hashrate was: %f\n", ms_resp.recent_hashrate); printf("Hashrate was: %f\n", ms_resp.recent_hashrate);
printf("Threads active: %d\n", ms_resp.threads); printf("Threads active: %d\n", ms_resp.threads);
printf("Mining activity state: %d\n", ms_resp.mining_activity); printf("Mining activity state: %d\n", ms_resp.mining_activity);
printf("Chats available: %s\n", ms_resp.chats_available ? "yes" : "no");
if (ms_resp.chats_available) {
next_chat_response nc_resp;
nc_resp = next_chat();
printf("Got chat message: [ %s ] %s (%ld)\n", nc_resp.username, nc_resp.message, nc_resp.timestamp);
free((void*)nc_resp.username);
free((void*)nc_resp.message);
}
free((void*)ms_resp.username); free((void*)ms_resp.username);
free((void*)ms_resp.time_to_reward); free((void*)ms_resp.time_to_reward);
sleep(1); sleep(1);
@ -115,8 +162,8 @@ int main(int argc, char* argv[]) {
} }
free((void*)pl_resp.message); free((void*)pl_resp.message);
printf("Sleeping for 2 minutes before looping again.\n"); printf("Sleeping for 30 sec before looping again.\n");
sleep(60); sleep(30);
ms_resp = get_miner_state(); ms_resp = get_miner_state();
printf("Hashrate was: %f\n", ms_resp.recent_hashrate); printf("Hashrate was: %f\n", ms_resp.recent_hashrate);
printf("Threads active: %d\n", ms_resp.threads); printf("Threads active: %d\n", ms_resp.threads);
@ -126,8 +173,8 @@ int main(int argc, char* argv[]) {
printf("Decreasing threads\n"); printf("Decreasing threads\n");
decrease_threads(); decrease_threads();
printf("Entering get_miner_state polling loop, 60 polls with 1 second sleep inbetween\n"); printf("Entering get_miner_state polling loop, 30 polls with 1 second sleep inbetween\n");
for (int i = 0; i < 60; ++i) { for (int i = 0; i < 30; ++i) {
ms_resp = get_miner_state(); ms_resp = get_miner_state();
printf("Hashrate was: %f\n", ms_resp.recent_hashrate); printf("Hashrate was: %f\n", ms_resp.recent_hashrate);
printf("Threads active: %d\n", ms_resp.threads); printf("Threads active: %d\n", ms_resp.threads);

View File

@ -12,7 +12,7 @@ import (
const ( const (
APPLICATION_NAME = "cryptonote.social Monero miner" APPLICATION_NAME = "cryptonote.social Monero miner"
VERSION_STRING = "0.2.0" VERSION_STRING = "0.3.0"
STATS_WEBPAGE = "https://cryptonote.social/xmr" STATS_WEBPAGE = "https://cryptonote.social/xmr"
DONATE_USERNAME = "donate-getmonero-org" DONATE_USERNAME = "donate-getmonero-org"

View File

@ -7,10 +7,12 @@ import (
"errors" "errors"
"os" "os"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/cryptonote-social/csminer/crylog" "github.com/cryptonote-social/csminer/crylog"
"github.com/cryptonote-social/csminer/minerlib" "github.com/cryptonote-social/csminer/minerlib"
"github.com/cryptonote-social/csminer/minerlib/chat"
"github.com/cryptonote-social/csminer/stratum/client" "github.com/cryptonote-social/csminer/stratum/client"
) )
@ -141,6 +143,11 @@ func Mine(c *MinerConfig) error {
minerlib.RemoveMiningActivityOverride() minerlib.RemoveMiningActivityOverride()
} }
} }
if strings.HasPrefix(b, "c ") {
chatMsg := b[2:]
chat.SendChat(chatMsg)
}
} }
crylog.Error("Scanning terminated") crylog.Error("Scanning terminated")
return errors.New("didn't expect keyboard scanning to terminate") return errors.New("didn't expect keyboard scanning to terminate")
@ -220,8 +227,11 @@ func printStatsPeriodically() {
} }
printStats(false) printStats(false)
for { for {
<-time.After(30 * time.Second) <-time.After(3 * time.Second)
printStats(true) // print full stats only if actively mining //printStats(true) // print full stats only if actively mining
for c := chat.NextChatReceived(); c != nil; c = chat.NextChatReceived() {
crylog.Info("\n\nCHAT MESSAGE RECEIVED:\n[", c.Username, "] ", c.Message, "\n")
}
} }
} }
@ -256,6 +266,8 @@ func getActivityMessage(activityState int) string {
return "ACTIVE" return "ACTIVE"
case minerlib.MINING_ACTIVE_USER_OVERRIDE: case minerlib.MINING_ACTIVE_USER_OVERRIDE:
return "ACTIVE: keyboard override. <enter> to undo override." return "ACTIVE: keyboard override. <enter> to undo override."
case minerlib.MINING_ACTIVE_CHATS_TO_SEND:
return "ACTIVE: sending chat message."
} }
crylog.Fatal("Unknown activity state:", activityState) crylog.Fatal("Unknown activity state:", activityState)
if activityState > 0 { if activityState > 0 {

93
minerlib/chat/chat.go Normal file
View File

@ -0,0 +1,93 @@
package chat
import (
"github.com/cryptonote-social/csminer/crylog"
"github.com/cryptonote-social/csminer/stratum/client"
"sync"
)
var (
mutex sync.Mutex
chatQueue []string
chatToSendIndex int
receivedQueue []*client.ChatResult
chatReceivedIndex int
nextToken int
)
func SendChat(chat string) {
mutex.Lock()
defer mutex.Unlock()
chatQueue = append(chatQueue, chat)
crylog.Info("Chat queued for sending:", chat)
}
// GetChatToSend returns the next queued chat message that needs to be delivered. The function
// will return the same result until ChatSent is called. It will return (nil, -1) if there are no
// chats to send at this time.
func GetChatToSend() (chat string, id int) {
mutex.Lock()
defer mutex.Unlock()
if chatToSendIndex >= len(chatQueue) {
return "", -1
}
return chatQueue[chatToSendIndex], chatToSendIndex
}
func HasChatsToSend() bool {
mutex.Lock()
defer mutex.Unlock()
return chatToSendIndex < len(chatQueue)
}
func ChatSent(id int) {
mutex.Lock()
defer mutex.Unlock()
if id == chatToSendIndex {
crylog.Info("Chat message delivered:", chatQueue[id])
chatToSendIndex++
}
}
func ChatsReceived(chats []client.ChatResult, chatToken int, fetchedToken int) {
if len(chats) == 0 {
return
}
crylog.Info("Chats received:", chats)
mutex.Lock()
defer mutex.Unlock()
if nextToken != fetchedToken {
crylog.Warn("Skipping dupe chats:", chats)
return // these chats are already handled
}
for i := range chats {
receivedQueue = append(receivedQueue, &chats[i])
}
nextToken = chatToken
}
func HasChats() bool {
mutex.Lock()
defer mutex.Unlock()
return chatReceivedIndex < len(receivedQueue)
}
func NextChatReceived() *client.ChatResult {
mutex.Lock()
defer mutex.Unlock()
if chatReceivedIndex < len(receivedQueue) {
chatReceivedIndex++
return receivedQueue[chatReceivedIndex-1]
}
return nil
}
func NextToken() int {
mutex.Lock()
defer mutex.Unlock()
return nextToken
}

View File

@ -3,12 +3,14 @@ package minerlib
import ( import (
"github.com/cryptonote-social/csminer/blockchain" "github.com/cryptonote-social/csminer/blockchain"
"github.com/cryptonote-social/csminer/crylog" "github.com/cryptonote-social/csminer/crylog"
"github.com/cryptonote-social/csminer/minerlib/chat"
"github.com/cryptonote-social/csminer/minerlib/stats" "github.com/cryptonote-social/csminer/minerlib/stats"
"github.com/cryptonote-social/csminer/rx" "github.com/cryptonote-social/csminer/rx"
"github.com/cryptonote-social/csminer/stratum/client" "github.com/cryptonote-social/csminer/stratum/client"
"bytes" "bytes"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"runtime" "runtime"
@ -46,6 +48,9 @@ const (
// Indicates miner is actively mining due to user-initiated override // Indicates miner is actively mining due to user-initiated override
MINING_ACTIVE_USER_OVERRIDE = 2 MINING_ACTIVE_USER_OVERRIDE = 2
// Indicates miner is actively mining to deliver a chat message
MINING_ACTIVE_CHATS_TO_SEND = 3
// for PokeChannel stuff: // for PokeChannel stuff:
HANDLED = 1 HANDLED = 1
USE_CACHED = 2 USE_CACHED = 2
@ -149,6 +154,10 @@ func getMiningActivityState() int {
return MINING_ACTIVE_USER_OVERRIDE return MINING_ACTIVE_USER_OVERRIDE
} }
if chat.HasChatsToSend() {
return MINING_ACTIVE_CHATS_TO_SEND
}
if timeExcluded() { if timeExcluded() {
return MINING_PAUSED_TIME_EXCLUDED return MINING_PAUSED_TIME_EXCLUDED
} }
@ -383,6 +392,10 @@ func MiningLoop(jobChan <-chan *client.MultiClientJob, done chan<- bool) {
stopWorkers() stopWorkers()
if job.ChatToken != chat.NextToken() {
go GetChats()
}
// Check if we need to reinitialize rx dataset // Check if we need to reinitialize rx dataset
newSeed, err := hex.DecodeString(job.SeedHash) newSeed, err := hex.DecodeString(job.SeedHash)
if err != nil { if err != nil {
@ -470,6 +483,7 @@ type GetMiningStateResponse struct {
stats.Snapshot stats.Snapshot
MiningActivity int MiningActivity int
Threads int Threads int
ChatsAvailable bool
} }
// poke the job dispatcher to refresh recent stats. result may not be immediate but should happen // poke the job dispatcher to refresh recent stats. result may not be immediate but should happen
@ -505,6 +519,7 @@ func GetMiningState() *GetMiningStateResponse {
Snapshot: *s, Snapshot: *s,
MiningActivity: as, MiningActivity: as,
Threads: threads, Threads: threads,
ChatsAvailable: chat.HasChats(),
} }
} }
@ -600,6 +615,23 @@ func printStats(isMining bool) {
} }
*/ */
func GetChats() {
nt := chat.NextToken()
crylog.Info("Getting chats:", nt)
resp, err := cl.GetChats(nt)
if err != nil {
crylog.Error("Failed to retrieve chats:", nt, err)
}
cr := &client.GetChatsResult{}
err = json.Unmarshal(*resp.Result, cr)
if err != nil {
crylog.Warn("Failed to unmarshal GetChatsResult:", err)
cl.Close()
return
}
chat.ChatsReceived(cr.Chats, cr.NextToken, nt)
}
func goMine(job client.MultiClientJob, thread int) { func goMine(job client.MultiClientJob, thread int) {
defer wg.Done() defer wg.Done()
input, err := hex.DecodeString(job.Blob) input, err := hex.DecodeString(job.Blob)
@ -631,10 +663,12 @@ func goMine(job client.MultiClientJob, thread int) {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }
resp, err := cl.SubmitWork(fnonce, jobid) chatMsg, chatID := chat.GetChatToSend()
//crylog.Info("sending chatmsg:", chatMsg)
resp, err := cl.SubmitWork(fnonce, jobid, chatMsg, chatID)
if err != nil { if err != nil {
cl.Close()
crylog.Warn("Submit work client failure:", jobid, err) crylog.Warn("Submit work client failure:", jobid, err)
cl.Close()
return return
} }
if resp.Error != nil { if resp.Error != nil {
@ -642,15 +676,29 @@ func goMine(job client.MultiClientJob, thread int) {
crylog.Warn("Submit work server error:", jobid, resp.Error) crylog.Warn("Submit work server error:", jobid, resp.Error)
return return
} }
chat.ChatSent(chatID)
stats.ShareAccepted(diffTarget) stats.ShareAccepted(diffTarget)
swr := resp.Result if resp.Result == nil {
if swr != nil { crylog.Warn("nil result")
if swr.PoolMargin > 0.0 { cl.Close()
stats.RefreshPoolStats2(swr) return
} else { }
crylog.Warn("Didn't get pool stats in response:", resp.Result) swr := &client.SubmitWorkResult{}
updatePoolStats(true) err = json.Unmarshal(*resp.Result, swr)
} if err != nil {
crylog.Warn("Failed to unmarshal SubmitWorkResult:", jobid, err)
cl.Close()
return
}
if swr.PoolMargin > 0.0 {
stats.RefreshPoolStats2(swr)
} else {
crylog.Warn("Didn't get pool stats in response:", resp.Result)
updatePoolStats(true)
}
//crylog.Info("Chat token:", resp.ChatToken, chat.NextToken())
if resp.ChatToken > 0 && resp.ChatToken != chat.NextToken() {
go GetChats()
} }
}(fnonce, job.JobID) }(fnonce, job.JobID)
} }
@ -741,6 +789,8 @@ func getActivityMessage(activityState int) string {
return "ACTIVE" return "ACTIVE"
case MINING_ACTIVE_USER_OVERRIDE: case MINING_ACTIVE_USER_OVERRIDE:
return "ACTIVE: user override." return "ACTIVE: user override."
case MINING_ACTIVE_CHATS_TO_SEND:
return "ACTIVE: sending chat message."
} }
crylog.Error("Unknown activity state:", activityState) crylog.Error("Unknown activity state:", activityState)
if activityState > 0 { if activityState > 0 {

View File

@ -21,6 +21,7 @@ import (
const ( const (
SUBMIT_WORK_JSON_ID = 999 SUBMIT_WORK_JSON_ID = 999
CONNECT_JSON_ID = 666 CONNECT_JSON_ID = 666
GET_CHATS_JSON_ID = 9999
MAX_REQUEST_SIZE = 50000 // Max # of bytes we will read per request MAX_REQUEST_SIZE = 50000 // Max # of bytes we will read per request
@ -35,6 +36,8 @@ type Job struct {
// For self-select mode: // For self-select mode:
PoolWallet string `json:"pool_wallet"` PoolWallet string `json:"pool_wallet"`
ExtraNonce string `json:"extra_nonce"` ExtraNonce string `json:"extra_nonce"`
ChatToken int `json:"chat_token"` // custom field
} }
type RXJob struct { type RXJob struct {
@ -72,24 +75,60 @@ type SubmitWorkResult struct {
NetworkDifficulty int64 // difficulty, possibly smoothed over the last several blocks NetworkDifficulty int64 // difficulty, possibly smoothed over the last several blocks
// TODO: These pool config values rarely change, so we should fetch these in some other way // TODO: These pool config values rarely change, so we should fetch these in some other way
// instead of returning them from each SubmitWork call. // instead of returning them from each SubmitWork call, or perhaps only return them when
// they change.
PoolMargin float64 PoolMargin float64
PoolFee float64 PoolFee float64
} }
type SubmitWorkResponse struct { type ChatResult struct {
ID uint64 `json:"id"` Username string // user sending the chat
Jsonrpc string `json:"jsonrpc"` Message string // the chat message
Method string `json:"method"` Timestamp int64 // unix time in seconds when the chat message was sent
Job *MultiClientJob `json:"params"` }
Result *SubmitWorkResult
Error interface{} `json:"error"` type GetChatsResult struct {
Chats []ChatResult
NextToken int
}
type Response struct {
ID uint64 `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Job *MultiClientJob `json:"params"` // used to send jobs over the connection
Result *json.RawMessage `json:"result"` // used to return SubmitWork or GetChats results
Error interface{} `json:"error"`
ChatToken int `json:"chat_token"` // custom field
}
type loginResponse struct {
ID uint64 `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Result *struct {
ID string `json:"id"`
Job *MultiClientJob `job:"job"`
} `json:"result"`
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
// our own custom field for reporting login warnings without forcing disconnect from error:
Warning *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"warning"`
ChatToken int `json:"chat_token"` // custom field
} }
type Client struct { type Client struct {
address string address string
conn net.Conn conn net.Conn
responseChannel chan *SubmitWorkResponse responseChannel chan *Response
mutex sync.Mutex mutex sync.Mutex
@ -164,23 +203,7 @@ func (cl *Client) Connect(
} }
// Now read the login response // Now read the login response
response := &struct { response := &loginResponse{}
ID uint64 `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Result *struct {
ID string `json:"id"`
Job *MultiClientJob `job:"job"`
} `json:"result"`
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
// our own custom field for reporting login warnings without forcing disconnect from error:
Warning *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"warning"`
}{}
cl.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) cl.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
rdr := bufio.NewReaderSize(cl.conn, MAX_REQUEST_SIZE) rdr := bufio.NewReaderSize(cl.conn, MAX_REQUEST_SIZE)
err = readJSON(response, rdr) err = readJSON(response, rdr)
@ -193,7 +216,7 @@ func (cl *Client) Connect(
return errors.New("stratum server error"), response.Error.Code, response.Error.Message, nil return errors.New("stratum server error"), response.Error.Code, response.Error.Message, nil
} }
cl.responseChannel = make(chan *SubmitWorkResponse) cl.responseChannel = make(chan *Response)
cl.alive = true cl.alive = true
jc := make(chan *MultiClientJob) jc := make(chan *MultiClientJob)
go dispatchJobs(cl.conn, jc, response.Result.Job, cl.responseChannel) go dispatchJobs(cl.conn, jc, response.Result.Job, cl.responseChannel)
@ -204,7 +227,7 @@ func (cl *Client) Connect(
} }
// if error is returned then client will be closed and put in not-alive state // if error is returned then client will be closed and put in not-alive state
func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce string, connNonce []byte, jobid string, targetDifficulty int64) (*SubmitWorkResponse, error) { func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce string, connNonce []byte, jobid string, targetDifficulty int64) (*Response, error) {
submitRequest := &struct { submitRequest := &struct {
ID uint64 `json:"id"` ID uint64 `json:"id"`
Method string `json:"method"` Method string `json:"method"`
@ -225,11 +248,11 @@ func (cl *Client) SubmitMulticlientWork(username string, rigid string, nonce str
}{"696969", jobid, nonce, "", username, rigid, targetDifficulty, connNonce}, }{"696969", jobid, nonce, "", username, rigid, targetDifficulty, connNonce},
} }
return cl.submitRequest(submitRequest) return cl.submitRequest(submitRequest, SUBMIT_WORK_JSON_ID)
} }
// if error is returned then client will be closed and put in not-alive state // if error is returned then client will be closed and put in not-alive state
func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse, error) { func (cl *Client) submitRequest(submitRequest interface{}, expectedResponseID uint64) (*Response, error) {
cl.mutex.Lock() cl.mutex.Lock()
if !cl.alive { if !cl.alive {
cl.mutex.Unlock() cl.mutex.Unlock()
@ -257,15 +280,31 @@ func (cl *Client) submitRequest(submitRequest interface{}) (*SubmitWorkResponse,
crylog.Error("got nil response, closing") crylog.Error("got nil response, closing")
return nil, fmt.Errorf("submit work failure: nil response") return nil, fmt.Errorf("submit work failure: nil response")
} }
if response.ID != SUBMIT_WORK_JSON_ID { if response.ID != expectedResponseID {
crylog.Error("got unexpected response:", response.ID, "Closing connection.") crylog.Error("got unexpected response:", response.ID, "wanted:", expectedResponseID, "Closing connection.")
return nil, fmt.Errorf("submit work failure: unexpected response") return nil, fmt.Errorf("submit work failure: unexpected response")
} }
return response, nil return response, nil
} }
func (cl *Client) GetChats(chatToken int) (*Response, error) {
chatRequest := &struct {
ID uint64 `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params"`
}{
ID: GET_CHATS_JSON_ID,
Method: "get_chats",
Params: &struct {
ChatToken int `json:"chat_token"`
}{chatToken},
}
return cl.submitRequest(chatRequest, GET_CHATS_JSON_ID)
}
// if error is returned then client will be closed and put in not-alive state // if error is returned then client will be closed and put in not-alive state
func (cl *Client) SubmitWork(nonce string, jobid string) (*SubmitWorkResponse, error) { func (cl *Client) SubmitWork(nonce string, jobid string, chat string, chatID int) (*Response, error) {
submitRequest := &struct { submitRequest := &struct {
ID uint64 `json:"id"` ID uint64 `json:"id"`
Method string `json:"method"` Method string `json:"method"`
@ -278,9 +317,12 @@ func (cl *Client) SubmitWork(nonce string, jobid string) (*SubmitWorkResponse, e
JobID string `json:"job_id"` JobID string `json:"job_id"`
Nonce string `json:"nonce"` Nonce string `json:"nonce"`
Result string `json:"result"` Result string `json:"result"`
}{"696969", jobid, nonce, ""},
Chat string `json:"chat"`
ChatID int `json:"chat_id"`
}{"696969", jobid, nonce, "", chat, chatID},
} }
return cl.submitRequest(submitRequest) return cl.submitRequest(submitRequest, SUBMIT_WORK_JSON_ID)
} }
func (cl *Client) Close() { func (cl *Client) Close() {
@ -293,9 +335,9 @@ func (cl *Client) Close() {
cl.conn.Close() cl.conn.Close()
} }
// DispatchJobs will forward incoming jobs to the JobChannel until error is received or the // dispatchJobs will forward incoming jobs to the JobChannel until error is received or the
// connection is closed. Client will be in not-alive state on return. // connection is closed. Client will be in not-alive state on return.
func dispatchJobs(conn net.Conn, jobChan chan<- *MultiClientJob, firstJob *MultiClientJob, responseChan chan<- *SubmitWorkResponse) { func dispatchJobs(conn net.Conn, jobChan chan<- *MultiClientJob, firstJob *MultiClientJob, responseChan chan<- *Response) {
defer func() { defer func() {
close(jobChan) close(jobChan)
close(responseChan) close(responseChan)
@ -303,25 +345,26 @@ func dispatchJobs(conn net.Conn, jobChan chan<- *MultiClientJob, firstJob *Multi
jobChan <- firstJob jobChan <- firstJob
reader := bufio.NewReaderSize(conn, MAX_REQUEST_SIZE) reader := bufio.NewReaderSize(conn, MAX_REQUEST_SIZE)
for { for {
response := &SubmitWorkResponse{} response := &Response{}
conn.SetReadDeadline(time.Now().Add(3600 * time.Second)) conn.SetReadDeadline(time.Now().Add(3600 * time.Second))
err := readJSON(response, reader) err := readJSON(response, reader)
if err != nil { if err != nil {
crylog.Error("readJSON failed, closing client and exiting dispatch:", err) crylog.Error("readJSON failed, closing client:", err)
break break
} }
if response.Method != "job" { if response.Method != "job" {
if response.ID == SUBMIT_WORK_JSON_ID { if response.ID == SUBMIT_WORK_JSON_ID || response.ID == GET_CHATS_JSON_ID {
responseChan <- response responseChan <- response
continue continue
} }
crylog.Warn("Unexpected response:", *response) crylog.Warn("Unexpected response from stratum server. Ignoring:", *response)
continue continue
} }
if response.Job == nil { if response.Job == nil {
crylog.Error("Didn't get job as expected:", *response) crylog.Error("Didn't get job as expected from stratum server, closing client:", *response)
break break
} }
response.Job.ChatToken = response.ChatToken
jobChan <- response.Job jobChan <- response.Job
} }
} }