Merge pull request #24 from Chia-Network/faster-sync

Faster sync
This commit is contained in:
Mariano Sorgente 2019-11-27 16:41:33 +09:00 committed by GitHub
commit 946f2b5f2e
19 changed files with 525 additions and 266 deletions

View File

@ -81,9 +81,7 @@ proofs of space during testing. The next time tests are run, this won't be neces
Make sure to run mongo before running the tests.
```bash
mongod --dbpath ./db/ &
black src tests
flake8 src
mypy src tests
black src tests && flake8 src && mypy src tests
py.test tests -s -v
```

View File

@ -54,12 +54,13 @@ full_node:
port: 8444
enable_upnp: False
# Don't send any more than these number of headers and blocks, in one message
max_headers_to_send: 100
max_blocks_to_send: 10
max_headers_to_send: 25
max_blocks_to_send: 5
num_sync_batches: 10
# If node is more than these blocks behind, will do a sync
sync_blocks_behind_threshold: 3
# When we are > 1 but < sync_blocks_behind_threshold, download these many blocks
short_sync_download_count: 6
sync_blocks_behind_threshold: 10
# This SSH key is for the ui SSH server
ssh_filename: config/ssh_host_key
# How often to connect to introducer if we need to learn more peers
@ -74,11 +75,10 @@ full_node:
host: 127.0.0.1
port: 8446
introducer_peer:
host: 127.0.0.1
# host: 216.39.16.173
# host: 2001:470:82f3:20:d250:99ff:fed1:5797
# host: 2600:1f18:63ed:e200:f7c5:9e28:9b5e:130
port: 8445
# host: 2600:1f18:63ed:e200:f7c5:9e28:9b5e:130 # Chia AWS introducer IPv6
# host: 127.0.0.1
host: 18.232.223.201 # Chia AWS introducer IPv4
port: 8444
introducer:
host: 127.0.0.1

1
db/.gitignore vendored
View File

@ -1,3 +1,4 @@
# Ignore everything in this directory
*
# Except this file

View File

