Start with refactor

Mariano Sorgente 2019-10-21 22:48:27 +09:00
parent d4896a2f65
commit 3916e9032b
14 changed files with 609 additions and 490 deletions

View File

@ -23,17 +23,17 @@ sh install.sh
When running the servers on Mac OS, allow the application to accept incoming connections.
Run the servers in the following order (you can also use ipython):
```bash
./lib/chiavdf/fast_vdf/vdf 8889
./lib/chiavdf/fast_vdf/vdf 8890
./lib/chiavdf/fast_vdf/server 8889
./lib/chiavdf/fast_vdf/server 8890
python -m src.server.start_plotter
python -m src.server.start_timelord
python -m src.server.start_farmer
python -m src.server.start_full_node "127.0.0.1" 8002 "-f" "-t"
python -m src.server.start_full_node "127.0.0.1" 8004
python -m src.server.start_full_node "127.0.0.1" 8002 "-f"
python -m src.server.start_full_node "127.0.0.1" 8004 "-t"
python -m src.server.start_full_node "127.0.0.1" 8005
```
Try running one of the full nodes after the other ones, to test initial sync.
Try running one of the full nodes a few minutes after the other ones, to test initial sync.
Configuration of peers can be changed in src/config.
You can also run the simulation, which runs all servers at once.

View File

@ -21,5 +21,5 @@ g++ -o compile_asm.o -c compile_asm.cpp $compile_flags -O0
g++ -o compile_asm compile_asm.o $link_flags
./compile_asm
as -o asm_compiled.o asm_compiled.s
g++ -o vdf.o -c vdf.cpp $compile_flags -O3
g++ -o vdf vdf.o asm_compiled.o $link_flags
g++ -o server.o -c server.cpp $compile_flags -O3
g++ -o server server.o asm_compiled.o $link_flags

View File

@ -0,0 +1,199 @@
#include <boost/asio.hpp>
#include "vdf.h"
using boost::asio::ip::tcp;
const int max_length = 2048;
std::mutex socket_mutex;
void PrintInfo(std::string input) {
print("VDF Server: " + input);
}
void CreateAndWriteProof(integer D, form x, int64_t num_iterations, WesolowskiCallback& weso, bool& stop_signal, tcp::socket& sock) {
Proof result = CreateProofOfTimeNWesolowski(D, x, num_iterations, 0, weso, 2, 0, stop_signal);
if (stop_signal == true) {
PrintInfo("Got stop signal before completing the proof!");
return ;
}
std::vector<unsigned char> bytes = ConvertIntegerToBytes(integer(num_iterations), 8);
bytes.insert(bytes.end(), result.y.begin(), result.y.end());
bytes.insert(bytes.end(), result.proof.begin(), result.proof.end());
std::string str_result = BytesToStr(bytes);
std::lock_guard<std::mutex> lock(socket_mutex);
PrintInfo("Generated proof = " + str_result);;
boost::asio::write(sock, boost::asio::buffer(str_result.c_str(), str_result.size()));
}
void PollTimelord(tcp::socket& sock, bool& got_iters) {
// Wait 15 seconds; if no iterations arrive, poll the timelord every 5 seconds.
int seconds = 0;
while (!got_iters) {
std::this_thread::sleep_for (std::chrono::seconds(1));
seconds++;
if (seconds >= 15 && (seconds - 15) % 5 == 0) {
socket_mutex.lock();
boost::asio::write(sock, boost::asio::buffer("POLL", 4));
socket_mutex.unlock();
}
}
}
void session(tcp::socket sock) {
try {
char disc[350];
char disc_size[5];
boost::system::error_code error;
memset(disc,0x00,sizeof(disc)); // For null termination
memset(disc_size,0x00,sizeof(disc_size)); // For null termination
boost::asio::read(sock, boost::asio::buffer(disc_size, 3), error);
int disc_int_size = atoi(disc_size);
boost::asio::read(sock, boost::asio::buffer(disc, disc_int_size), error);
integer D(disc);
PrintInfo("Discriminant = " + to_string(D.impl));
// Initialize the VDF from the discriminant...
if (error == boost::asio::error::eof)
return ; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
if (getenv( "warn_on_corruption_in_production" )!=nullptr) {
warn_on_corruption_in_production=true;
}
if (is_vdf_test) {
PrintInfo( "=== Test mode ===" );
}
if (warn_on_corruption_in_production) {
PrintInfo( "=== Warn on corruption enabled ===" );
}
assert(is_vdf_test); //assertions should be disabled in VDF_MODE==0
init_gmp();
allow_integer_constructor=true; //make sure the old gmp allocator isn't used
set_rounding_mode();
integer L=root(-D, 4);
form f=form::generator(D);
bool stop_signal = false;
std::set<uint64_t> seen_iterations;
std::vector<std::thread> threads;
WesolowskiCallback weso(1000000);
//mpz_init(weso.forms[0].a.impl);
//mpz_init(weso.forms[0].b.impl);
//mpz_init(weso.forms[0].c.impl);
forms[0]=f;
weso.D = D;
weso.L = L;
weso.kl = 10;
bool stopped = false;
bool got_iters = false;
std::thread vdf_worker(repeated_square, f, D, L, std::ref(weso), std::ref(stopped));
std::thread poll_thread(PollTimelord, std::ref(sock), std::ref(got_iters));
// Tell client that I'm ready to get the challenges.
boost::asio::write(sock, boost::asio::buffer("OK", 2));
char data[10];
while (!stopped) {
memset(data, 0, sizeof(data));
boost::asio::read(sock, boost::asio::buffer(data, 1), error);
int size = data[0] - '0';
memset(data, 0, sizeof(data));
boost::asio::read(sock, boost::asio::buffer(data, size), error);
int iters = atoi(data);
PrintInfo("Got iterations " + to_string(iters));
got_iters = true;
if (seen_iterations.size() > 0 && *seen_iterations.begin() <= iters) {
PrintInfo("Ignoring " + to_string(iters) + ", too high.");
continue;
}
if (seen_iterations.size() > 2 && iters != 0) {
PrintInfo("Ignoring " + to_string(iters) + ", already have 3 iters.");
continue;
}
if (iters == 0) {
stopped = true;
poll_thread.join();
for (int t = 0; t < threads.size(); t++) {
threads[t].join();
}
vdf_worker.join();
} else {
if (seen_iterations.find(iters) == seen_iterations.end()) {
seen_iterations.insert(iters);
threads.push_back(std::thread(CreateAndWriteProof, D, f, iters, std::ref(weso), std::ref(stopped),
std::ref(sock)));
}
}
}
} catch (std::exception& e) {
PrintInfo("Exception in thread: " + to_string(e.what()));
}
try {
// Tell client I've stopped everything, wait for ACK and close.
boost::system::error_code error;
PrintInfo("Stopped everything! Ready for the next challenge.");
std::lock_guard<std::mutex> lock(socket_mutex);
boost::asio::write(sock, boost::asio::buffer("STOP", 4));
char ack[5];
memset(ack,0x00,sizeof(ack));
boost::asio::read(sock, boost::asio::buffer(ack, 3), error);
assert (strncmp(ack, "ACK", 3) == 0);
} catch (std::exception& e) {
PrintInfo("Exception in thread: " + to_string(e.what()));
}
}
void server(boost::asio::io_context& io_context, unsigned short port)
{
tcp::acceptor a(io_context, tcp::endpoint(tcp::v4(), port));
for (;;)
{
std::thread t(session, a.accept());
t.join();
}
}
int main(int argc, char* argv[])
{
forms.resize(3000000); // resize (not reserve), so the mpz_inits loop below indexes valid elements
for (int i = 0; i < 3000000; i++) {
mpz_inits(forms[i].a.impl, forms[i].b.impl, forms[i].c.impl, NULL);
}
try
{
if (argc != 2)
{
PrintInfo("Usage: blocking_tcp_echo_server <port>");
return 1;
}
boost::asio::io_context io_context;
server(io_context, std::atoi(argv[1]));
}
catch (std::exception& e)
{
PrintInfo("Exception: " + to_string(e.what()));
}
return 0;
}
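The handshake above fixes a small wire protocol: a 3-digit ASCII length followed by the discriminant, an `OK` once the VDF worker is running, then repeated one-digit-length-prefixed iteration counts (`0` meaning stop), with the proof coming back as hex and a final `STOP`/`ACK` exchange. The following is only a minimal Python sketch of that framing, not the real timelord: it uses a placeholder discriminant, assumes each `recv` returns one complete message, and ignores the `POLL` keep-alives that `PollTimelord` may interleave.

```python
# Minimal sketch of a client for the session() protocol above (assumptions noted).
import socket

def run_challenge(host: str, port: int, discriminant: str, iterations: int) -> bytes:
    sock = socket.create_connection((host, port))
    # 3 ASCII digits giving the discriminant length, then the discriminant itself
    sock.sendall(str(len(discriminant)).zfill(3).encode())
    sock.sendall(discriminant.encode())
    assert sock.recv(2) == b"OK"          # server has started the VDF worker
    # 1 ASCII digit giving the length of the iterations string, then the string
    iters_str = str(iterations)
    sock.sendall(str(len(iters_str)).encode() + iters_str.encode())
    proof_hex = sock.recv(4096)           # hex: 8-byte iteration count || y || proof
    # iterations == 0 tells the server to stop; it replies STOP and waits for ACK
    sock.sendall(b"10")
    assert sock.recv(4) == b"STOP"
    sock.sendall(b"ACK")
    sock.close()
    return proof_hex
```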

View File

@ -667,194 +667,3 @@ Proof CreateProofOfTimeNWesolowski(integer& D, form x, int64_t num_iterations,
final_proof.proof = proof_bytes;
return final_proof;
}
std::mutex socket_mutex;
void NWesolowskiMain(integer D, form x, int64_t num_iterations, WesolowskiCallback& weso, bool& stop_signal, tcp::socket& sock) {
Proof result = CreateProofOfTimeNWesolowski(D, x, num_iterations, 0, weso, 2, 0, stop_signal);
if (stop_signal == true) {
std::cout << "Got stop signal before completing the proof!\n";
return ;
}
std::vector<unsigned char> bytes = ConvertIntegerToBytes(integer(num_iterations), 8);
bytes.insert(bytes.end(), result.y.begin(), result.y.end());
bytes.insert(bytes.end(), result.proof.begin(), result.proof.end());
std::string str_result = BytesToStr(bytes);
std::lock_guard<std::mutex> lock(socket_mutex);
std::cout << "VDF server: Generated proof = " << str_result << "\n";
boost::asio::write(sock, boost::asio::buffer(str_result.c_str(), str_result.size()));
}
void PollTimelord(tcp::socket& sock, bool& got_iters) {
// Wait for 15s, if no iters come, poll each 5 seconds the timelord.
int seconds = 0;
while (!got_iters) {
std::this_thread::sleep_for (std::chrono::seconds(1));
seconds++;
if (seconds >= 15 && (seconds - 15) % 5 == 0) {
socket_mutex.lock();
boost::asio::write(sock, boost::asio::buffer("POLL", 4));
socket_mutex.unlock();
}
}
}
const int max_length = 2048;
void session(tcp::socket sock) {
try {
char disc[350];
char disc_size[5];
boost::system::error_code error;
memset(disc,0x00,sizeof(disc)); // For null termination
memset(disc_size,0x00,sizeof(disc_size)); // For null termination
boost::asio::read(sock, boost::asio::buffer(disc_size, 3), error);
int disc_int_size = atoi(disc_size);
boost::asio::read(sock, boost::asio::buffer(disc, disc_int_size), error);
integer D(disc);
std::cout << "Discriminant = " << D.impl << "\n";
// Init VDF the discriminant...
if (error == boost::asio::error::eof)
return ; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
if (getenv( "warn_on_corruption_in_production" )!=nullptr) {
warn_on_corruption_in_production=true;
}
if (is_vdf_test) {
print( "=== Test mode ===" );
}
if (warn_on_corruption_in_production) {
print( "=== Warn on corruption enabled ===" );
}
assert(is_vdf_test); //assertions should be disabled in VDF_MODE==0
init_gmp();
allow_integer_constructor=true; //make sure the old gmp allocator isn't used
set_rounding_mode();
integer L=root(-D, 4);
form f=form::generator(D);
bool stop_signal = false;
std::set<uint64_t> seen_iterations;
std::vector<std::thread> threads;
WesolowskiCallback weso(1000000);
//mpz_init(weso.forms[0].a.impl);
//mpz_init(weso.forms[0].b.impl);
//mpz_init(weso.forms[0].c.impl);
forms[0]=f;
weso.D = D;
weso.L = L;
weso.kl = 10;
bool stopped = false;
bool got_iters = false;
std::thread vdf_worker(repeated_square, f, D, L, std::ref(weso), std::ref(stopped));
std::thread poll_thread(PollTimelord, std::ref(sock), std::ref(got_iters));
// Tell client that I'm ready to get the challenges.
boost::asio::write(sock, boost::asio::buffer("OK", 2));
char data[10];
while (!stopped) {
memset(data, 0, sizeof(data));
boost::asio::read(sock, boost::asio::buffer(data, 1), error);
int size = data[0] - '0';
memset(data, 0, sizeof(data));
boost::asio::read(sock, boost::asio::buffer(data, size), error);
int iters = atoi(data);
std::cout << "Got iterations " << iters << "\n";
got_iters = true;
if (seen_iterations.size() > 0 && *seen_iterations.begin() <= iters) {
std::cout << "Ignoring..." << iters << "\n";
continue;
}
if (seen_iterations.size() > 2 && iters != 0) {
std::cout << "Ignoring..." << iters << "\n";
continue;
}
if (iters == 0) {
stopped = true;
poll_thread.join();
for (int t = 0; t < threads.size(); t++) {
threads[t].join();
}
vdf_worker.join();
} else {
if (seen_iterations.find(iters) == seen_iterations.end()) {
seen_iterations.insert(iters);
threads.push_back(std::thread(NWesolowskiMain, D, f, iters, std::ref(weso), std::ref(stopped),
std::ref(sock)));
}
}
}
} catch (std::exception& e) {
std::cerr << "Exception in thread: " << e.what() << "\n";
}
try {
// Tell client I've stopped everything, wait for ACK and close.
boost::system::error_code error;
std::cout << "Stopped everything! Ready for the next challenge.\n";
std::lock_guard<std::mutex> lock(socket_mutex);
boost::asio::write(sock, boost::asio::buffer("STOP", 4));
char ack[5];
memset(ack,0x00,sizeof(ack));
boost::asio::read(sock, boost::asio::buffer(ack, 3), error);
assert (strncmp(ack, "ACK", 3) == 0);
} catch (std::exception& e) {
std::cerr << "Exception in thread: " << e.what() << "\n";
}
}
void server(boost::asio::io_context& io_context, unsigned short port)
{
tcp::acceptor a(io_context, tcp::endpoint(tcp::v4(), port));
for (;;)
{
std::thread t(session, a.accept());
t.join();
}
}
int main(int argc, char* argv[])
{
forms.reserve(1000000);
for (int i = 0; i < 1000000; i++) {
mpz_inits(forms[i].a.impl, forms[i].b.impl, forms[i].c.impl, NULL);
}
try
{
if (argc != 2)
{
std::cerr << "Usage: blocking_tcp_echo_server <port>\n";
return 1;
}
boost::asio::io_context io_context;
server(io_context, std::atoi(argv[1]));
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@ -1,3 +1,4 @@
from src.store.full_node_store import FullNodeStore
from src.consensus.block_rewards import calculate_block_reward
import logging
from enum import Enum
@ -33,15 +34,15 @@ class ReceiveBlockResult(Enum):
class Blockchain:
def __init__(self, override_constants: Dict = {}):
def __init__(self, store: FullNodeStore, override_constants: Dict = {}):
# Allow passing in custom overrides for any consensus parameters
self.constants: Dict = consensus_constants
for key, value in override_constants.items():
self.constants[key] = value
self.store = store
self.heads: List[FullBlock] = []
self.lca_block: FullBlock = None
self.blocks: Dict[bytes32, FullBlock] = {}
self.height_to_hash: Dict[uint64, bytes32] = {}
self.genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
result = self.receive_block(self.genesis)
@ -63,10 +64,17 @@ class Blockchain:
return True
return False
def get_trunk_block(self, header_hash: bytes32) -> TrunkBlock:
return self.blocks[header_hash].trunk_block
async def get_block(self, header_hash: bytes32) -> Optional[FullBlock]:
return await self.store.get_block(header_hash)
def get_trunk_blocks_by_height(self, heights: List[uint64], tip_header_hash: bytes32) -> List[TrunkBlock]:
async def get_trunk_block(self, header_hash: bytes32) -> Optional[TrunkBlock]:
bl = await self.store.get_block(header_hash)
if bl:
return bl.trunk_block
else:
return None
async def get_trunk_blocks_by_height(self, heights: List[uint64], tip_header_hash: bytes32) -> List[TrunkBlock]:
"""
Returns a list of trunk blocks, one for each height requested.
"""
@ -74,15 +82,17 @@ class Blockchain:
sorted_heights = sorted([(height, index) for index, height in enumerate(heights)], reverse=True)
if tip_header_hash not in self.blocks:
curr_full_block: Optional[FullBlock] = await self.store.get_block(tip_header_hash)
if not curr_full_block:
raise BlockNotInBlockchain(f"Header hash {tip_header_hash} not present in chain.")
curr_block: TrunkBlock = self.blocks[tip_header_hash].trunk_block
curr_block = curr_full_block.trunk_block
trunks: List[Tuple[int, TrunkBlock]] = []
for height, index in sorted_heights:
if height > curr_block.challenge.height:
raise ValueError("Height is not valid for tip {tip_header_hash}")
while height < curr_block.challenge.height:
curr_block = self.blocks[curr_block.header.data.prev_header_hash].trunk_block
curr_block = (await self.store.get_block(curr_block.header.data.prev_header_hash)).trunk_block
trunks.append((index, curr_block))
return [b for index, b in sorted(trunks)]
@ -116,12 +126,12 @@ class Blockchain:
else:
raise ValueError("Invalid genesis block")
def get_next_difficulty(self, header_hash: bytes32) -> uint64:
async def get_next_difficulty(self, header_hash: bytes32) -> uint64:
"""
Returns the difficulty of the next block that extends onto header_hash.
Used to calculate the number of iterations.
"""
block = self.blocks.get(header_hash, None)
block = await self.store.get_block(header_hash)
if block is None:
raise Exception("Given header_hash must reference block already added")
@ -135,7 +145,7 @@ class Blockchain:
# epoch, as opposed to the first block (as in Bitcoin).
elif next_height % self.constants["DIFFICULTY_EPOCH"] != self.constants["DIFFICULTY_DELAY"]:
# Not at a point where difficulty would change
prev_block = self.blocks.get(block.prev_header_hash, None)
prev_block = await self.store.get_block(block.prev_header_hash)
if prev_block is None:
raise Exception("Previous block is invalid.")
return uint64(block.trunk_block.challenge.total_weight - prev_block.trunk_block.challenge.total_weight)
@ -165,28 +175,31 @@ class Blockchain:
block2 = curr
elif curr.height == height3:
block3 = curr
curr = self.blocks[curr.prev_header_hash]
curr = await self.store.get_block(curr.prev_header_hash)
assert curr is not None
# Once we are before the fork point (and before the LCA), we can use the height_to_hash map
if not block1 and height1 >= 0:
# height1 could be -1, for the first difficulty calculation
block1 = self.blocks[self.height_to_hash[height1]]
block1 = await self.store.get_block(self.height_to_hash[height1])
if not block2:
block2 = self.blocks[self.height_to_hash[height2]]
block2 = await self.store.get_block(self.height_to_hash[height2])
if not block3:
block3 = self.blocks[self.height_to_hash[height3]]
block3 = await self.store.get_block(self.height_to_hash[height3])
assert block1 is not None and block2 is not None and block3 is not None
# Current difficulty parameter (diff of block h = i - 1)
Tc = self.get_next_difficulty(block.prev_header_hash)
Tc = await self.get_next_difficulty(block.prev_header_hash)
# Previous difficulty parameter (diff of block h = i - 2048 - 1)
Tp = self.get_next_difficulty(block2.prev_header_hash)
Tp = await self.get_next_difficulty(block2.prev_header_hash)
if block1:
timestamp1 = block1.trunk_block.header.data.timestamp # i - 512 - 1
else:
# In the case of height == -1, there is no timestamp here, so assume the genesis block
# took constants["BLOCK_TIME_TARGET"] seconds to mine.
timestamp1 = (self.blocks[self.height_to_hash[uint64(0)]].trunk_block.header.data.timestamp
- self.constants["BLOCK_TIME_TARGET"])
genesis = await self.store.get_block(self.height_to_hash[uint64(0)])
assert genesis is not None
timestamp1 = (genesis.trunk_block.header.data.timestamp - self.constants["BLOCK_TIME_TARGET"])
timestamp2 = block2.trunk_block.header.data.timestamp # i - 2048 + 512 - 1
timestamp3 = block3.trunk_block.header.data.timestamp # i - 512 - 1
@ -210,12 +223,12 @@ class Blockchain:
else:
return max([uint64(1), new_difficulty, uint64(Tc // self.constants["DIFFICULTY_FACTOR"])])
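The `max` clamp at the end of this branch bounds how far difficulty can fall in one adjustment. A quick numeric sketch, assuming a hypothetical `DIFFICULTY_FACTOR` of 4 (the real value lives in the consensus constants and is not shown in this diff):

```python
# Hypothetical numbers for illustration; DIFFICULTY_FACTOR = 4 is an assumption.
Tc = 100                 # current difficulty
new_difficulty = 10      # raw value derived from the epoch timestamps
DIFFICULTY_FACTOR = 4
clamped = max(1, new_difficulty, Tc // DIFFICULTY_FACTOR)
assert clamped == 25     # a drop below Tc / DIFFICULTY_FACTOR is capped there
```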
def get_next_ips(self, header_hash) -> uint64:
async def get_next_ips(self, header_hash) -> uint64:
"""
Returns the VDF speed in iterations per seconds, to be used for the next block. This depends on
the number of iterations of the last epoch, and changes at the same block as the difficulty.
"""
block = self.blocks.get(header_hash, None)
block = await self.store.get_block(header_hash)
if block is None:
raise Exception("Given header_hash must reference block already added")
@ -227,12 +240,12 @@ class Blockchain:
elif next_height % self.constants["DIFFICULTY_EPOCH"] != self.constants["DIFFICULTY_DELAY"]:
# Not at a point where ips would change, so return the previous ips
# TODO: cache this for efficiency
prev_block = self.blocks.get(block.prev_header_hash, None)
prev_block = await self.store.get_block(block.prev_header_hash)
if prev_block is None:
raise Exception("Previous block is invalid.")
proof_of_space = block.trunk_block.proof_of_space
challenge_hash = block.trunk_block.proof_of_time.output.challenge_hash
difficulty = self.get_next_difficulty(prev_block.header_hash)
difficulty = await self.get_next_difficulty(prev_block.header_hash)
iterations = block.trunk_block.challenge.total_iters - prev_block.trunk_block.challenge.total_iters
return calculate_ips_from_iterations(proof_of_space, challenge_hash, difficulty, iterations,
self.constants["MIN_BLOCK_TIME"])
@ -257,13 +270,15 @@ class Blockchain:
block1 = curr
elif curr.height == height2:
block2 = curr
curr = self.blocks[curr.prev_header_hash]
curr = await self.store.get_block(curr.prev_header_hash)
assert curr is not None
# Once we are before the fork point (and before the LCA), we can use the height_to_hash map
if not block1 and height1 >= 0:
# height1 could be -1, for the first difficulty calculation
block1 = self.blocks[self.height_to_hash[height1]]
block1 = await self.store.get_block(self.height_to_hash[height1])
if not block2:
block2 = self.blocks[self.height_to_hash[height2]]
block2 = await self.store.get_block(self.height_to_hash[height2])
assert block1 is not None and block2 is not None
if block1:
timestamp1 = block1.trunk_block.header.data.timestamp
@ -271,57 +286,58 @@ class Blockchain:
else:
# In the case of height == -1, there is no timestamp here, so assume the genesis block
# took constants["BLOCK_TIME_TARGET"] seconds to mine.
timestamp1 = (self.blocks[self.height_to_hash[uint64(0)]].trunk_block.header.data.timestamp
- self.constants["BLOCK_TIME_TARGET"])
iters1 = self.blocks[self.height_to_hash[uint64(0)]].trunk_block.challenge.total_iters
genesis = await self.store.get_block(self.height_to_hash[uint64(0)])
assert genesis is not None
timestamp1 = genesis.trunk_block.header.data.timestamp - self.constants["BLOCK_TIME_TARGET"]
iters1 = genesis.trunk_block.challenge.total_iters
timestamp2 = block2.trunk_block.header.data.timestamp
iters2 = block2.trunk_block.challenge.total_iters
return uint64((iters2 - iters1) // (timestamp2 - timestamp1))
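The return value above is just total iterations over elapsed time between the two sampled trunk blocks. A tiny worked example with made-up numbers:

```python
# Made-up numbers for illustration only.
iters1, iters2 = 1_000_000, 10_000_000   # challenge.total_iters at the two blocks
timestamp1, timestamp2 = 0, 3_000        # their header timestamps, in seconds
ips = (iters2 - iters1) // (timestamp2 - timestamp1)
assert ips == 3_000                      # VDF iterations per second
```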
def receive_block(self, block: FullBlock) -> ReceiveBlockResult:
async def receive_block(self, block: FullBlock) -> ReceiveBlockResult:
"""
Adds a new block into the blockchain, if it's valid and connected to the current
blockchain, regardless of whether it is the child of a head, or another block.
"""
genesis: bool = block.height == 0 and len(self.heads) == 0
if block.header_hash in self.blocks:
if await self.store.get_block(block.header_hash) is not None:
return ReceiveBlockResult.ALREADY_HAVE_BLOCK
if not self.validate_block(block, genesis):
if not await self.validate_block(block, genesis):
return ReceiveBlockResult.INVALID_BLOCK
if block.prev_header_hash not in self.blocks and not genesis:
if await self.store.get_block(block.prev_header_hash) is None and not genesis:
return ReceiveBlockResult.DISCONNECTED_BLOCK
# Block is valid and connected, so it can be added to the blockchain.
self.blocks[block.header_hash] = block
if self._reconsider_heads(block):
await self.store.save_block(block)
if await self._reconsider_heads(block):
return ReceiveBlockResult.ADDED_TO_HEAD
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN
def validate_unfinished_block(self, block: FullBlock, genesis: bool = False) -> bool:
async def validate_unfinished_block(self, block: FullBlock, genesis: bool = False) -> bool:
"""
Block validation algorithm. Returns true if the candidate block is fully valid
(except for proof of time). The same as validate_block, but without proof of time
and challenge validation.
"""
# 1. Check previous pointer(s) / flyclient
if not genesis and block.prev_header_hash not in self.blocks:
if not genesis and await self.store.get_block(block.prev_header_hash) is None:
return False
# 2. Check Now+2hrs > timestamp > avg timestamp of last 11 blocks
if not genesis:
last_timestamps: List[uint64] = []
prev_block: Optional[FullBlock] = self.blocks[block.prev_header_hash]
prev_block: Optional[FullBlock] = await self.store.get_block(block.prev_header_hash)
curr = prev_block
while len(last_timestamps) < self.constants["NUMBER_OF_TIMESTAMPS"]:
last_timestamps.append(curr.trunk_block.header.data.timestamp)
try:
curr = self.blocks[curr.prev_header_hash]
curr = await self.store.get_block(curr.prev_header_hash)
except KeyError:
break
if len(last_timestamps) != self.constants["NUMBER_OF_TIMESTAMPS"] and curr.body.coinbase.height != 0:
@ -384,7 +400,7 @@ class Blockchain:
# TODO: 13. check fees
return True
def validate_block(self, block: FullBlock, genesis: bool = False) -> bool:
async def validate_block(self, block: FullBlock, genesis: bool = False) -> bool:
"""
Block validation algorithm. Returns true iff the candidate block is fully valid,
and extends something in the blockchain.
@ -394,8 +410,8 @@ class Blockchain:
return False
if not genesis:
difficulty: uint64 = self.get_next_difficulty(block.prev_header_hash)
ips: uint64 = self.get_next_ips(block.prev_header_hash)
difficulty: uint64 = await self.get_next_difficulty(block.prev_header_hash)
ips: uint64 = await self.get_next_ips(block.prev_header_hash)
else:
difficulty: uint64 = uint64(self.constants["DIFFICULTY_STARTING"])
ips: uint64 = uint64(self.constants["VDF_IPS_STARTING"])
@ -422,7 +438,9 @@ class Blockchain:
return False
if not genesis:
prev_block: FullBlock = self.blocks[block.prev_header_hash]
prev_block: FullBlock = await self.store.get_block(block.prev_header_hash)
if not prev_block:
return False
# 5. and check if PoT.output.challenge_hash matches
if (block.trunk_block.proof_of_time.output.challenge_hash !=
@ -457,7 +475,7 @@ class Blockchain:
return True
def _reconsider_heights(self, old_lca: FullBlock, new_lca: FullBlock):
async def _reconsider_heights(self, old_lca: FullBlock, new_lca: FullBlock):
"""
Update the mapping from height to block hash, when the lca changes.
"""
@ -468,18 +486,18 @@ class Blockchain:
self.height_to_hash[uint64(curr_new.height)] = curr_new.header_hash
if curr_new.height == 0:
return
curr_new = self.blocks[curr_new.prev_header_hash].trunk_block
curr_new = (await self.store.get_block(curr_new.prev_header_hash)).trunk_block
elif curr_old.height > curr_new.height:
del self.height_to_hash[uint64(curr_old.height)]
curr_old = self.blocks[curr_old.prev_header_hash].trunk_block
curr_old = (await self.store.get_block(curr_old.prev_header_hash)).trunk_block
else:
if curr_new.header_hash == curr_old.header_hash:
return
self.height_to_hash[uint64(curr_new.height)] = curr_new.header_hash
curr_new = self.blocks[curr_new.prev_header_hash].trunk_block
curr_old = self.blocks[curr_old.prev_header_hash].trunk_block
curr_new = (await self.store.get_block(curr_new.prev_header_hash)).trunk_block
curr_old = (await self.store.get_block(curr_old.prev_header_hash)).trunk_block
def _reconsider_lca(self):
async def _reconsider_lca(self):
"""
Update the least common ancestor of the heads. This is useful, since we can just assume
there is one block per height before the LCA (and use the height_to_hash dict).
@ -488,12 +506,12 @@ class Blockchain:
heights: List[uint32] = [b.height for b in cur]
while any(h != heights[0] for h in heights):
i = heights.index(max(heights))
cur[i] = self.blocks[cur[i].prev_header_hash]
cur[i] = await self.store.get_block(cur[i].prev_header_hash)
heights[i] = cur[i].height
self._reconsider_heights(self.lca_block, cur[0])
self.lca_block = cur[0]
def _reconsider_heads(self, block: FullBlock) -> bool:
async def _reconsider_heads(self, block: FullBlock) -> bool:
"""
When a new block is added, this is called, to check if the new block is heavier
than one of the heads.
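After this refactor, every lookup that used to hit the in-memory `self.blocks` dict goes through the store, so the whole `Blockchain` API becomes coroutine-based. A minimal sketch of the new call pattern (block construction elided; `FullNodeStore()` and `initialize()` follow the usage in full_node.py below):

```python
# Sketch of the refactored async call pattern; `block` is a FullBlock from elsewhere.
from src.store.full_node_store import FullNodeStore
from src.blockchain import Blockchain, ReceiveBlockResult

async def add_block(block) -> None:
    store = FullNodeStore()
    store.initialize()
    blockchain = Blockchain(store)
    result = await blockchain.receive_block(block)  # validates, saves, reconsiders heads
    if result == ReceiveBlockResult.ADDED_TO_HEAD:
        # both getters are now coroutines and must be awaited
        difficulty = await blockchain.get_next_difficulty(block.prev_header_hash)
        ips = await blockchain.get_next_ips(block.header_hash)
```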

View File

@ -1,6 +1,6 @@
import logging
import asyncio
import yaml
from yaml import safe_load
from hashlib import sha256
from typing import List, Dict, Set, Tuple, Any
@ -17,7 +17,7 @@ from src.consensus.constants import constants
from src.server.outbound_message import OutboundMessage, Delivery, Message, NodeType
class Database:
class FarmerState:
lock = asyncio.Lock()
plotter_responses_header_hash: Dict[bytes32, bytes32] = {}
plotter_responses_challenge: Dict[bytes32, bytes32] = {}
@ -33,9 +33,9 @@ class Database:
proof_of_time_estimate_ips: uint64 = uint64(3000)
config = yaml.safe_load(open("src/config/farmer.yaml", "r"))
config = safe_load(open("src/config/farmer.yaml", "r"))
log = logging.getLogger(__name__)
db = Database()
state: FarmerState = FarmerState()
"""
@ -50,13 +50,13 @@ async def challenge_response(challenge_response: plotter_protocol.ChallengeRespo
of space is sufficiently good, and if so, we ask for the whole proof.
"""
async with db.lock:
if challenge_response.quality in db.plotter_responses_challenge:
async with state.lock:
if challenge_response.quality in state.plotter_responses_challenge:
log.warning(f"Have already seen quality {challenge_response.quality}")
return
height: uint32 = db.challenge_to_height[challenge_response.challenge_hash]
height: uint32 = state.challenge_to_height[challenge_response.challenge_hash]
difficulty: uint64 = uint64(0)
for posf in db.challenges[height]:
for posf in state.challenges[height]:
if posf.challenge_hash == challenge_response.challenge_hash:
difficulty = posf.difficulty
if difficulty == 0:
@ -65,14 +65,14 @@ async def challenge_response(challenge_response: plotter_protocol.ChallengeRespo
number_iters: uint64 = calculate_iterations_quality(challenge_response.quality,
challenge_response.plot_size,
difficulty,
db.proof_of_time_estimate_ips,
state.proof_of_time_estimate_ips,
constants["MIN_BLOCK_TIME"])
estimate_secs: float = number_iters / db.proof_of_time_estimate_ips
estimate_secs: float = number_iters / state.proof_of_time_estimate_ips
log.info(f"Estimate: {estimate_secs}, rate: {db.proof_of_time_estimate_ips}")
log.info(f"Estimate: {estimate_secs}, rate: {state.proof_of_time_estimate_ips}")
if estimate_secs < config['pool_share_threshold'] or estimate_secs < config['propagate_threshold']:
async with db.lock:
db.plotter_responses_challenge[challenge_response.quality] = challenge_response.challenge_hash
async with state.lock:
state.plotter_responses_challenge[challenge_response.quality] = challenge_response.challenge_hash
request = plotter_protocol.RequestProofOfSpace(challenge_response.quality)
yield OutboundMessage(NodeType.PLOTTER, Message("request_proof_of_space", request), Delivery.RESPOND)
@ -85,15 +85,15 @@ async def respond_proof_of_space(response: plotter_protocol.RespondProofOfSpace)
and request a pool partial, a header signature, or both, if the proof is good enough.
"""
async with db.lock:
async with state.lock:
pool_sks: List[PrivateKey] = [PrivateKey.from_bytes(bytes.fromhex(ce)) for ce in config["pool_sks"]]
assert response.proof.pool_pubkey in [sk.get_public_key() for sk in pool_sks]
challenge_hash: bytes32 = db.plotter_responses_challenge[response.quality]
challenge_height: uint32 = db.challenge_to_height[challenge_hash]
challenge_hash: bytes32 = state.plotter_responses_challenge[response.quality]
challenge_height: uint32 = state.challenge_to_height[challenge_hash]
new_proof_height: uint32 = uint32(challenge_height + 1)
difficulty: uint64 = uint64(0)
for posf in db.challenges[challenge_height]:
for posf in state.challenges[challenge_height]:
if posf.challenge_hash == challenge_hash:
difficulty = posf.difficulty
if difficulty == 0:
@ -102,28 +102,28 @@ async def respond_proof_of_space(response: plotter_protocol.RespondProofOfSpace)
computed_quality = response.proof.verify_and_get_quality(challenge_hash)
assert response.quality == computed_quality
async with db.lock:
db.plotter_responses_proofs[response.quality] = response.proof
db.plotter_responses_proof_hash_to_qual[response.proof.get_hash()] = response.quality
async with state.lock:
state.plotter_responses_proofs[response.quality] = response.proof
state.plotter_responses_proof_hash_to_qual[response.proof.get_hash()] = response.quality
number_iters: uint64 = calculate_iterations_quality(computed_quality,
response.proof.size,
difficulty,
db.proof_of_time_estimate_ips,
state.proof_of_time_estimate_ips,
constants["MIN_BLOCK_TIME"])
async with db.lock:
estimate_secs: float = number_iters / db.proof_of_time_estimate_ips
async with state.lock:
estimate_secs: float = number_iters / state.proof_of_time_estimate_ips
if estimate_secs < config['pool_share_threshold']:
request = plotter_protocol.RequestPartialProof(response.quality,
sha256(bytes.fromhex(config['farmer_target'])).digest())
yield OutboundMessage(NodeType.PLOTTER, Message("request_partial_proof", request), Delivery.RESPOND)
if estimate_secs < config['propagate_threshold']:
async with db.lock:
if new_proof_height not in db.coinbase_rewards:
async with state.lock:
if new_proof_height not in state.coinbase_rewards:
log.error(f"Don't have coinbase transaction for height {new_proof_height}, cannot submit PoS")
return
coinbase, signature = db.coinbase_rewards[new_proof_height]
coinbase, signature = state.coinbase_rewards[new_proof_height]
request = farmer_protocol.RequestHeaderHash(challenge_hash, coinbase, signature,
bytes.fromhex(config['farmer_target']), response.proof)
@ -136,10 +136,10 @@ async def respond_header_signature(response: plotter_protocol.RespondHeaderSigna
Receives a signature on a block header hash, which is required for submitting
a block to the blockchain.
"""
async with db.lock:
header_hash: bytes32 = db.plotter_responses_header_hash[response.quality]
proof_of_space: bytes32 = db.plotter_responses_proofs[response.quality]
plot_pubkey = db.plotter_responses_proofs[response.quality].plot_pubkey
async with state.lock:
header_hash: bytes32 = state.plotter_responses_header_hash[response.quality]
proof_of_space: bytes32 = state.plotter_responses_proofs[response.quality]
plot_pubkey = state.plotter_responses_proofs[response.quality].plot_pubkey
assert response.header_hash_signature.verify([Util.hash256(header_hash)],
[plot_pubkey])
@ -158,9 +158,9 @@ async def respond_partial_proof(response: plotter_protocol.RespondPartialProof):
share, to tell the pool where to pay the farmer.
"""
async with db.lock:
async with state.lock:
farmer_target_hash = sha256(bytes.fromhex(config['farmer_target'])).digest()
plot_pubkey = db.plotter_responses_proofs[response.quality].plot_pubkey
plot_pubkey = state.plotter_responses_proofs[response.quality].plot_pubkey
assert response.farmer_target_signature.verify([Util.hash256(farmer_target_hash)],
[plot_pubkey])
@ -179,9 +179,9 @@ async def header_hash(response: farmer_protocol.HeaderHash):
"""
header_hash: bytes32 = response.header_hash
async with db.lock:
quality: bytes32 = db.plotter_responses_proof_hash_to_qual[response.pos_hash]
db.plotter_responses_header_hash[quality] = header_hash
async with state.lock:
quality: bytes32 = state.plotter_responses_proof_hash_to_qual[response.pos_hash]
state.plotter_responses_header_hash[quality] = header_hash
# TODO: only send to the plotter who made the proof of space, not all plotters
request = plotter_protocol.RequestHeaderSignature(quality, header_hash)
@ -195,30 +195,30 @@ async def proof_of_space_finalized(proof_of_space_finalized: farmer_protocol.Pro
challenges list at that height, and height is updated if necessary
"""
get_proofs: bool = False
async with db.lock:
if (proof_of_space_finalized.height >= db.current_height and
proof_of_space_finalized.challenge_hash not in db.seen_challenges):
async with state.lock:
if (proof_of_space_finalized.height >= state.current_height and
proof_of_space_finalized.challenge_hash not in state.seen_challenges):
# Only get proofs for new challenges, at a current or new height
get_proofs = True
if (proof_of_space_finalized.height > db.current_height):
db.current_height = proof_of_space_finalized.height
if (proof_of_space_finalized.height > state.current_height):
state.current_height = proof_of_space_finalized.height
# TODO: ask the pool for this information
coinbase: CoinbaseInfo = CoinbaseInfo(uint32(db.current_height + 1),
calculate_block_reward(db.current_height),
coinbase: CoinbaseInfo = CoinbaseInfo(uint32(state.current_height + 1),
calculate_block_reward(state.current_height),
bytes.fromhex(config["pool_target"]))
pool_sks: List[PrivateKey] = [PrivateKey.from_bytes(bytes.fromhex(ce)) for ce in config["pool_sks"]]
coinbase_signature: PrependSignature = pool_sks[0].sign_prepend(coinbase.serialize())
db.coinbase_rewards[uint32(db.current_height + 1)] = (coinbase, coinbase_signature)
state.coinbase_rewards[uint32(state.current_height + 1)] = (coinbase, coinbase_signature)
log.info(f"\tCurrent height set to {db.current_height}")
db.seen_challenges.add(proof_of_space_finalized.challenge_hash)
if proof_of_space_finalized.height not in db.challenges:
db.challenges[proof_of_space_finalized.height] = [proof_of_space_finalized]
log.info(f"\tCurrent height set to {state.current_height}")
state.seen_challenges.add(proof_of_space_finalized.challenge_hash)
if proof_of_space_finalized.height not in state.challenges:
state.challenges[proof_of_space_finalized.height] = [proof_of_space_finalized]
else:
db.challenges[proof_of_space_finalized.height].append(proof_of_space_finalized)
db.challenge_to_height[proof_of_space_finalized.challenge_hash] = proof_of_space_finalized.height
state.challenges[proof_of_space_finalized.height].append(proof_of_space_finalized)
state.challenge_to_height[proof_of_space_finalized.challenge_hash] = proof_of_space_finalized.height
if get_proofs:
message = plotter_protocol.NewChallenge(proof_of_space_finalized.challenge_hash)
@ -231,11 +231,11 @@ async def proof_of_space_arrived(proof_of_space_arrived: farmer_protocol.ProofOf
Full node notifies the farmer that a new proof of space was created. The farmer can use this
information to decide whether to propagate a proof.
"""
async with db.lock:
if proof_of_space_arrived.height not in db.unfinished_challenges:
db.unfinished_challenges[proof_of_space_arrived.height] = []
async with state.lock:
if proof_of_space_arrived.height not in state.unfinished_challenges:
state.unfinished_challenges[proof_of_space_arrived.height] = []
else:
db.unfinished_challenges[proof_of_space_arrived.height].append(
state.unfinished_challenges[proof_of_space_arrived.height].append(
proof_of_space_arrived.quality_string)
@ -244,9 +244,11 @@ async def deep_reorg_notification(deep_reorg_notification: farmer_protocol.DeepR
# TODO: implement
# TODO: "forget everything and start over (reset db)"
log.error(f"Deep reorg notification not implemented.")
async with state.lock:
pass
@api_request
async def proof_of_time_rate(proof_of_time_rate: farmer_protocol.ProofOfTimeRate):
async with db.lock:
db.proof_of_time_estimate_ips = proof_of_time_rate.pot_estimate_ips
async with state.lock:
state.proof_of_time_estimate_ips = proof_of_time_rate.pot_estimate_ips
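The decision logic in `challenge_response` and `respond_proof_of_space` reduces to one division against two configured thresholds. A compact sketch with placeholder numbers (`pool_share_threshold` and `propagate_threshold` come from src/config/farmer.yaml; the values here are assumptions):

```python
# Placeholder numbers; the real thresholds live in src/config/farmer.yaml.
number_iters = 9_000_000
proof_of_time_estimate_ips = 3_000   # the FarmerState default above
pool_share_threshold = 3_600.0       # assumed config value
propagate_threshold = 1_800.0        # assumed config value

estimate_secs = number_iters / proof_of_time_estimate_ips
send_pool_partial = estimate_secs < pool_share_threshold   # worth a pool share
propagate_block = estimate_secs < propagate_threshold      # worth a full block
assert estimate_secs == 3_000.0 and send_pool_partial and not propagate_block
```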

View File

@ -1,15 +1,14 @@
import logging
import time
import asyncio
import collections
import yaml
import concurrent
from secrets import token_bytes
from hashlib import sha256
from chiapos import Verifier
from blspy import Signature, PrivateKey
from asyncio import Lock, Event
from typing import Dict, List, Tuple, Optional, AsyncGenerator, Counter
from asyncio import Event
from typing import List, Optional, AsyncGenerator
from src.util.api_decorators import api_request
from src.util.ints import uint64, uint32
from src.util import errors
@ -21,7 +20,6 @@ from src.types.block_body import BlockBody
from src.types.trunk_block import TrunkBlock
from src.types.challenge import Challenge
from src.types.block_header import BlockHeaderData, BlockHeader
from src.types.proof_of_space import ProofOfSpace
from src.types.full_block import FullBlock
from src.types.fees_target import FeesTarget
from src.consensus.weight_verifier import verify_weight
@ -30,44 +28,14 @@ from src.consensus.constants import constants
from src.blockchain import Blockchain, ReceiveBlockResult
from src.server.outbound_message import OutboundMessage, Delivery, NodeType, Message
from src.util.errors import BlockNotInBlockchain, PeersDontHaveBlock, InvalidUnfinishedBlock
class Database:
# This protects all other resources
lock: Lock = Lock()
blockchain: Blockchain = Blockchain()
full_blocks: Dict[str, FullBlock] = {
FullBlock.from_bytes(constants["GENESIS_BLOCK"]).trunk_block.header.header_hash:
FullBlock.from_bytes(constants["GENESIS_BLOCK"])}
sync_mode: bool = True
# Block headers and blocks which we think might be heads, but we haven't verified yet.
# All these are used during sync mode
potential_heads: Counter[bytes32] = collections.Counter()
potential_heads_full_blocks: Dict[bytes32, FullBlock] = collections.Counter()
# Headers/trunks downloaded for the during sync, by height
potential_trunks: Dict[uint32, TrunkBlock] = {}
# Blocks downloaded during sync, by height
potential_blocks: Dict[uint32, FullBlock] = {}
# Event, which gets set whenever we receive the block at each height. Waited for by sync().
potential_blocks_received: Dict[uint32, Event] = {}
# These are the blocks that we created, but don't have the PoS from farmer yet,
# keyed from the proof of space hash
candidate_blocks: Dict[bytes32, Tuple[BlockBody, BlockHeaderData, ProofOfSpace]] = {}
# These are the blocks that we created, have PoS, but not PoT yet, keyed from the
# block header hash
unfinished_blocks: Dict[Tuple[bytes32, uint64], FullBlock] = {}
# Latest height with unfinished blocks, and expected timestamp of the finishing
unfinished_blocks_leader: Tuple[uint32, uint64] = (uint32(0), uint64(9999999999))
proof_of_time_estimate_ips: uint64 = uint64(1500)
from src.store.full_node_store import FullNodeStore
config = yaml.safe_load(open("src/config/full_node.yaml", "r"))
log = logging.getLogger(__name__)
db = Database()
store = FullNodeStore()
store.initialize()
blockchain: Blockchain = Blockchain(store)
async def send_heads_to_farmers() -> AsyncGenerator[OutboundMessage, None]:
@ -76,19 +44,19 @@ async def send_heads_to_farmers() -> AsyncGenerator[OutboundMessage, None]:
estimated proof of time rate, so the farmer can calculate which proofs are good.
"""
requests: List[farmer_protocol.ProofOfSpaceFinalized] = []
async with db.lock:
for head in db.blockchain.get_current_heads():
async with (await store.get_lock()):
for head in blockchain.get_current_heads():
prev_challenge_hash = head.proof_of_time.output.challenge_hash
challenge_hash = head.challenge.get_hash()
height = head.challenge.height
quality = head.proof_of_space.verify_and_get_quality(prev_challenge_hash)
if head.height > 0:
difficulty: uint64 = db.blockchain.get_next_difficulty(head.prev_header_hash)
difficulty: uint64 = await blockchain.get_next_difficulty(head.prev_header_hash)
else:
difficulty = head.weight
requests.append(farmer_protocol.ProofOfSpaceFinalized(challenge_hash, height,
quality, difficulty))
proof_of_time_rate: uint64 = db.proof_of_time_estimate_ips
proof_of_time_rate: uint64 = await store.get_proof_of_time_estimate_ips()
for request in requests:
yield OutboundMessage(NodeType.FARMER, Message("proof_of_space_finalized", request), Delivery.BROADCAST)
rate_update = farmer_protocol.ProofOfTimeRate(proof_of_time_rate)
@ -100,8 +68,8 @@ async def send_challenges_to_timelords() -> AsyncGenerator[OutboundMessage, None
Sends all of the current heads to all timelord peers.
"""
requests: List[timelord_protocol.ChallengeStart] = []
async with db.lock:
for head in db.blockchain.get_current_heads():
async with (await store.get_lock()):
for head in blockchain.get_current_heads():
challenge_hash = head.challenge.get_hash()
requests.append(timelord_protocol.ChallengeStart(challenge_hash, head.challenge.height))
for request in requests:
@ -113,10 +81,10 @@ async def on_connect() -> AsyncGenerator[OutboundMessage, None]:
Whenever we connect to another full node, send them our current heads.
"""
blocks: List[FullBlock] = []
async with db.lock:
heads: List[TrunkBlock] = db.blockchain.get_current_heads()
async with (await store.get_lock()):
heads: List[TrunkBlock] = blockchain.get_current_heads()
for h in heads:
blocks.append(db.full_blocks[h.header.get_hash()])
blocks.append(await blockchain.get_block(h.header.get_hash()))
for block in blocks:
request = peer_protocol.Block(block)
yield OutboundMessage(NodeType.FULL_NODE, Message("block", request), Delivery.RESPOND)
@ -141,23 +109,19 @@ async def sync():
# Based on responses from peers about the current heads, see which head is the heaviest
# (similar to longest chain rule).
async with db.lock:
potential_heads = db.potential_heads.items()
async with (await store.get_lock()):
potential_heads = (await store.get_potential_heads()).items()
log.info(f"Have collected {len(potential_heads)} potential heads")
for header_hash, _ in potential_heads:
block = db.potential_heads_full_blocks[header_hash]
block = await store.get_potential_heads_full_block(header_hash)
if block.trunk_block.challenge.total_weight > highest_weight:
highest_weight = block.trunk_block.challenge.total_weight
tip_block = block
tip_height = block.trunk_block.challenge.height
if highest_weight <= max([t.challenge.total_weight for t in db.blockchain.get_current_heads()]):
if highest_weight <= max([t.challenge.total_weight for t in blockchain.get_current_heads()]):
log.info("Not performing sync, already caught up.")
db.sync_mode = False
db.potential_heads.clear()
db.potential_heads_full_blocks.clear()
db.potential_trunks.clear()
db.potential_blocks.clear()
db.potential_blocks_received.clear()
await store.set_sync_mode(False)
await store.clear_sync_information()
return
# Now, we download all of the headers in order to verify the weight
@ -176,14 +140,14 @@ async def sync():
yield OutboundMessage(NodeType.FULL_NODE, Message("request_trunk_blocks", request), Delivery.RANDOM)
await asyncio.sleep(sleep_interval)
total_time_slept += sleep_interval
async with db.lock:
async with (await store.get_lock()):
received_all_trunks = True
local_trunks = []
for height in range(0, tip_height + 1):
if height not in db.potential_trunks:
if await store.get_potential_trunk(uint32(height)) is None:
received_all_trunks = False
break
local_trunks.append(db.potential_trunks[uint32(height)])
local_trunks.append(await store.get_potential_trunk(uint32(height)))
if received_all_trunks:
trunks = local_trunks
break
@ -194,31 +158,32 @@ async def sync():
log.error(f"Downloaded trunks up to tip height: {tip_height}")
assert tip_height + 1 == len(trunks)
async with db.lock:
fork_point: TrunkBlock = db.blockchain.find_fork_point(trunks)
async with (await store.get_lock()):
fork_point: TrunkBlock = await blockchain.find_fork_point(trunks)
# TODO: optimize, send many requests at once, and for more blocks
for height in range(fork_point.challenge.height + 1, tip_height + 1):
# Only download from fork point (what we don't have)
async with db.lock:
have_block = trunks[height].header.get_hash() in db.potential_heads_full_blocks
async with (await store.get_lock()):
have_block = await store.get_potential_heads_full_block(trunks[height].header.get_hash()) is not None
if not have_block:
request = peer_protocol.RequestSyncBlocks(tip_block.trunk_block.header.header_hash, [height])
async with db.lock:
db.potential_blocks_received[uint32(height)] = Event()
async with (await store.get_lock()):
await store.set_potential_blocks_received(uint32(height), Event())
found = False
for _ in range(30):
yield OutboundMessage(NodeType.FULL_NODE, Message("request_sync_blocks", request), Delivery.RANDOM)
try:
await asyncio.wait_for(db.potential_blocks_received[uint32(height)].wait(), timeout=2)
await asyncio.wait_for((await store.get_potential_blocks_received(uint32(height))).wait(),
timeout=2)
found = True
break
except concurrent.futures.TimeoutError:
log.info("Did not receive desired block")
if not found:
raise PeersDontHaveBlock(f"Did not receive desired block at height {height}")
async with db.lock:
async with (await store.get_lock()):
# TODO: ban peers that provide bad blocks
if have_block:
block = db.potential_heads_full_blocks[trunks[height].header.get_hash()]
@ -226,13 +191,13 @@ async def sync():
block = db.potential_blocks[uint32(height)]
start = time.time()
db.blockchain.receive_block(block)
await blockchain.receive_block(block)
log.info(f"Took {time.time() - start}")
assert max([h.challenge.height for h in db.blockchain.get_current_heads()]) >= height
db.full_blocks[block.trunk_block.header.get_hash()] = block
assert max([h.challenge.height for h in blockchain.get_current_heads()]) >= height
# db.full_blocks[block.trunk_block.header.get_hash()] = block
db.proof_of_time_estimate_ips = db.blockchain.get_next_ips(block.header_hash)
async with db.lock:
async with (await store.get_lock()):
log.info(f"Finished sync up to height {tip_height}")
db.potential_heads.clear()
db.potential_heads_full_blocks.clear()
@ -251,7 +216,7 @@ async def request_trunk_blocks(request: peer_protocol.RequestTrunkBlocks) \
if len(request.heights) > config['max_trunks_to_send']:
raise errors.TooManyTrunksRequested(f"The max number of trunks is {config['max_trunks_to_send']},\
but requested {len(request.heights)}")
async with db.lock:
async with (await store.get_lock()):
try:
trunks: List[TrunkBlock] = db.blockchain.get_trunk_blocks_by_height(request.heights,
request.tip_header_hash)
@ -272,7 +237,7 @@ async def trunk_blocks(request: peer_protocol.TrunkBlocks) \
"""
Receive trunk blocks from a peer.
"""
async with db.lock:
async with (await store.get_lock()):
for trunk_block in request.trunk_blocks:
db.potential_trunks[trunk_block.challenge.height] = trunk_block
@ -286,15 +251,16 @@ async def request_sync_blocks(request: peer_protocol.RequestSyncBlocks) -> Async
Respond to a peer's request for syncing blocks.
"""
blocks: List[FullBlock] = []
async with db.lock:
if request.tip_header_hash in db.full_blocks:
async with (await store.get_lock()):
tip_block: Optional[FullBlock] = await blockchain.get_block(request.tip_header_hash)
if tip_block is not None:
if len(request.heights) > config['max_blocks_to_send']:
raise errors.TooManyTrunksRequested(f"The max number of blocks is {config['max_blocks_to_send']},"
f"but requested {len(request.heights)}")
try:
trunk_blocks: List[TrunkBlock] = db.blockchain.get_trunk_blocks_by_height(request.heights,
request.tip_header_hash)
blocks = [db.full_blocks[t.header.get_hash()] for t in trunk_blocks]
trunk_blocks: List[TrunkBlock] = await blockchain.get_trunk_blocks_by_height(request.heights,
request.tip_header_hash)
blocks = [await blockchain.get_block(t.header.get_hash()) for t in trunk_blocks]
except KeyError:
log.info("Do not have required blocks")
return
@ -315,7 +281,7 @@ async def sync_blocks(request: peer_protocol.SyncBlocks) -> AsyncGenerator[Outbo
We have received the blocks that we needed for syncing. Add them to the processing queue.
"""
# TODO: use an actual queue?
async with db.lock:
async with (await store.get_lock()):
if not db.sync_mode:
log.warning("Receiving sync blocks when we are not in sync mode.")
return
@ -342,7 +308,7 @@ async def request_header_hash(request: farmer_protocol.RequestHeaderHash) -> Asy
bytes(request.proof_of_space.proof))
assert quality_string
async with db.lock:
async with (await store.get_lock()):
# Retrieves the correct head for the challenge
heads: List[TrunkBlock] = db.blockchain.get_current_heads()
target_head: Optional[TrunkBlock] = None
@ -394,7 +360,7 @@ async def header_signature(header_signature: farmer_protocol.HeaderSignature) ->
block, which only needs a Proof of Time to be finished. If the signature is valid,
we call the unfinished_block routine.
"""
async with db.lock:
async with (await store.get_lock()):
if header_signature.pos_hash not in db.candidate_blocks:
log.warning(f"PoS hash {header_signature.pos_hash} not found in database")
return
@ -422,7 +388,7 @@ async def proof_of_time_finished(request: timelord_protocol.ProofOfTimeFinished)
A proof of time, received by a peer timelord. We can use this to complete a block,
and call the block routine (which handles propagation and verification of blocks).
"""
async with db.lock:
async with (await store.get_lock()):
dict_key = (request.proof.output.challenge_hash, request.proof.output.number_of_iterations)
if dict_key not in db.unfinished_blocks:
log.warning(f"Received a proof of time that we cannot use to complete a block {dict_key}")
@ -457,7 +423,7 @@ async def new_proof_of_time(new_proof_of_time: peer_protocol.NewProofOfTime) ->
"""
finish_block: bool = False
propagate_proof: bool = False
async with db.lock:
async with (await store.get_lock()):
if (new_proof_of_time.proof.output.challenge_hash,
new_proof_of_time.proof.output.number_of_iterations) in db.unfinished_blocks:
finish_block = True
@ -480,7 +446,7 @@ async def unfinished_block(unfinished_block: peer_protocol.UnfinishedBlock) -> A
We can validate it and if it's a good block, propagate it to other peers and
timelords.
"""
async with db.lock:
async with (await store.get_lock()):
if not db.blockchain.is_child_of_head(unfinished_block.block):
return
@ -510,7 +476,7 @@ async def unfinished_block(unfinished_block: peer_protocol.UnfinishedBlock) -> A
# If this block is slow, sleep to allow faster blocks to come out first
await asyncio.sleep(2)
async with db.lock:
async with (await store.get_lock()):
if unfinished_block.block.height > db.unfinished_blocks_leader[0]:
log.info(f"This is the first block at height {unfinished_block.block.height}, so propagate.")
# If this is the first block we see at this height, propagate
@ -548,11 +514,11 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N
header_hash = block.block.trunk_block.header.get_hash()
async with db.lock:
if db.sync_mode:
async with (await store.get_lock()):
if await store.get_sync_mode():
# Add the block to our potential heads list
db.potential_heads[header_hash] += 1
db.potential_heads_full_blocks[header_hash] = block.block
await store.add_potential_head(header_hash)
await store.add_potential_heads_full_block(block.block)
return
added: ReceiveBlockResult = db.blockchain.receive_block(block.block)
@ -564,17 +530,17 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N
return
elif added == ReceiveBlockResult.DISCONNECTED_BLOCK:
log.warning(f"Disconnected block")
async with db.lock:
async with (await store.get_lock()):
tip_height = max([head.challenge.height for head in db.blockchain.get_current_heads()])
if block.block.trunk_block.challenge.height > tip_height + config["sync_blocks_behind_threshold"]:
async with db.lock:
db.potential_heads.clear()
db.potential_heads[header_hash] += 1
db.potential_heads_full_blocks[header_hash] = block.block
async with (await store.get_lock()):
await store.clear_sync_information()
await store.add_potential_head(header_hash)
await store.add_potential_heads_full_block(block.block)
log.info(f"We are too far behind this block. Our height is {tip_height} and block is at"
f"{block.block.trunk_block.challenge.height}")
# Perform a sync if we have to
db.sync_mode = True
await store.set_sync_mode(True)
try:
# Performs sync, and catch exceptions so we don't close the connection
async for msg in sync():
@ -595,21 +561,17 @@ async def block(block: peer_protocol.Block) -> AsyncGenerator[OutboundMessage, N
break
return
async with db.lock:
db.full_blocks[header_hash] = block.block
if added == ReceiveBlockResult.ADDED_TO_HEAD:
# Only propagate blocks which extend the blockchain (one of the heads)
ips_changed: bool = False
async with db.lock:
log.info(f"\tUpdated heads, new heights: {[b.height for b in db.blockchain.get_current_heads()]}")
difficulty = db.blockchain.get_next_difficulty(block.block.prev_header_hash)
old_ips = db.proof_of_time_estimate_ips
next_vdf_ips = db.blockchain.get_next_ips(block.block.header_hash)
async with (await store.get_lock()):
log.info(f"\tUpdated heads, new heights: {[b.height for b in blockchain.get_current_heads()]}")
difficulty = await blockchain.get_next_difficulty(block.block.prev_header_hash)
old_ips = await store.get_proof_of_time_estimate_ips()
next_vdf_ips = await blockchain.get_next_ips(block.block.header_hash)
log.info(f"Difficulty {difficulty} IPS {old_ips}")
if next_vdf_ips != db.proof_of_time_estimate_ips:
db.proof_of_time_estimate_ips = next_vdf_ips
if next_vdf_ips != await store.get_proof_of_time_estimate_ips():
await store.set_proof_of_time_estimate_ips(next_vdf_ips)
ips_changed = True
if ips_changed:
if next_vdf_ips > old_ips:
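src/store/full_node_store.py itself is not among the hunks shown, so the following is only a sketch of a store shape consistent with the calls above (`get_lock`, `save_block`/`get_block`, the sync-mode flags, and the potential-heads bookkeeping). The method names come from this diff; everything else is assumed.

```python
# Sketch only: a FullNodeStore consistent with the calls in this diff,
# not the actual src/store/full_node_store.py.
import asyncio
from typing import Any, Dict, Optional

class FullNodeStore:
    def initialize(self) -> None:
        self._lock = asyncio.Lock()
        self._blocks: Dict[Any, Any] = {}            # header_hash -> FullBlock
        self._sync_mode = True
        self._potential_heads: Dict[Any, int] = {}   # header_hash -> vote count
        self._proof_of_time_estimate_ips = 1500

    async def get_lock(self) -> asyncio.Lock:
        # used as: async with (await store.get_lock()):
        return self._lock

    async def save_block(self, block: Any) -> None:
        self._blocks[block.header_hash] = block

    async def get_block(self, header_hash: Any) -> Optional[Any]:
        return self._blocks.get(header_hash)

    async def set_sync_mode(self, mode: bool) -> None:
        self._sync_mode = mode

    async def get_sync_mode(self) -> bool:
        return self._sync_mode

    async def add_potential_head(self, header_hash: Any) -> None:
        self._potential_heads[header_hash] = self._potential_heads.get(header_hash, 0) + 1

    async def clear_sync_information(self) -> None:
        self._potential_heads.clear()

    async def get_proof_of_time_estimate_ips(self) -> int:
        return self._proof_of_time_estimate_ips

    async def set_proof_of_time_estimate_ips(self, ips: int) -> None:
        self._proof_of_time_estimate_ips = ips
```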

View File

@ -1,7 +1,7 @@
import logging
import os
import os.path
import yaml
from yaml import safe_load
from asyncio import Lock
from typing import Dict, Tuple, Optional
from blspy import PrivateKey, PublicKey, PrependSignature, Util
@ -14,17 +14,16 @@ from src.types.proof_of_space import ProofOfSpace
from src.server.outbound_message import OutboundMessage, Delivery, Message, NodeType
# TODO: store on disk
class Database:
class PlotterState:
# From filename to prover
provers: Dict[str, DiskProver] = {}
provers = {}
lock: Lock = Lock()
# From quality to (challenge_hash, filename, index)
challenge_hashes: Dict[bytes32, Tuple[bytes32, str, uint8]] = {}
config = yaml.safe_load(open("src/config/plotter.yaml", "r"))
db: Database = Database()
config = safe_load(open("src/config/plotter.yaml", "r"))
state: PlotterState = PlotterState()
log = logging.getLogger(__name__)
@ -48,8 +47,8 @@ async def plotter_handshake(plotter_handshake: plotter_protocol.PlotterHandshake
else:
# TODO: check plots are correct
pass
async with db.lock:
db.provers[filename] = DiskProver(filename)
async with state.lock:
state.provers[filename] = DiskProver(filename)
else:
log.warning(f"Plot {filename} has an invalid pool key.")
@ -66,17 +65,17 @@ async def new_challenge(new_challenge: plotter_protocol.NewChallenge):
raise ValueError("Invalid challenge size")
all_responses = []
async with db.lock:
for filename, prover in db.provers.items():
async with state.lock:
for filename, prover in state.provers.items():
try:
quality_strings = prover.get_qualities_for_challenge(new_challenge.challenge_hash)
except RuntimeError:
log.warning("Error using prover object. Reinitializing prover object.")
db.provers[filename] = DiskProver(filename)
state.provers[filename] = DiskProver(filename)
quality_strings = prover.get_qualities_for_challenge(new_challenge.challenge_hash)
for index, quality_str in enumerate(quality_strings):
quality = ProofOfSpace.quality_str_to_quality(new_challenge.challenge_hash, quality_str)
db.challenge_hashes[quality] = (new_challenge.challenge_hash, filename, uint8(index))
state.challenge_hashes[quality] = (new_challenge.challenge_hash, filename, uint8(index))
response: plotter_protocol.ChallengeResponse = plotter_protocol.ChallengeResponse(
new_challenge.challenge_hash,
quality,
@ -95,19 +94,19 @@ async def request_proof_of_space(request: plotter_protocol.RequestProofOfSpace):
We look up the correct plot based on the quality, lookup the proof, and return it.
"""
response: Optional[plotter_protocol.RespondProofOfSpace] = None
async with db.lock:
async with state.lock:
try:
# Using the quality find the right plot and index from our solutions
challenge_hash, filename, index = db.challenge_hashes[request.quality]
challenge_hash, filename, index = state.challenge_hashes[request.quality]
except KeyError:
log.warning(f"Quality {request.quality} not found")
return
if index is not None:
try:
proof_xs: bytes = db.provers[filename].get_full_proof(challenge_hash, index)
proof_xs: bytes = state.provers[filename].get_full_proof(challenge_hash, index)
except RuntimeError:
db.provers[filename] = DiskProver(filename)
proof_xs: bytes = db.provers[filename].get_full_proof(challenge_hash, index)
state.provers[filename] = DiskProver(filename)
proof_xs: bytes = state.provers[filename].get_full_proof(challenge_hash, index)
pool_pubkey = PublicKey.from_bytes(bytes.fromhex(config['plots'][filename]['pool_pk']))
plot_pubkey = PrivateKey.from_bytes(bytes.fromhex(config['plots'][filename]['sk'])).get_public_key()
@ -131,8 +130,8 @@ async def request_header_signature(request: plotter_protocol.RequestHeaderSignat
A signature is created on the header hash using the plot private key.
"""
async with db.lock:
_, filename, _ = db.challenge_hashes[request.quality]
async with state.lock:
_, filename, _ = state.challenge_hashes[request.quality]
plot_sk = PrivateKey.from_bytes(bytes.fromhex(config['plots'][filename]['sk']))
header_hash_signature: PrependSignature = plot_sk.sign_prepend(request.header_hash)
@ -152,8 +151,8 @@ async def request_partial_proof(request: plotter_protocol.RequestPartialProof):
We look up the correct plot based on the quality, lookup the proof, and sign
the farmer target hash using the plot private key. This will be used as a pool share.
"""
async with db.lock:
_, filename, _ = db.challenge_hashes[request.quality]
async with state.lock:
_, filename, _ = state.challenge_hashes[request.quality]
plot_sk = PrivateKey.from_bytes(bytes.fromhex(config['plots'][filename]['sk']))
farmer_target_signature: PrependSignature = plot_sk.sign_prepend(request.farmer_target_hash)
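A pattern worth noting across this file: DiskProver calls can raise RuntimeError (e.g. a stale handle after the plot file changes), and each handler rebuilds the prover and retries once. A minimal sketch of that retry, with FlakyProver as an invented stand-in for chiapos.DiskProver:

```python
from typing import Dict, List


class FlakyProver:
    """Invented stand-in for chiapos.DiskProver; fails once if asked to."""

    def __init__(self, filename: str, fail_next: bool = False):
        self.filename = filename
        self.fail_next = fail_next

    def get_qualities_for_challenge(self, challenge_hash: bytes) -> List[bytes]:
        if self.fail_next:
            self.fail_next = False
            raise RuntimeError("stale prover handle")
        return [b"quality-0"]


def qualities_with_retry(provers: Dict[str, FlakyProver], filename: str,
                         challenge_hash: bytes) -> List[bytes]:
    try:
        return provers[filename].get_qualities_for_challenge(challenge_hash)
    except RuntimeError:
        # As in new_challenge() above: reinitialize the prover, then retry
        # once, using the fresh object rather than the stale one.
        provers[filename] = FlakyProver(filename)
        return provers[filename].get_qualities_for_challenge(challenge_hash)


provers = {"plot-1.dat": FlakyProver("plot-1.dat", fail_next=True)}
print(qualities_with_retry(provers, "plot-1.dat", b"\x00" * 32))
```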

View File

@ -22,6 +22,7 @@ class ChallengeStart:
challenge_hash: bytes32
height: uint32
@cbor_message(tag=3002)
class ChallengeEnd:
challenge_hash: bytes32
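The new tag keeps ChallengeEnd distinct from the other timelord messages on the wire. A hedged sketch of tag-based round-tripping, using the cbor2 library directly rather than the project's cbor_message machinery (the payload layout is illustrative, not the project's actual encoding):

```python
import cbor2

# Tag 3002 marks a ChallengeEnd, as declared above.
wire = cbor2.dumps(cbor2.CBORTag(3002, {"challenge_hash": b"\x00" * 32}))
decoded = cbor2.loads(wire)
assert decoded.tag == 3002                               # dispatch on the tag...
assert decoded.value["challenge_hash"] == b"\x00" * 32   # ...then read the fields
```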

View File

@ -1,9 +1,9 @@
ps -e | grep python | grep "start_" | awk '{print $1}' | xargs -L1 kill -9
ps -e | grep "fast_vdf/vdf" | awk '{print $1}' | xargs -L1 kill -9
ps -e | grep "fast_vdf/server" | awk '{print $1}' | xargs -L1 kill -9
./lib/chiavdf/fast_vdf/vdf 8889 &
./lib/chiavdf/fast_vdf/server 8889 &
P1=$!
./lib/chiavdf/fast_vdf/vdf 8890 &
./lib/chiavdf/fast_vdf/server 8890 &
P2=$!
python -m src.server.start_plotter &
P3=$!
@ -11,9 +11,9 @@ python -m src.server.start_timelord &
P4=$!
python -m src.server.start_farmer &
P5=$!
python -m src.server.start_full_node "127.0.0.1" 8002 "-f" "-t" &
python -m src.server.start_full_node "127.0.0.1" 8002 "-f" &
P6=$!
python -m src.server.start_full_node "127.0.0.1" 8004 &
python -m src.server.start_full_node "127.0.0.1" 8004 "-t" &
P7=$!
python -m src.server.start_full_node "127.0.0.1" 8005 &
P8=$!
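The same orchestration expressed in Python, as a hedged sketch: the command lists come from the script above, and the cleanup mirrors its kill lines.

```python
import subprocess

commands = [
    ["./lib/chiavdf/fast_vdf/server", "8889"],
    ["./lib/chiavdf/fast_vdf/server", "8890"],
    ["python", "-m", "src.server.start_plotter"],
    ["python", "-m", "src.server.start_timelord"],
    ["python", "-m", "src.server.start_farmer"],
    ["python", "-m", "src.server.start_full_node", "127.0.0.1", "8002", "-f"],
    ["python", "-m", "src.server.start_full_node", "127.0.0.1", "8004", "-t"],
    ["python", "-m", "src.server.start_full_node", "127.0.0.1", "8005"],
]
processes = [subprocess.Popen(cmd) for cmd in commands]
try:
    for p in processes:
        p.wait()  # servers run until interrupted
finally:
    for p in processes:  # mirrors the kill lines at the top of the script
        p.kill()
```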

View File

@ -0,0 +1,118 @@
from typing import Tuple, Optional, Dict, Counter
import collections
from asyncio import Lock, Event
from src.types.proof_of_space import ProofOfSpace
from src.types.block_header import BlockHeaderData
from src.types.trunk_block import TrunkBlock
from src.types.block_body import BlockBody
from src.types.full_block import FullBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64
class FullNodeStore:
def __init__(self):
self.lock = Lock()
def initialize(self):
self.full_blocks: Dict[str, FullBlock] = {}
self.sync_mode: bool = True
# Block headers and blocks which we think might be heads, but we haven't verified yet.
# All these are used during sync mode
self.potential_heads: Counter[bytes32] = collections.Counter()
self.potential_heads_full_blocks: Dict[bytes32, FullBlock] = {}
# Headers/trunks downloaded during sync, by height
self.potential_trunks: Dict[uint32, TrunkBlock] = {}
# Blocks downloaded during sync, by height
self.potential_blocks: Dict[uint32, FullBlock] = {}
# Per-height events, set when we receive the block at that height; awaited by sync().
self.potential_blocks_received: Dict[uint32, Event] = {}
# These are the blocks that we created, but don't have the PoS from the farmer yet,
# keyed by the proof of space hash
self.candidate_blocks: Dict[bytes32, Tuple[BlockBody, BlockHeaderData, ProofOfSpace]] = {}
# These are the blocks that we created, have PoS, but not PoT yet, keyed by the
# challenge hash and iterations
self.unfinished_blocks: Dict[Tuple[bytes32, uint64], FullBlock] = {}
# Latest height with unfinished blocks, and the expected timestamp of finishing
self.unfinished_blocks_leader: Tuple[uint32, uint64] = (uint32(0), uint64(9999999999))
self.proof_of_time_estimate_ips: uint64 = uint64(3000)
async def get_lock(self) -> Lock:
return self.lock
async def save_block(self, block: FullBlock):
self.full_blocks[block.header_hash] = block
async def get_block(self, header_hash: str) -> Optional[FullBlock]:
return self.full_blocks.get(header_hash)
async def set_sync_mode(self, sync_mode: bool):
self.sync_mode = sync_mode
async def get_sync_mode(self) -> bool:
return self.sync_mode
async def clear_sync_information(self):
self.potential_heads.clear()
self.potential_heads_full_blocks.clear()
self.potential_trunks.clear()
self.potential_blocks.clear()
self.potential_blocks_received.clear()
async def add_potential_head(self, header_hash: bytes32):
self.potential_heads[header_hash] += 1
async def get_potential_heads(self) -> Dict[bytes32, int]:
return self.potential_heads
async def add_potential_heads_full_block(self, block: FullBlock):
self.potential_heads_full_blocks[block.header_hash] = block
async def get_potential_heads_full_block(self, header_hash: bytes32) -> Optional[FullBlock]:
return self.potential_heads_full_blocks.get(header_hash)
async def add_potential_trunk(self, block: TrunkBlock):
self.potential_trunks[block.height] = block
async def get_potential_trunk(self, height: uint32) -> Optional[TrunkBlock]:
return self.potential_trunks.get(height)
async def add_potential_block(self, block: FullBlock):
self.potential_blocks[block.height] = block
async def get_potential_block(self, height: uint32) -> Optional[FullBlock]:
return self.potential_blocks.get(height)
async def set_potential_blocks_received(self, height: uint32, event: Event):
self.potential_blocks_received[height] = event
async def get_potential_blocks_received(self, height: uint32) -> Event:
return self.potential_blocks_received[height]
async def add_candidate_block(self, pos_hash: bytes32, block: Tuple[BlockBody, BlockHeaderData, ProofOfSpace]):
self.candidate_blocks[pos_hash] = block
async def get_candidate_block(self, pos_hash: bytes32) -> Optional[Tuple[BlockBody, BlockHeaderData, ProofOfSpace]]:
return self.candidate_blocks.get(pos_hash)
async def add_unfinished_block(self, key: Tuple[bytes32, uint64], block: FullBlock):
self.unfinished_blocks[key] = block
async def get_unfinished_block(self, key: Tuple[bytes32, uint64]) -> Optional[FullBlock]:
return self.unfinished_blocks.get(key)
async def set_unfinished_block_leader(self, value: Tuple[uint32, uint64]):
self.unfinished_blocks_leader = value
async def get_unfinished_block_leader(self) -> Tuple[uint32, uint64]:
return self.unfinished_blocks_leader
async def set_proof_of_time_estimate_ips(self, estimate: uint64):
self.proof_of_time_estimate_ips = estimate
async def get_proof_of_time_estimate_ips(self) -> uint64:
return self.proof_of_time_estimate_ips
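The per-height Event dictionary is what lets sync() block until the network handler delivers each block. A minimal usage sketch, with MiniStore as a cut-down stand-in for the FullNodeStore above and plain strings in place of FullBlock:

```python
import asyncio
from asyncio import Event
from typing import Dict


class MiniStore:
    """Cut-down stand-in for FullNodeStore; only the sync-event pieces."""

    def __init__(self):
        self.potential_blocks: Dict[int, str] = {}
        self.potential_blocks_received: Dict[int, Event] = {}

    async def add_potential_block(self, height: int, block: str):
        self.potential_blocks[height] = block

    async def set_potential_blocks_received(self, height: int, event: Event):
        self.potential_blocks_received[height] = event

    async def get_potential_blocks_received(self, height: int) -> Event:
        return self.potential_blocks_received[height]


async def main():
    store = MiniStore()
    await store.set_potential_blocks_received(5, Event())

    async def network_handler():
        # A peer delivered the block: record it, then wake the sync task.
        await store.add_potential_block(5, "block-at-height-5")
        (await store.get_potential_blocks_received(5)).set()

    asyncio.create_task(network_handler())
    await (await store.get_potential_blocks_received(5)).wait()
    print(store.potential_blocks[5])


asyncio.run(main())
```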

View File

@ -1,7 +1,7 @@
import logging
import asyncio
import io
import yaml
from yaml import safe_load
import time
from asyncio import Lock
from typing import Dict, List
@ -18,7 +18,7 @@ from src.consensus.constants import constants
from src.server.outbound_message import OutboundMessage, Delivery, Message, NodeType
class Database:
class TimelordState:
lock: Lock = Lock()
free_servers: List[int] = []
active_discriminants: Dict = {}
@ -30,9 +30,9 @@ class Database:
log = logging.getLogger(__name__)
config = yaml.safe_load(open("src/config/timelord.yaml", "r"))
db = Database()
db.free_servers = config["vdf_server_ports"]
config = safe_load(open("src/config/timelord.yaml", "r"))
state: TimelordState = TimelordState()
state.free_servers = config["vdf_server_ports"]
@api_request
@ -44,30 +44,30 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
forever.
"""
disc: int = create_discriminant(challenge_start.challenge_hash, constants["DISCRIMINANT_SIZE_BITS"])
async with db.lock:
if (challenge_start.challenge_hash in db.seen_discriminants):
async with state.lock:
if (challenge_start.challenge_hash in state.seen_discriminants):
log.info("Already seen this one... Ignoring")
return
db.seen_discriminants.append(challenge_start.challenge_hash)
db.active_heights.append(challenge_start.height)
state.seen_discriminants.append(challenge_start.challenge_hash)
state.active_heights.append(challenge_start.height)
# Wait for a server to become free.
port: int = -1
while port == -1:
async with db.lock:
if (challenge_start.height <= max(db.active_heights) - 3):
db.done_discriminants.append(challenge_start.challenge_hash)
db.active_heights.remove(challenge_start.height)
async with state.lock:
if (challenge_start.height <= max(state.active_heights) - 3):
state.done_discriminants.append(challenge_start.challenge_hash)
state.active_heights.remove(challenge_start.height)
log.info(f"Will not execute challenge at height {challenge_start.height}, too old")
return
assert(len(db.active_heights) > 0)
if (challenge_start.height == max(db.active_heights)):
if (len(db.free_servers) != 0):
port = db.free_servers[0]
db.free_servers = db.free_servers[1:]
assert(len(state.active_heights) > 0)
if (challenge_start.height == max(state.active_heights)):
if (len(state.free_servers) != 0):
port = state.free_servers[0]
state.free_servers = state.free_servers[1:]
log.info(f"Discriminant {str(disc)[:10]}... attached to port {port}.")
log.info(f"Challenge/Height attached is {challenge_start}")
db.active_heights.remove(challenge_start.height)
state.active_heights.remove(challenge_start.height)
# Poll until a server becomes free.
if port == -1:
@ -94,13 +94,13 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
log.info("Got handshake with VDF server.")
async with db.lock:
db.active_discriminants[challenge_start.challenge_hash] = writer
db.active_discriminants_start_time[challenge_start.challenge_hash] = time.time()
async with state.lock:
state.active_discriminants[challenge_start.challenge_hash] = writer
state.active_discriminants_start_time[challenge_start.challenge_hash] = time.time()
async with db.lock:
if (challenge_start.challenge_hash in db.pending_iters):
for iter in db.pending_iters[challenge_start.challenge_hash]:
async with state.lock:
if (challenge_start.challenge_hash in state.pending_iters):
for iter in sorted(state.pending_iters[challenge_start.challenge_hash]):
log.info(f"Writing pending iters {challenge_start.challenge_hash}")
writer.write((str(len(str(iter))) + str(iter)).encode())
await writer.drain()
@ -110,21 +110,21 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
data = await reader.readexactly(4)
if (data.decode() == "STOP"):
# Server is now available.
async with db.lock:
async with state.lock:
writer.write(b"ACK")
await writer.drain()
db.free_servers.append(port)
state.free_servers.append(port)
break
elif (data.decode() == "POLL"):
async with db.lock:
async with state.lock:
# If I have a newer discriminant... Free up the VDF server
if (len(db.active_heights) > 0 and challenge_start.height < max(db.active_heights)):
if (len(state.active_heights) > 0 and challenge_start.height < max(state.active_heights)):
log.info("Got poll, stopping the challenge!")
writer.write(b'10')
await writer.drain()
del db.active_discriminants[challenge_start.challenge_hash]
del db.active_discriminants_start_time[challenge_start.challenge_hash]
db.done_discriminants.append(challenge_start.challenge_hash)
del state.active_discriminants[challenge_start.challenge_hash]
del state.active_discriminants_start_time[challenge_start.challenge_hash]
state.done_discriminants.append(challenge_start.challenge_hash)
else:
try:
# This must be a proof, read the continuation.
@ -150,8 +150,8 @@ async def challenge_start(challenge_start: timelord_protocol.ChallengeStart):
proof_of_time = ProofOfTime(output, config['n_wesolowski'], [uint8(b) for b in proof_bytes])
response = timelord_protocol.ProofOfTimeFinished(proof_of_time)
async with db.lock:
time_taken = time.time() - db.active_discriminants_start_time[challenge_start.challenge_hash]
async with state.lock:
time_taken = time.time() - state.active_discriminants_start_time[challenge_start.challenge_hash]
ips = int(iterations_needed / time_taken * 10)/10
log.info(f"Finished PoT, chall:{challenge_start.challenge_hash[:10].hex()}.. {iterations_needed}"
f" iters. {int(time_taken*1000)/1000}s, {ips} ips")
@ -165,16 +165,16 @@ async def challenge_end(challenge_end: timelord_protocol.ChallengeEnd):
A challenge is no longer active, so stop the process for this challenge, if it
exists.
"""
async with db.lock:
if (challenge_end.challenge_hash in db.done_discriminants):
async with state.lock:
if (challenge_end.challenge_hash in state.done_discriminants):
return
if (challenge_end.challenge_hash in db.active_discriminants):
writer = db.active_discriminants[challenge_end.challenge_hash]
if (challenge_end.challenge_hash in state.active_discriminants):
writer = state.active_discriminants[challenge_end.challenge_hash]
writer.write(b'10')
await writer.drain()
del db.active_discriminants[challenge_end.challenge_hash]
del db.active_discriminants_start_time[challenge_end.challenge_hash]
db.done_discriminants.append(challenge_end.challenge_hash)
del state.active_discriminants[challenge_end.challenge_hash]
del state.active_discriminants_start_time[challenge_end.challenge_hash]
state.done_discriminants.append(challenge_end.challenge_hash)
await asyncio.sleep(0.5)
@ -187,18 +187,18 @@ async def proof_of_space_info(proof_of_space_info: timelord_protocol.ProofOfSpac
"""
log.info(f"Got Pos info {proof_of_space_info}")
async with db.lock:
if (proof_of_space_info.challenge_hash in db.active_discriminants):
writer = db.active_discriminants[proof_of_space_info.challenge_hash]
async with state.lock:
if (proof_of_space_info.challenge_hash in state.active_discriminants):
writer = state.active_discriminants[proof_of_space_info.challenge_hash]
writer.write(((str(len(str(proof_of_space_info.iterations_needed))) +
str(proof_of_space_info.iterations_needed)).encode()))
await writer.drain()
print("Wrote")
log.info("Wrote")
return
elif (proof_of_space_info.challenge_hash in db.done_discriminants):
print("ALready done")
elif (proof_of_space_info.challenge_hash in state.done_discriminants):
log.info("Already done")
return
elif (proof_of_space_info.challenge_hash not in db.pending_iters):
db.pending_iters[proof_of_space_info.challenge_hash] = []
print("Set to pending")
db.pending_iters[proof_of_space_info.challenge_hash].append(proof_of_space_info.iterations_needed)
elif (proof_of_space_info.challenge_hash not in state.pending_iters):
state.pending_iters[proof_of_space_info.challenge_hash] = []
log.info("Set to pending")
state.pending_iters[proof_of_space_info.challenge_hash].append(proof_of_space_info.iterations_needed)
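Iteration counts go to the VDF server as a decimal length prefix followed by the decimal payload, the same read-the-size-then-the-body framing the C++ server uses for the discriminant. A minimal sketch of that framing; the single-digit-prefix decode is an assumption that holds for payloads under ten characters:

```python
def encode_iters(iterations: int) -> bytes:
    # Same shape as the writer above: str(len(str(n))) + str(n), encoded.
    payload = str(iterations)
    return (str(len(payload)) + payload).encode()


def decode_iters(data: bytes) -> int:
    size = int(data[:1].decode())  # assumed single-digit length prefix
    return int(data[1:1 + size].decode())


assert encode_iters(250000) == b"6250000"
assert decode_iters(encode_iters(250000)) == 250000
```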

View File

@ -5,7 +5,6 @@ from src.types.coinbase import CoinbaseInfo
from src.types.fees_target import FeesTarget
from src.types.sized_bytes import bytes32
@streamable
class BlockBody:
coinbase: CoinbaseInfo

View File

@ -1,10 +1,22 @@
from src.util.streamable import streamable
from src.types.sized_bytes import bytes32
from src.util.ints import uint64, uint32
from dataclasses import dataclass
@streamable
# @streamable
@dataclass
class CoinbaseInfo:
height: uint32
amount: uint64
puzzle_hash: bytes32
def f(c: CoinbaseInfo) -> CoinbaseInfo:
return c
# Type-check scratch: f takes and returns a CoinbaseInfo, not an int.
a: CoinbaseInfo = f(CoinbaseInfo(uint32(124), uint64(0), bytes32(b"\x00" * 32)))
b = CoinbaseInfo
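With @streamable commented out, CoinbaseInfo is for now a plain dataclass. A minimal construction sketch using builtins in place of the sized wrappers; the values are placeholders, not real chain data:

```python
from dataclasses import dataclass


@dataclass
class CoinbaseInfo:  # mirrors the definition above, with plain builtins
    height: int
    amount: int
    puzzle_hash: bytes


info = CoinbaseInfo(height=124, amount=14_000_000_000, puzzle_hash=b"\x00" * 32)
print(info.height, info.amount)
```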