@ -92,5 +92,30 @@ the challenge for block i. Therefore the timelord can start iterating on their V
i-1 is finalized by another timelord. More information is given in the [greenpaper](https://github.com/Chia-Network/proof-of-space).
...
### Difficulty formula
TODO
### IPS formula
TODO
## Propagation rules
Peers transmit blocks and unfinished blocks between each other. Unfinished blocks are usually created very quickly by farmers after a block has been created at the previous height, since proofs of space are fast to create.
### Blocks
When a full node receives a new block, it is propagated if it is valid and if it is one of our 3 tips.
The tips are the three connected valid blocks with the highest weight, where ties are broken by the order of receipt.
For example, if we see 2 blocks at height 1000 and then 2 blocks at height 2000, A, B, C, and D, then the three
tips are blocks C, A, B.
Therefore, there is a limit to how many valid blocks will get propagated at each height, and therefore how many disk lookups farmers must perform.
This is an important network parameter that is talked about extensively in the greenpaper, since a low tip number will increase the advantage of large farmers.
### Unfinished Blocks
When a full node receives an unfinished block, is is propagated if:
1. It is a child of one of the tips
2. Is the current unfinished block leader, or not expected to be finished much slower than the leader
The unfinished block leader is the unfinished block of greatest height which is expected to finish first,
according to the number of iterations required, and the current network IPS.

View File

@ -66,4 +66,40 @@ one farmer did not double sign a block, such a deep reorg cannot happen.
* **proof_of_time_output_hash**: hash of the proof of time output.
* **height**: height of the block the block.
* **total_weight**: cumulative difficulty of all blocks from genesis, including this one.
* **total_iters** cumulative VDF iterations from genesis, including this one.
* **total_iters** cumulative VDF iterations from genesis, including this one.
## Block Validation
An unfinished block is considered valid if it passes the following checks:
1. If not genesis, the previous block must exist
2. If not genesis, the timestamp must be >= the average timestamp of last 11 blocks, and less than 2 hours in the future
3. The compact block filter must be correct, according to the body (BIP158)
4. The hash of the proof of space must match header_data.proof_of_space_hash
5. The hash of the body must match header_data.body_hash
6. Extension data must be valid, if any is present
7. If not genesis, the hash of the challenge on top of the last block, must be the same as proof_of_space.challenge_hash
8. If genesis, the challenge hash in the proof of time must be the same as in the proof of space
9. The harvester signature must sign the header_hash, with the key in the proof of space
10. The proof of space must be valid on the challenge
11. The coinbase height must be the previous block's coinbase height + 1
12. The coinbase amount must be correct according to reward schedule
13. The coinbase signature must be valid, according the the pool public key
14. All transactions must be valid
15. Aggregate signature retrieved from transactions must be valid
16. Fees must be valid
17. Cost must be valid
A block is considered valid, if it passes the unfinished block checks, and the following additional checks:
1. The proof of space hash must match the proof of space hash in challenge
2. The number of iterations (based on quality, pos, difficulty, ips) must be the same as in the PoT
3. the PoT must be valid, on a discriminant of size 1024, and the challenge_hash
4. The coinbase height must equal the height in the challenge
5. If not genesis, the challenge_hash in the proof of time must match the challenge on the previous block
6. If not genesis, the height of the challenge on the previous block must be one less than on this block
7. If genesis, the challenge height must be 0
8. If not genesis, the total weight must be the parent weight + difficulty
9. If genesis, the total weight must be starting difficulty
10. If not genesis, the total iters must be parent iters + number_iters
11. If genesis, the total iters must be number iters

View File

@ -1,14 +1,33 @@
# Networking and Serialization
## Asynchronous
## Introduction
The Chia protocol is an asynchronous peer to peer protocol running on top of TCP on port 8444, where all nodes act as both clients and servers, and can maintain long term connections with other peers.
Every message in the Chia protocol starts with 4 bytes, which is the encoded length in bytes of the dictionary, followed by a CBOR encoding of the following dictionary:
```json
{
f: string,
d: cbor_encoded_message
}
```
where f is the desired function to call, and data is a CBOR encoded message.
For example, for a RequestBlock Chia protocol message, f is "request_block", while d is a CBOR encoded RequestBlock message.
Chia protocol messages have a max length of `(4 + 2^32 - 1) = 4294967299` bytes, or around 4GB.
## CBOR serialization
CBOR is a serialization format (Concise Binary Object Representation, RFC 7049), which optimizes for
[CBOR](https://cbor.io/) is a serialization format (Concise Binary Object Representation, RFC 7049), which optimizes for
small code size and small message size.
All protocol messages use CBOR, but objects which are hashable, such as blocks, headers, proofs, etc, are serialized to bytes using a more simple steramable format, and transmitted in that way.
## Streamable Format
Chia hashes objects using the simple streamable format.
The streamable format is designed to be deterministic and easy to implement, to prevent consensus issues.
The primitives are:
* Sized ints serialized in in big endian format, i.e uint64
@ -35,8 +54,29 @@ Most objects in the Chia protocol are stored and trasmitted using the streamable
## Handshake
All peers in the Chia protocol (whether they are farmers, full nodes, timelords, etc) act as both servers and clients (peers).
As soon as a connection is initiated between two peers, both send a Handshake message, and a HandshakeAck message to complete the handshake.
```Python
class Handshake:
network_id: str # 'testnet' or 'mainnet'
version: str # Protocol version
node_id: bytes32 # Unique node_id
server_port: uint16 # Listening port
node_type: NodeType # Node type (farmer, full node, etc)
```
After the handshake is completed, both peers can send Chia protocol messages, and disconnect at any time by sending an EOF.
## Ping Pong
## Introducer
Ping pong messages are periodic messages to be sent to peers, to ensure the other peer is still online.
A ping message contains a nonce, which is returned in the pong message.
## Introducer
For a new peer to join the decentralized network, they must choose s subset of all online nodes to connect to.
To facilitate this process, a number of introducer nodes will be run by Chia and other users, which will crawl the network and support one protocol message: GetPeers.
The introducer will then return a random subset of peers, that the calling node will attempt to connect to.

View File

@ -0,0 +1,40 @@
## Reference client networking
The reference client can launch any one of the following servers:
full node, timelord, farmer, harvester, or introducer.
The `ChiaServer` class can be used to start a listening server, or to connect to other clients.
Once running, each connection goes through an asynchronous pipeline in `server.py`, where connections are mapped to messages, which are handled by the correct function, and mapped to outbound messages.
When a protocol message arrives, it's function string is read, and the appropriate python function gets called.
The api_request parses the function data into a python object (from CBOR/streamable format).
All api functions are asynchronous generator, which means they can yield any numbner of responses in an asynchronous manner.
For example, a block message may trigger a block message to other peers, as well as messages to a timelord or farmer.
API functions yield OutboundMessages, which can be converted into Messages based on delivery.
```python
class Message:
# Function to call
function: str
# Message data for that function call
data: Any
class OutboundMessage:
# Type of the peer, 'farmer', 'harvester', 'full_node', etc.
peer_type: NodeType
# Message to send
message: Message
delivery_method: Delivery
```
Delivery types include broadcast, response, broadcast_to_others, etc. Therefore, an api function can yield one outbound message with a broadcast type, which gets mapped into one message for each peer.
A `PeerConnections` object is maintained by the server, which contains all active connections, as well as a Peers object for peers that we know of.
Periodically, the full node connects to an introducer to ask for peers, which the full node can connect to, if it does not have enough.
## Reference client testing

View File

@ -1,12 +1,11 @@
. .venv/bin/activate
. scripts/common.sh
# Starts a harvester, farmer, timelord, introducer and full node
# Starts a harvester, farmer, timelord, and full node
_run_bg_cmd python -m src.server.start_harvester
_run_bg_cmd python -m src.server.start_timelord
_run_bg_cmd python -m src.server.start_farmer
_run_bg_cmd python -m src.server.start_introducer
_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8444 -id 1 -f -t -u 8222
wait

View File

@ -8,7 +8,7 @@ _run_bg_cmd python -m src.server.start_timelord
_run_bg_cmd python -m src.server.start_farmer
_run_bg_cmd python -m src.server.start_introducer
_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8444 -id 1 -f -t -u 8222
_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8002 -id 2
_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8005 -id 3
_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8002 -id 2 -u 8223
_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8005 -id 3 -u 8224
wait

8
scripts/run_introducer.sh Executable file
View File

@ -0,0 +1,8 @@
. .venv/bin/activate
. scripts/common.sh
# Starts an introducer
_run_bg_cmd python -m src.server.start_introducer
wait

View File

@ -47,7 +47,11 @@ class Blockchain:
self.store = store
self.tips: List[FullBlock] = []
self.lca_block: FullBlock
# Defines the path from genesis to the tip
self.height_to_hash: Dict[uint32, bytes32] = {}
# All headers (but not orphans) from genesis to the tip are guaranteed to be in header_blocks
self.header_blocks: Dict[bytes32, HeaderBlock] = {}
async def initialize(self):
seen_blocks = {}
@ -65,6 +69,7 @@ class Blockchain:
for block in reversed(reverse_blocks):
self.height_to_hash[block.height] = block.header_hash
self.header_blocks[block.header_hash] = block.header_block
self.lca_block = self.tips[0]
@ -101,12 +106,11 @@ class Blockchain:
else:
return None
async def get_header_blocks_by_height(
def get_header_blocks_by_height(
self, heights: List[uint32], tip_header_hash: bytes32
) -> List[HeaderBlock]:
"""
Returns a list of header blocks, one for each height requested.
# TODO: optimize, check correctness for large reorgs
"""
if len(heights) == 0:
return []
@ -115,28 +119,20 @@ class Blockchain:
[(height, index) for index, height in enumerate(heights)], reverse=True
)
if sorted_heights[0][0] + 100 < self.lca_block.height:
curr_full_block: Optional[FullBlock] = await self.store.get_block(
self.height_to_hash[sorted_heights[0][0]]
)
else:
curr_full_block = await self.store.get_block(tip_header_hash)
curr_block: Optional[HeaderBlock] = self.header_blocks[tip_header_hash]
if not curr_full_block:
if curr_block is None:
raise BlockNotInBlockchain(
f"Header hash {tip_header_hash} not present in chain."
)
curr_block = curr_full_block.header_block
headers: List[Tuple[int, HeaderBlock]] = []
for height, index in sorted_heights:
if height > curr_block.height:
raise ValueError("Height is not valid for tip {tip_header_hash}")
while height < curr_block.height:
fetched: Optional[FullBlock] = await self.store.get_block(
curr_block.header.data.prev_header_hash
)
assert fetched is not None
curr_block = fetched.header_block
curr_block = self.header_blocks.get(curr_block.prev_header_hash, None)
if curr_block is None:
raise ValueError(f"Do not have header {height}")
headers.append((index, curr_block))
return [b for index, b in sorted(headers)]
@ -449,6 +445,9 @@ class Blockchain:
# Block is valid and connected, so it can be added to the blockchain.
await self.store.save_block(block)
# Cache header in memory
self.header_blocks[block.header_hash] = block.header_block
if await self._reconsider_heads(block, genesis):
return ReceiveBlockResult.ADDED_TO_HEAD
else:
@ -497,20 +496,27 @@ class Blockchain:
# 3. Check filter hash is correct TODO
# 4. Check body hash
# 4. Check the proof of space hash is valid
if (
block.header_block.proof_of_space.get_hash()
!= block.header_block.header.data.proof_of_space_hash
):
return False
# 5. Check body hash
if block.body.get_hash() != block.header_block.header.data.body_hash:
return False
# 5. Check extension data, if any is added
# 6. Check extension data, if any is added
# 6. Compute challenge of parent
# 7. Compute challenge of parent
challenge_hash: bytes32
if not genesis:
assert prev_block
assert prev_block.header_block.challenge
challenge_hash = prev_block.header_block.challenge.get_hash()
# 7. Check challenge hash of prev is the same as in pos
# 8. Check challenge hash of prev is the same as in pos
if challenge_hash != block.header_block.proof_of_space.challenge_hash:
return False
else:
@ -520,19 +526,19 @@ class Blockchain:
if challenge_hash != block.header_block.proof_of_space.challenge_hash:
return False
# 8. Check harvester signature of header data is valid based on harvester key
# 9. Check harvester signature of header data is valid based on harvester key
if not block.header_block.header.harvester_signature.verify(
[blspy.Util.hash256(block.header_block.header.data.get_hash())],
[block.header_block.proof_of_space.plot_pubkey],
):
return False
# 9. Check proof of space based on challenge
# 10. Check proof of space based on challenge
pos_quality = block.header_block.proof_of_space.verify_and_get_quality()
if not pos_quality:
return False
# 10. Check coinbase height = parent coinbase height + 1
# 11. Check coinbase height = parent coinbase height + 1
if not genesis:
assert prev_block
if block.body.coinbase.height != prev_block.body.coinbase.height + 1:
@ -541,27 +547,27 @@ class Blockchain:
if block.body.coinbase.height != 0:
return False
# 11. Check coinbase amount
# 12. Check coinbase amount
if (
calculate_block_reward(block.body.coinbase.height)
!= block.body.coinbase.amount
):
return False
# 12. Check coinbase signature with pool pk
# 13. Check coinbase signature with pool pk
if not block.body.coinbase_signature.verify(
[blspy.Util.hash256(bytes(block.body.coinbase))],
[block.header_block.proof_of_space.pool_pubkey],
):
return False
# TODO: 13a. check transactions
# TODO: 13b. Aggregate transaction results into signature
# TODO: 14a. check transactions
# TODO: 14b. Aggregate transaction results into signature
if block.body.aggregated_signature:
# TODO: 14. check that aggregate signature is valid, based on pubkeys, and messages
# TODO: 15. check that aggregate signature is valid, based on pubkeys, and messages
pass
# TODO: 15. check fees
# TODO: 16. check cost
# TODO: 16. check fees
# TODO: 17. check cost
return True
async def validate_block(self, block: FullBlock, genesis: bool = False) -> bool:
@ -678,6 +684,7 @@ class Blockchain:
fetched: Optional[FullBlock]
if not curr_old or curr_old.height < curr_new.height:
self.height_to_hash[uint32(curr_new.height)] = curr_new.header_hash
self.header_blocks[curr_new.header_hash] = curr_new
if curr_new.height == 0:
return
fetched = await self.store.get_block(curr_new.prev_header_hash)
@ -692,6 +699,7 @@ class Blockchain:
if curr_new.header_hash == curr_old.header_hash:
return
self.height_to_hash[uint32(curr_new.height)] = curr_new.header_hash
self.header_blocks[curr_new.header_hash] = curr_new
fetched_new: Optional[FullBlock] = await self.store.get_block(
curr_new.prev_header_hash
)

View File

@ -1,4 +1,5 @@
import asyncio
import logging
from abc import ABC
from typing import AsyncGenerator, Dict, List, Optional, Tuple
from bson.binary import Binary
@ -15,6 +16,9 @@ from src.util.ints import uint32, uint64
from src.util.streamable import Streamable
log = logging.getLogger(__name__)
class Database(ABC):
# All databases must subclass this so that there's one client
# Ensure mongod service is running
@ -24,6 +28,7 @@ class Database(ABC):
)
def __init__(self, db_name):
log.info("Connecting to mongodb database")
self.db = Database.client.get_database(
db_name,
codec_options=CodecOptions(
@ -34,6 +39,7 @@ class Database(ABC):
)
),
)
log.info("Connected to mongodb database")
class FullNodeStore(Database):
@ -49,6 +55,8 @@ class FullNodeStore(Database):
self.candidate_blocks = self.db.get_collection("candidate_blocks")
# Blocks which are not finalized yet (no proof of time)
self.unfinished_blocks = self.db.get_collection("unfinished_blocks")
# Blocks which we have received but our blockchain dose not reach
self.disconnected_blocks = self.db.get_collection("unfinished_blocks")
# Stored in memory
# Whether or not we are syncing
@ -80,6 +88,7 @@ class FullNodeStore(Database):
await self.potential_blocks.drop()
await self.candidate_blocks.drop()
await self.unfinished_blocks.drop()
await self.disconnected_blocks.drop()
async def save_block(self, block: FullBlock) -> None:
header_hash = block.header_hash
@ -99,6 +108,22 @@ class FullNodeStore(Database):
async for query in self.full_blocks.find({}):
yield FullBlock.from_bytes(query["block"])
async def save_disconnected_block(self, block: FullBlock) -> None:
prev_header_hash = block.prev_header_hash
await self.disconnected_blocks.find_one_and_update(
{"_id": prev_header_hash},
{"$set": {"_id": prev_header_hash, "block": block}},
upsert=True,
)
async def get_disconnected_block(
self, prev_header_hash: bytes32
) -> Optional[FullBlock]:
query = await self.disconnected_blocks.find_one({"_id": prev_header_hash})
if query is not None:
return FullBlock.from_bytes(query["block"])
return None
async def set_sync_mode(self, sync_mode: bool) -> None:
self.sync_mode = sync_mode
@ -121,10 +146,10 @@ class FullNodeStore(Database):
async def get_potential_tip(self, header_hash: bytes32) -> Optional[FullBlock]:
return self.potential_tips.get(header_hash, None)
async def add_potential_header(self, block: HeaderBlock) -> None:
def add_potential_header(self, block: HeaderBlock) -> None:
self.potential_headers[block.height] = block
async def get_potential_header(self, height: uint32) -> Optional[HeaderBlock]:
def get_potential_header(self, height: uint32) -> Optional[HeaderBlock]:
return self.potential_headers.get(height, None)
async def add_potential_block(self, block: FullBlock) -> None:
@ -138,18 +163,16 @@ class FullNodeStore(Database):
query = await self.potential_blocks.find_one({"_id": height})
return FullBlock.from_bytes(query["block"]) if query else None
async def set_potential_headers_received(
self, height: uint32, event: asyncio.Event
):
def set_potential_headers_received(self, height: uint32, event: asyncio.Event):
self.potential_headers_received[height] = event
async def get_potential_headers_received(self, height: uint32) -> asyncio.Event:
def get_potential_headers_received(self, height: uint32) -> asyncio.Event:
return self.potential_headers_received[height]
async def set_potential_blocks_received(self, height: uint32, event: asyncio.Event):
def set_potential_blocks_received(self, height: uint32, event: asyncio.Event):
self.potential_blocks_received[height] = event
async def get_potential_blocks_received(self, height: uint32) -> asyncio.Event:
def get_potential_blocks_received(self, height: uint32) -> asyncio.Event:
return self.potential_blocks_received[height]
async def add_potential_future_block(self, block: FullBlock):

View File

@ -35,7 +35,6 @@ from src.util.api_decorators import api_request
from src.util.errors import (
BlockNotInBlockchain,
InvalidUnfinishedBlock,
PeersDontHaveBlock,
)
from src.util.ints import uint32, uint64
@ -197,8 +196,9 @@ class FullNode:
- Check which are the heaviest tips
- Request headers for the heaviest
- Verify the weight of the tip, using the headers
- Blacklist peers that provide invalid stuff
- Sync blockchain up to heads (request blocks in batches, and add to queue)
- Find the fork point to see where to start downloading blocks
- Blacklist peers that provide invalid blocks
- Sync blockchain up to heads (request blocks in batches)
"""
log.info("Starting to perform sync with peers.")
log.info("Waiting to receive tips from peers.")
@ -207,7 +207,6 @@ class FullNode:
highest_weight: uint64 = uint64(0)
tip_block: FullBlock
tip_height = 0
caught_up = False
# Based on responses from peers about the current heads, see which head is the heaviest
# (similar to longest chain rule).
@ -228,72 +227,106 @@ class FullNode:
[t.weight for t in self.blockchain.get_current_tips()]
):
log.info("Not performing sync, already caught up.")
caught_up = True
if caught_up:
async for msg in self._finish_sync():
yield msg
return
return
assert tip_block
log.info(f"Tip block {tip_block.header_hash} tip height {tip_block.height}")
async with self.store.lock:
for height in range(0, tip_block.height + 1):
await self.store.set_potential_headers_received(uint32(height), Event())
for height in range(0, tip_block.height + 1):
self.store.set_potential_headers_received(uint32(height), Event())
self.store.set_potential_blocks_received(uint32(height), Event())
# Now, we download all of the headers in order to verify the weight
timeout = 60
# Now, we download all of the headers in order to verify the weight, in batches
timeout = 200
sleep_interval = 10
total_headers_coming = 5 * self.config["max_headers_to_send"]
headers: List[HeaderBlock] = []
for start_height in range(0, tip_height + 1, total_headers_coming):
# Download headers in batches. We download a few batches ahead in case there are delays or peers
# that don't have the headers that we need.
last_request_time: float = 0
highest_height_requested: uint32 = uint32(0)
request_made: bool = False
for height_checkpoint in range(
0, tip_height + 1, self.config["max_headers_to_send"]
):
end_height = min(
height_checkpoint + self.config["max_headers_to_send"], tip_height + 1
)
total_time_slept = 0
continue_requesting = True
while continue_requesting:
while True:
if self._shut_down:
return
if total_time_slept > timeout:
raise TimeoutError("Took too long to fetch headers")
for height_offset in range(
0, total_headers_coming, self.config["max_headers_to_send"]
):
new_start_height = start_height + height_offset
end_height = min(
new_start_height + self.config["max_headers_to_send"],
tip_height + 1,
raise TimeoutError("Took too long to fetch blocks")
# Request batches that we don't have yet
for batch in range(0, self.config["num_sync_batches"]):
batch_start = (
height_checkpoint + batch * self.config["max_headers_to_send"]
)
request = peer_protocol.RequestHeaderBlocks(
tip_block.header_block.header.get_hash(),
[uint32(h) for h in range(new_start_height, end_height)],
batch_end = min(
batch_start + self.config["max_headers_to_send"], tip_height + 1
)
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_header_blocks", request),
Delivery.RANDOM,
if batch_start > tip_height:
# We have asked for all blocks
break
blocks_missing = any(
[
not (
self.store.get_potential_headers_received(uint32(h))
).is_set()
for h in range(batch_start, batch_end)
]
)
end_height = min(tip_height + 1, start_height + total_headers_coming)
if (
time.time() - last_request_time > sleep_interval
and blocks_missing
) or (batch_end - 1) > highest_height_requested:
# If we are missing header blocks in this batch, and we haven't made a request in a while,
# Make a request for this batch. Also, if we have never requested this batch, make
# the request
highest_height_requested = batch_end - 1
request_made = True
request = peer_protocol.RequestHeaderBlocks(
tip_block.header_block.header.get_hash(),
[uint32(h) for h in range(batch_start, batch_end)],
)
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_header_blocks", request),
Delivery.RANDOM,
)
if request_made:
# Reset the timer for requests, so we don't overload other peers with requests
last_request_time = time.time()
request_made = False
# Wait for the first batch (the next "max_blocks_to_send" blocks to arrive)
awaitables = [
(
await self.store.get_potential_headers_received(uint32(height))
).wait()
for height in range(start_height, end_height)
(self.store.get_potential_headers_received(uint32(height))).wait()
for height in range(height_checkpoint, end_height)
]
try:
await asyncio.wait_for(
asyncio.gather(*awaitables), timeout=sleep_interval
)
continue_requesting = False
break
except concurrent.futures.TimeoutError:
total_time_slept += sleep_interval
log.info("Did not receive desired headers")
log.info("Did not receive desired header blocks")
pass
async with self.store.lock:
for h in range(0, tip_height + 1):
header = await self.store.get_potential_header(uint32(h))
header = self.store.get_potential_header(uint32(h))
assert header is not None
headers.append(header)
log.error(f"Downloaded headers up to tip height: {tip_height}")
if not verify_weight(tip_block.header_block, headers):
# TODO: ban peers that provided the invalid heads or proofs
raise errors.InvalidWeight(
f"Weight of {tip_block.header_block.header.get_hash()} not valid."
)
@ -303,88 +336,122 @@ class FullNode:
)
assert tip_height + 1 == len(headers)
# Finding the fork point allows us to only download blocks from the fork point
async with self.store.lock:
fork_point: HeaderBlock = self.blockchain.find_fork_point(headers)
# TODO: optimize, send many requests at once, and for more blocks
for height in range(fork_point.height + 1, tip_height + 1):
# Only download from fork point (what we don't have)
async with self.store.lock:
have_block = (
await self.store.get_potential_tip(
headers[height].header.get_hash()
)
is not None
)
# Download blocks in batches, and verify them as they come in. We download a few batches ahead,
# in case there are delays.
last_request_time = 0
highest_height_requested = uint32(0)
request_made = False
for height_checkpoint in range(
fork_point.height + 1, tip_height + 1, self.config["max_blocks_to_send"]
):
end_height = min(
height_checkpoint + self.config["max_blocks_to_send"], tip_height + 1
)
if not have_block:
request_sync = peer_protocol.RequestSyncBlocks(
tip_block.header_block.header.header_hash, [uint32(height)]
)
async with self.store.lock:
await self.store.set_potential_blocks_received(
uint32(height), Event()
total_time_slept = 0
while True:
if self._shut_down:
return
if total_time_slept > timeout:
raise TimeoutError("Took too long to fetch blocks")
# Request batches that we don't have yet
for batch in range(0, self.config["num_sync_batches"]):
batch_start = (
height_checkpoint + batch * self.config["max_blocks_to_send"]
)
found = False
for _ in range(30):
if self._shut_down:
return
log.info(f"Requesting blocks {request_sync.heights}")
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_sync_blocks", request_sync),
Delivery.RANDOM,
batch_end = min(
batch_start + self.config["max_blocks_to_send"], tip_height + 1
)
try:
await asyncio.wait_for(
(
await self.store.get_potential_blocks_received(
uint32(height)
)
).wait(),
timeout=10,
)
found = True
if batch_start > tip_height:
# We have asked for all blocks
break
except concurrent.futures.TimeoutError:
log.info("Did not receive desired block")
if not found:
raise PeersDontHaveBlock(
f"Did not receive desired block at height {height}"
)
async with self.store.lock:
# TODO: ban peers that provide bad blocks
if have_block:
block = await self.store.get_potential_tip(
headers[height].header.get_hash()
)
else:
block = await self.store.get_potential_block(uint32(height))
blocks_missing = any(
[
not (
self.store.get_potential_blocks_received(uint32(h))
).is_set()
for h in range(batch_start, batch_end)
]
)
if (
time.time() - last_request_time > sleep_interval
and blocks_missing
) or (batch_end - 1) > highest_height_requested:
# If we are missing blocks in this batch, and we haven't made a request in a while,
# Make a request for this batch. Also, if we have never requested this batch, make
# the request
log.info(
f"Requesting sync blocks {[i for i in range(batch_start, batch_end)]}"
)
highest_height_requested = batch_end - 1
request_made = True
request_sync = peer_protocol.RequestSyncBlocks(
tip_block.header_block.header.header_hash,
[
uint32(height)
for height in range(batch_start, batch_end)
],
)
yield OutboundMessage(
NodeType.FULL_NODE,
Message("request_sync_blocks", request_sync),
Delivery.RANDOM,
)
if request_made:
# Reset the timer for requests, so we don't overload other peers with requests
last_request_time = time.time()
request_made = False
# Wait for the first batch (the next "max_blocks_to_send" blocks to arrive)
awaitables = [
(self.store.get_potential_blocks_received(uint32(height))).wait()
for height in range(height_checkpoint, end_height)
]
try:
await asyncio.wait_for(
asyncio.gather(*awaitables), timeout=sleep_interval
)
break
except concurrent.futures.TimeoutError:
total_time_slept += sleep_interval
log.info("Did not receive desired blocks")
pass
# Verifies this batch, which we are guaranteed to have (since we broke from the above loop)
for height in range(height_checkpoint, end_height):
if self._shut_down:
return
block = await self.store.get_potential_block(uint32(height))
assert block is not None
start = time.time()
result = await self.blockchain.receive_block(block)
if (
result == ReceiveBlockResult.INVALID_BLOCK
or result == ReceiveBlockResult.DISCONNECTED_BLOCK
):
raise RuntimeError(f"Invalid block {block.header_hash}")
log.info(
f"Took {time.time() - start} seconds to validate and add block {block.height}."
)
assert (
max([h.height for h in self.blockchain.get_current_tips()])
>= height
)
await self.store.set_proof_of_time_estimate_ips(
await self.blockchain.get_next_ips(block.header_hash)
)
async with self.store.lock:
# The block gets permanantly added to the blockchain
result = await self.blockchain.receive_block(block)
if (
result == ReceiveBlockResult.INVALID_BLOCK
or result == ReceiveBlockResult.DISCONNECTED_BLOCK
):
raise RuntimeError(f"Invalid block {block.header_hash}")
log.info(
f"Took {time.time() - start} seconds to validate and add block {block.height}."
)
assert (
max([h.height for h in self.blockchain.get_current_tips()])
>= height
)
await self.store.set_proof_of_time_estimate_ips(
await self.blockchain.get_next_ips(block.header_hash)
)
assert max([h.height for h in self.blockchain.get_current_tips()]) == tip_height
log.info(f"Finished sync up to height {tip_height}")
async for msg in self._finish_sync():
yield msg
async def _finish_sync(self):
"""
Finalize sync by setting sync mode to False, clearing all sync information, and adding any final
@ -398,6 +465,8 @@ class FullNode:
await self.store.clear_sync_info()
for block in potential_fut_blocks:
if self._shut_down:
return
async for msg in self.block(peer_protocol.Block(block)):
yield msg
@ -414,6 +483,7 @@ class FullNode:
"""
A peer requests a list of header blocks, by height. Used for syncing or light clients.
"""
start = time.time()
if len(request.heights) > self.config["max_headers_to_send"]:
raise errors.TooManyheadersRequested(
f"The max number of headers is {self.config['max_headers_to_send']},\
@ -421,11 +491,10 @@ class FullNode:
)
try:
headers: List[
HeaderBlock
] = await self.blockchain.get_header_blocks_by_height(
headers: List[HeaderBlock] = self.blockchain.get_header_blocks_by_height(
request.heights, request.tip_header_hash
)
log.info(f"Got header blocks by height {time.time() - start}")
except KeyError:
return
except BlockNotInBlockchain as e:
@ -446,10 +515,8 @@ class FullNode:
"""
async with self.store.lock:
for header_block in request.header_blocks:
await self.store.add_potential_header(header_block)
(
await self.store.get_potential_headers_received(header_block.height)
).set()
self.store.add_potential_header(header_block)
(self.store.get_potential_headers_received(header_block.height)).set()
for _ in []: # Yields nothing
yield _
@ -475,7 +542,7 @@ class FullNode:
try:
header_blocks: List[
HeaderBlock
] = await self.blockchain.get_header_blocks_by_height(
] = self.blockchain.get_header_blocks_by_height(
request.heights, request.tip_header_hash
)
for header_block in header_blocks:
@ -514,7 +581,7 @@ class FullNode:
for block in request.blocks:
await self.store.add_potential_block(block)
(await self.store.get_potential_blocks_received(block.height)).set()
(self.store.get_potential_blocks_received(block.height)).set()
for _ in []: # Yields nothing
yield _
@ -744,43 +811,38 @@ class FullNode:
We can validate it and if it's a good block, propagate it to other peers and
timelords.
"""
async with self.store.lock:
if not self.blockchain.is_child_of_head(unfinished_block.block):
return
if not self.blockchain.is_child_of_head(unfinished_block.block):
return
if not await self.blockchain.validate_unfinished_block(
unfinished_block.block
):
raise InvalidUnfinishedBlock()
if not await self.blockchain.validate_unfinished_block(unfinished_block.block):
raise InvalidUnfinishedBlock()
prev_block: Optional[HeaderBlock] = await self.blockchain.get_header_block(
unfinished_block.block.prev_header_hash
)
assert prev_block
assert prev_block.challenge
prev_block: Optional[HeaderBlock] = await self.blockchain.get_header_block(
unfinished_block.block.prev_header_hash
)
assert prev_block
assert prev_block.challenge
challenge_hash: bytes32 = prev_block.challenge.get_hash()
difficulty: uint64 = await self.blockchain.get_next_difficulty(
unfinished_block.block.header_block.prev_header_hash
)
vdf_ips: uint64 = await self.blockchain.get_next_ips(
unfinished_block.block.header_block.prev_header_hash
)
challenge_hash: bytes32 = prev_block.challenge.get_hash()
difficulty: uint64 = await self.blockchain.get_next_difficulty(
unfinished_block.block.header_block.prev_header_hash
)
vdf_ips: uint64 = await self.blockchain.get_next_ips(
unfinished_block.block.header_block.prev_header_hash
)
iterations_needed: uint64 = calculate_iterations(
unfinished_block.block.header_block.proof_of_space,
difficulty,
vdf_ips,
constants["MIN_BLOCK_TIME"],
)
iterations_needed: uint64 = calculate_iterations(
unfinished_block.block.header_block.proof_of_space,
difficulty,
vdf_ips,
constants["MIN_BLOCK_TIME"],
)
if (
await self.store.get_unfinished_block(
(challenge_hash, iterations_needed)
)
is not None
):
return
if (
await self.store.get_unfinished_block((challenge_hash, iterations_needed))
is not None
):
return
expected_time: uint64 = uint64(
int(iterations_needed / (await self.store.get_proof_of_time_estimate_ips()))
@ -899,24 +961,36 @@ class FullNode:
except BaseException as e:
log.warning(f"Error {type(e)}{e} with syncing")
finally:
async with self.store.lock:
await self.store.set_sync_mode(False)
await self.store.clear_sync_info()
return
async for msg in self._finish_sync():
yield msg
elif block.block.height > tip_height + 1:
log.info(
f"We are a few blocks behind, our height is {tip_height} and block is at "
f"{block.block.height} so we will request these blocks."
f"{block.block.height} so we will request the previous block."
)
while True:
# TODO: download a few blocks and add them to chain
# prev_block_hash = block.block.header_block.header.data.prev_header_hash
break
msg = Message(
"request_block",
peer_protocol.RequestBlock(block.block.prev_header_hash),
)
async with self.store.lock:
# If we have already requested the prev block, don't do so again
if (
await self.store.get_disconnected_block(
block.block.prev_header_hash
)
is not None
):
log.info(
f"Have already requested block {block.block.prev_header_hash}"
)
return
await self.store.save_disconnected_block(block.block)
yield OutboundMessage(NodeType.FULL_NODE, msg, Delivery.RANDOM)
return
elif added == ReceiveBlockResult.ADDED_TO_HEAD:
# Only propagate blocks which extend the blockchain (becomes one of the heads)
# A deep reorg happens can be deteceted when we add a block to the heads, that has a worse
# A deep reorg happens can be detected when we add a block to the heads, that has a worse
# height than the worst one (assuming we had a full set of heads).
deep_reorg: bool = (block.block.height < least_height) and full_heads
ips_changed: bool = False
@ -987,6 +1061,15 @@ class FullNode:
Message("proof_of_space_finalized", farmer_request),
Delivery.BROADCAST,
)
# Recursively process the next block if we have it
async with self.store.lock:
next_block: Optional[
FullBlock
] = await self.store.get_disconnected_block(block.block.header_hash)
if next_block is not None:
async for msg in self.block(peer_protocol.Block(next_block)):
yield msg
elif added == ReceiveBlockResult.ADDED_AS_ORPHAN:
assert block.block.header_block.proof_of_time
assert block.block.header_block.challenge
@ -997,6 +1080,26 @@ class FullNode:
# Should never reach here, all the cases are covered
assert False
async with self.store.lock:
pass
# TODO: Remove all unfinished old blocks
# TODO: Remove all candidate old blocks
# TODO: Remove all disconnected old blocks
@api_request
async def request_block(
self, request_block: peer_protocol.RequestBlock
) -> AsyncGenerator[OutboundMessage, None]:
block: Optional[FullBlock] = await self.store.get_block(
request_block.header_hash
)
if block is not None:
yield OutboundMessage(
NodeType.FULL_NODE,
Message("block", peer_protocol.Block(block)),
Delivery.RESPOND,
)
@api_request
async def peers(
self, request: peer_protocol.Peers

View File

@ -1,27 +0,0 @@
# import asyncio
# import logging
# import sys
# from src.full_node import FullNode
# from src.server.server import ChiaServer
# from src.util.network import parse_host_port, create_node_id
# from src.server.outbound_message import NodeType, OutboundMessage, Message, Delivery
# from src.types.peer_info import PeerInfo
# from src.database import FullNodeStore
# from src.blockchain import Blockchain
# async def start_client_to_full_node(host, port):
# store = FullNodeStore()
# await store.initialize()
# blockchain = Blockchain(store)
# await blockchain.initialize()
# full_node = FullNode(store, blockchain)
# server = ChiaServer(9000, full_node, NodeType.FULL_NODE)
# res = await server.start_client(PeerInfo(host, port), None)
# print(res)
# m = Message("block", {})
# server.push_message(OutboundMessage(NodeType.FULL_NODE, m, Delivery.BROADCAST))
# await server.await_closed()
# asyncio.run(start_client_to_full_node("beast.44monty.chia.net", 8444))

View File

@ -120,6 +120,7 @@ class ChiaServer:
return False
start_time = time.time()
succeeded: bool = False
last_error = None
while time.time() - start_time < TOTAL_RETRY_SECONDS and not succeeded:
if self._pipeline_task.done():
return False
@ -136,14 +137,14 @@ class ChiaServer:
OSError,
asyncio.TimeoutError,
) as e:
log.warning(
f"Could not connct to {target_node}. {type(e)}{e}. Retrying."
)
last_error = e
open_con_task.cancel()
await asyncio.sleep(RETRY_INTERVAL)
if not succeeded:
if self.global_connections.peers.remove(target_node):
log.info(f"Removed {target_node} from peer list")
log.warning(
f"Could not connect to {target_node}. {type(last_error)}{last_error}. Aborting and removing peer."
)
self.global_connections.peers.remove(target_node)
return False
if on_connect is not None:
self._on_connect_callbacks[target_node] = on_connect

View File

@ -7,8 +7,12 @@ import miniupnpc
from src.blockchain import Blockchain
from src.database import FullNodeStore
from src.full_node import FullNode
# from src.util.ints import uint16
from src.server.outbound_message import NodeType
from src.server.server import ChiaServer
# from src.server.local_api_server import FullNodeLocalApi
from src.types.peer_info import PeerInfo
from src.util.network import parse_host_port
@ -30,6 +34,7 @@ async def main():
store = FullNodeStore(f"fndb_{db_id}")
blockchain = Blockchain(store)
log.info("Initializing blockchain from disk")
await blockchain.initialize()
full_node = FullNode(store, blockchain)
@ -37,9 +42,10 @@ async def main():
host, port = parse_host_port(full_node)
if full_node.config["enable_upnp"]:
log.info(f"Attempting to enable UPnP (open up port {port})")
try:
upnp = miniupnpc.UPnP()
upnp.discoverdelay = 10
upnp.discoverdelay = 5
upnp.discover()
upnp.selectigd()
upnp.addportmapping(port, "TCP", upnp.lanaddr, port, "chia", "")

View File

@ -332,7 +332,7 @@ class FullNodeUI:
self.latest_blocks_labels[i].text = (
f"{b.height}:{b.header_hash}"
f" {'LCA' if b.header_hash == lca_block.header_hash else ''}"
f" {'HEAD' if b.header_hash in [h.header_hash for h in heads] else ''}"
f" {'TIP' if b.header_hash in [h.header_hash for h in heads] else ''}"
)
self.latest_blocks_labels[i].handler = self.change_route_handler(
f"block/{b.header_hash}"
@ -340,7 +340,7 @@ class FullNodeUI:
new_labels.append(self.latest_blocks_labels[i])
self.lca_label.text = f"Current least common ancestor {lca_block.header_hash} height {lca_block.height}"
self.current_heads_label.text = "Heights of heads: " + str(
self.current_heads_label.text = "Heights of tips: " + str(
[h.height for h in heads]
)
self.difficulty_label.text = f"Current difficuty: {difficulty}"

View File

@ -14,7 +14,7 @@ from src.types.full_block import FullBlock
from src.types.header import Header, HeaderData
from src.types.header_block import HeaderBlock
from src.types.proof_of_space import ProofOfSpace
from src.util.ints import uint32, uint64
from src.util.ints import uint32, uint64, uint8
from tests.block_tools import BlockTools
bt = BlockTools()
@ -52,9 +52,7 @@ class TestGenesisBlock:
assert genesis_block.height == 0
assert genesis_block.challenge
assert (
await bc1.get_header_blocks_by_height(
[uint32(0)], genesis_block.header_hash
)
bc1.get_header_blocks_by_height([uint32(0)], genesis_block.header_hash)
)[0] == genesis_block
assert (
await bc1.get_next_difficulty(genesis_block.header_hash)
@ -200,8 +198,8 @@ class TestBlockValidation:
async def test_invalid_pos(self, initial_blockchain):
blocks, b = initial_blockchain
bad_pos = blocks[9].header_block.proof_of_space.proof
bad_pos[0] = (bad_pos[0] + 1) % 256
bad_pos = [i for i in blocks[9].header_block.proof_of_space.proof]
bad_pos[0] = uint8((bad_pos[0] + 1) % 256)
# Proof of space invalid
block_bad = FullBlock(
HeaderBlock(

View File

@ -38,8 +38,8 @@ class TestDatabase:
# add/get potential trunk
header = genesis.header_block
await db.add_potential_header(header)
assert await db.get_potential_header(genesis.height) == header
db.add_potential_header(header)
assert db.get_potential_header(genesis.height) == header
# Add potential block
await db.add_potential_block(genesis)