diff --git a/README.md b/README.md index ff6e9e90413b..8c8cb8ae346e 100644 --- a/README.md +++ b/README.md @@ -81,9 +81,7 @@ proofs of space during testing. The next time tests are run, this won't be neces Make sure to run mongo before running the tests. ```bash mongod --dbpath ./db/ & -black src tests -flake8 src -mypy src tests +black src tests && flake8 src && mypy src tests py.test tests -s -v ``` diff --git a/config/config.yaml b/config/config.yaml index 404e96d8c9db..76bb23d76224 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -54,12 +54,13 @@ full_node: port: 8444 enable_upnp: False # Don't send any more than these number of headers and blocks, in one message - max_headers_to_send: 100 - max_blocks_to_send: 10 + max_headers_to_send: 25 + max_blocks_to_send: 5 + num_sync_batches: 10 + # If node is more than these blocks behind, will do a sync - sync_blocks_behind_threshold: 3 - # When we are > 1 but < sync_blocks_behind_threshold, download these many blocks - short_sync_download_count: 6 + sync_blocks_behind_threshold: 10 + # This SSH key is for the ui SSH server ssh_filename: config/ssh_host_key # How often to connect to introducer if we need to learn more peers @@ -74,11 +75,10 @@ full_node: host: 127.0.0.1 port: 8446 introducer_peer: - host: 127.0.0.1 - # host: 216.39.16.173 - # host: 2001:470:82f3:20:d250:99ff:fed1:5797 - # host: 2600:1f18:63ed:e200:f7c5:9e28:9b5e:130 - port: 8445 + # host: 2600:1f18:63ed:e200:f7c5:9e28:9b5e:130 # Chia AWS introducer IPv6 + # host: 127.0.0.1 + host: 18.232.223.201 # Chia AWS introducer IPv4 + port: 8444 introducer: host: 127.0.0.1 diff --git a/db/.gitignore b/db/.gitignore index 86d0cb2726c6..d37ccd8db44c 100644 --- a/db/.gitignore +++ b/db/.gitignore @@ -1,3 +1,4 @@ + # Ignore everything in this directory * # Except this file diff --git a/docs/1-consensus-algorithm-summary.md b/docs/1-consensus-algorithm-summary.md index f8b289e3648b..7f38b64c58b6 100644 --- a/docs/1-consensus-algorithm-summary.md +++ b/docs/1-consensus-algorithm-summary.md @@ -92,5 +92,30 @@ the challenge for block i. Therefore the timelord can start iterating on their V i-1 is finalized by another timelord. More information is given in the [greenpaper](https://github.com/Chia-Network/proof-of-space). -... +### Difficulty formula +TODO + +### IPS formula +TODO + +## Propagation rules + +Peers transmit blocks and unfinished blocks between each other. Unfinished blocks are usually created very quickly by farmers after a block has been created at the previous height, since proofs of space are fast to create. + +### Blocks +When a full node receives a new block, it is propagated if it is valid and if it is one of our 3 tips. +The tips are the three connected valid blocks with the highest weight, where ties are broken by the order of receipt. + +For example, if we see 2 blocks at height 1000 and then 2 blocks at height 2000, A, B, C, and D, then the three +tips are blocks C, A, B. +Therefore, there is a limit to how many valid blocks will get propagated at each height, and therefore how many disk lookups farmers must perform. +This is an important network parameter that is talked about extensively in the greenpaper, since a low tip number will increase the advantage of large farmers. + +### Unfinished Blocks +When a full node receives an unfinished block, is is propagated if: +1. It is a child of one of the tips +2. Is the current unfinished block leader, or not expected to be finished much slower than the leader + +The unfinished block leader is the unfinished block of greatest height which is expected to finish first, +according to the number of iterations required, and the current network IPS. diff --git a/docs/2-block-format.md b/docs/2-block-format.md index 77d8f313a50f..cd1118b61a48 100644 --- a/docs/2-block-format.md +++ b/docs/2-block-format.md @@ -66,4 +66,40 @@ one farmer did not double sign a block, such a deep reorg cannot happen. * **proof_of_time_output_hash**: hash of the proof of time output. * **height**: height of the block the block. * **total_weight**: cumulative difficulty of all blocks from genesis, including this one. -* **total_iters** cumulative VDF iterations from genesis, including this one. \ No newline at end of file +* **total_iters** cumulative VDF iterations from genesis, including this one. + + +## Block Validation +An unfinished block is considered valid if it passes the following checks: + +1. If not genesis, the previous block must exist +2. If not genesis, the timestamp must be >= the average timestamp of last 11 blocks, and less than 2 hours in the future +3. The compact block filter must be correct, according to the body (BIP158) +4. The hash of the proof of space must match header_data.proof_of_space_hash +5. The hash of the body must match header_data.body_hash +6. Extension data must be valid, if any is present +7. If not genesis, the hash of the challenge on top of the last block, must be the same as proof_of_space.challenge_hash +8. If genesis, the challenge hash in the proof of time must be the same as in the proof of space +9. The harvester signature must sign the header_hash, with the key in the proof of space +10. The proof of space must be valid on the challenge +11. The coinbase height must be the previous block's coinbase height + 1 +12. The coinbase amount must be correct according to reward schedule +13. The coinbase signature must be valid, according the the pool public key +14. All transactions must be valid +15. Aggregate signature retrieved from transactions must be valid +16. Fees must be valid +17. Cost must be valid + +A block is considered valid, if it passes the unfinished block checks, and the following additional checks: + +1. The proof of space hash must match the proof of space hash in challenge +2. The number of iterations (based on quality, pos, difficulty, ips) must be the same as in the PoT +3. the PoT must be valid, on a discriminant of size 1024, and the challenge_hash +4. The coinbase height must equal the height in the challenge +5. If not genesis, the challenge_hash in the proof of time must match the challenge on the previous block +6. If not genesis, the height of the challenge on the previous block must be one less than on this block +7. If genesis, the challenge height must be 0 +8. If not genesis, the total weight must be the parent weight + difficulty +9. If genesis, the total weight must be starting difficulty +10. If not genesis, the total iters must be parent iters + number_iters +11. If genesis, the total iters must be number iters diff --git a/docs/4-networking-and-serialization.md b/docs/4-networking-and-serialization.md index bab58d2ced4b..2bbec3f69b60 100644 --- a/docs/4-networking-and-serialization.md +++ b/docs/4-networking-and-serialization.md @@ -1,14 +1,33 @@ # Networking and Serialization -## Asynchronous +## Introduction + +The Chia protocol is an asynchronous peer to peer protocol running on top of TCP on port 8444, where all nodes act as both clients and servers, and can maintain long term connections with other peers. + +Every message in the Chia protocol starts with 4 bytes, which is the encoded length in bytes of the dictionary, followed by a CBOR encoding of the following dictionary: + + +```json +{ + f: string, + d: cbor_encoded_message +} +``` + +where f is the desired function to call, and data is a CBOR encoded message. +For example, for a RequestBlock Chia protocol message, f is "request_block", while d is a CBOR encoded RequestBlock message. + +Chia protocol messages have a max length of `(4 + 2^32 - 1) = 4294967299` bytes, or around 4GB. ## CBOR serialization -CBOR is a serialization format (Concise Binary Object Representation, RFC 7049), which optimizes for +[CBOR](https://cbor.io/) is a serialization format (Concise Binary Object Representation, RFC 7049), which optimizes for small code size and small message size. +All protocol messages use CBOR, but objects which are hashable, such as blocks, headers, proofs, etc, are serialized to bytes using a more simple steramable format, and transmitted in that way. + ## Streamable Format -Chia hashes objects using the simple streamable format. +The streamable format is designed to be deterministic and easy to implement, to prevent consensus issues. The primitives are: * Sized ints serialized in in big endian format, i.e uint64 @@ -35,8 +54,29 @@ Most objects in the Chia protocol are stored and trasmitted using the streamable ## Handshake +All peers in the Chia protocol (whether they are farmers, full nodes, timelords, etc) act as both servers and clients (peers). +As soon as a connection is initiated between two peers, both send a Handshake message, and a HandshakeAck message to complete the handshake. +```Python +class Handshake: + network_id: str # 'testnet' or 'mainnet' + version: str # Protocol version + node_id: bytes32 # Unique node_id + server_port: uint16 # Listening port + node_type: NodeType # Node type (farmer, full node, etc) +``` + +After the handshake is completed, both peers can send Chia protocol messages, and disconnect at any time by sending an EOF. + ## Ping Pong -## Introducer \ No newline at end of file +Ping pong messages are periodic messages to be sent to peers, to ensure the other peer is still online. +A ping message contains a nonce, which is returned in the pong message. + +## Introducer + +For a new peer to join the decentralized network, they must choose s subset of all online nodes to connect to. + +To facilitate this process, a number of introducer nodes will be run by Chia and other users, which will crawl the network and support one protocol message: GetPeers. +The introducer will then return a random subset of peers, that the calling node will attempt to connect to. diff --git a/docs/6-codebase-and-testing.md b/docs/6-codebase-and-testing.md index e69de29bb2d1..a71c30df2042 100644 --- a/docs/6-codebase-and-testing.md +++ b/docs/6-codebase-and-testing.md @@ -0,0 +1,40 @@ + +## Reference client networking + +The reference client can launch any one of the following servers: +full node, timelord, farmer, harvester, or introducer. + +The `ChiaServer` class can be used to start a listening server, or to connect to other clients. +Once running, each connection goes through an asynchronous pipeline in `server.py`, where connections are mapped to messages, which are handled by the correct function, and mapped to outbound messages. + +When a protocol message arrives, it's function string is read, and the appropriate python function gets called. +The api_request parses the function data into a python object (from CBOR/streamable format). +All api functions are asynchronous generator, which means they can yield any numbner of responses in an asynchronous manner. + +For example, a block message may trigger a block message to other peers, as well as messages to a timelord or farmer. + +API functions yield OutboundMessages, which can be converted into Messages based on delivery. + + +```python +class Message: + # Function to call + function: str + # Message data for that function call + data: Any + +class OutboundMessage: + # Type of the peer, 'farmer', 'harvester', 'full_node', etc. + peer_type: NodeType + # Message to send + message: Message + delivery_method: Delivery +``` + +Delivery types include broadcast, response, broadcast_to_others, etc. Therefore, an api function can yield one outbound message with a broadcast type, which gets mapped into one message for each peer. + +A `PeerConnections` object is maintained by the server, which contains all active connections, as well as a Peers object for peers that we know of. +Periodically, the full node connects to an introducer to ask for peers, which the full node can connect to, if it does not have enough. + + +## Reference client testing \ No newline at end of file diff --git a/scripts/run_all.sh b/scripts/run_all.sh index a5dd44d096c6..94b3df613f28 100755 --- a/scripts/run_all.sh +++ b/scripts/run_all.sh @@ -1,12 +1,11 @@ . .venv/bin/activate . scripts/common.sh -# Starts a harvester, farmer, timelord, introducer and full node +# Starts a harvester, farmer, timelord, and full node _run_bg_cmd python -m src.server.start_harvester _run_bg_cmd python -m src.server.start_timelord _run_bg_cmd python -m src.server.start_farmer -_run_bg_cmd python -m src.server.start_introducer _run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8444 -id 1 -f -t -u 8222 wait diff --git a/scripts/run_all_simulation.sh b/scripts/run_all_simulation.sh index 21b9a1def6cd..30156646e9eb 100755 --- a/scripts/run_all_simulation.sh +++ b/scripts/run_all_simulation.sh @@ -8,7 +8,7 @@ _run_bg_cmd python -m src.server.start_timelord _run_bg_cmd python -m src.server.start_farmer _run_bg_cmd python -m src.server.start_introducer _run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8444 -id 1 -f -t -u 8222 -_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8002 -id 2 -_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8005 -id 3 +_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8002 -id 2 -u 8223 +_run_bg_cmd python -m src.server.start_full_node "127.0.0.1" 8005 -id 3 -u 8224 wait diff --git a/scripts/run_introducer.sh b/scripts/run_introducer.sh new file mode 100755 index 000000000000..406769194749 --- /dev/null +++ b/scripts/run_introducer.sh @@ -0,0 +1,8 @@ +. .venv/bin/activate +. scripts/common.sh + +# Starts an introducer + +_run_bg_cmd python -m src.server.start_introducer + +wait diff --git a/src/blockchain.py b/src/blockchain.py index 0d550ba2910b..0f6799d8aa3a 100644 --- a/src/blockchain.py +++ b/src/blockchain.py @@ -47,7 +47,11 @@ class Blockchain: self.store = store self.tips: List[FullBlock] = [] self.lca_block: FullBlock + + # Defines the path from genesis to the tip self.height_to_hash: Dict[uint32, bytes32] = {} + # All headers (but not orphans) from genesis to the tip are guaranteed to be in header_blocks + self.header_blocks: Dict[bytes32, HeaderBlock] = {} async def initialize(self): seen_blocks = {} @@ -65,6 +69,7 @@ class Blockchain: for block in reversed(reverse_blocks): self.height_to_hash[block.height] = block.header_hash + self.header_blocks[block.header_hash] = block.header_block self.lca_block = self.tips[0] @@ -101,12 +106,11 @@ class Blockchain: else: return None - async def get_header_blocks_by_height( + def get_header_blocks_by_height( self, heights: List[uint32], tip_header_hash: bytes32 ) -> List[HeaderBlock]: """ Returns a list of header blocks, one for each height requested. - # TODO: optimize, check correctness for large reorgs """ if len(heights) == 0: return [] @@ -115,28 +119,20 @@ class Blockchain: [(height, index) for index, height in enumerate(heights)], reverse=True ) - if sorted_heights[0][0] + 100 < self.lca_block.height: - curr_full_block: Optional[FullBlock] = await self.store.get_block( - self.height_to_hash[sorted_heights[0][0]] - ) - else: - curr_full_block = await self.store.get_block(tip_header_hash) + curr_block: Optional[HeaderBlock] = self.header_blocks[tip_header_hash] - if not curr_full_block: + if curr_block is None: raise BlockNotInBlockchain( f"Header hash {tip_header_hash} not present in chain." ) - curr_block = curr_full_block.header_block headers: List[Tuple[int, HeaderBlock]] = [] for height, index in sorted_heights: if height > curr_block.height: raise ValueError("Height is not valid for tip {tip_header_hash}") while height < curr_block.height: - fetched: Optional[FullBlock] = await self.store.get_block( - curr_block.header.data.prev_header_hash - ) - assert fetched is not None - curr_block = fetched.header_block + curr_block = self.header_blocks.get(curr_block.prev_header_hash, None) + if curr_block is None: + raise ValueError(f"Do not have header {height}") headers.append((index, curr_block)) return [b for index, b in sorted(headers)] @@ -449,6 +445,9 @@ class Blockchain: # Block is valid and connected, so it can be added to the blockchain. await self.store.save_block(block) + # Cache header in memory + self.header_blocks[block.header_hash] = block.header_block + if await self._reconsider_heads(block, genesis): return ReceiveBlockResult.ADDED_TO_HEAD else: @@ -497,20 +496,27 @@ class Blockchain: # 3. Check filter hash is correct TODO - # 4. Check body hash + # 4. Check the proof of space hash is valid + if ( + block.header_block.proof_of_space.get_hash() + != block.header_block.header.data.proof_of_space_hash + ): + return False + + # 5. Check body hash if block.body.get_hash() != block.header_block.header.data.body_hash: return False - # 5. Check extension data, if any is added + # 6. Check extension data, if any is added - # 6. Compute challenge of parent + # 7. Compute challenge of parent challenge_hash: bytes32 if not genesis: assert prev_block assert prev_block.header_block.challenge challenge_hash = prev_block.header_block.challenge.get_hash() - # 7. Check challenge hash of prev is the same as in pos + # 8. Check challenge hash of prev is the same as in pos if challenge_hash != block.header_block.proof_of_space.challenge_hash: return False else: @@ -520,19 +526,19 @@ class Blockchain: if challenge_hash != block.header_block.proof_of_space.challenge_hash: return False - # 8. Check harvester signature of header data is valid based on harvester key + # 9. Check harvester signature of header data is valid based on harvester key if not block.header_block.header.harvester_signature.verify( [blspy.Util.hash256(block.header_block.header.data.get_hash())], [block.header_block.proof_of_space.plot_pubkey], ): return False - # 9. Check proof of space based on challenge + # 10. Check proof of space based on challenge pos_quality = block.header_block.proof_of_space.verify_and_get_quality() if not pos_quality: return False - # 10. Check coinbase height = parent coinbase height + 1 + # 11. Check coinbase height = parent coinbase height + 1 if not genesis: assert prev_block if block.body.coinbase.height != prev_block.body.coinbase.height + 1: @@ -541,27 +547,27 @@ class Blockchain: if block.body.coinbase.height != 0: return False - # 11. Check coinbase amount + # 12. Check coinbase amount if ( calculate_block_reward(block.body.coinbase.height) != block.body.coinbase.amount ): return False - # 12. Check coinbase signature with pool pk + # 13. Check coinbase signature with pool pk if not block.body.coinbase_signature.verify( [blspy.Util.hash256(bytes(block.body.coinbase))], [block.header_block.proof_of_space.pool_pubkey], ): return False - # TODO: 13a. check transactions - # TODO: 13b. Aggregate transaction results into signature + # TODO: 14a. check transactions + # TODO: 14b. Aggregate transaction results into signature if block.body.aggregated_signature: - # TODO: 14. check that aggregate signature is valid, based on pubkeys, and messages + # TODO: 15. check that aggregate signature is valid, based on pubkeys, and messages pass - # TODO: 15. check fees - # TODO: 16. check cost + # TODO: 16. check fees + # TODO: 17. check cost return True async def validate_block(self, block: FullBlock, genesis: bool = False) -> bool: @@ -678,6 +684,7 @@ class Blockchain: fetched: Optional[FullBlock] if not curr_old or curr_old.height < curr_new.height: self.height_to_hash[uint32(curr_new.height)] = curr_new.header_hash + self.header_blocks[curr_new.header_hash] = curr_new if curr_new.height == 0: return fetched = await self.store.get_block(curr_new.prev_header_hash) @@ -692,6 +699,7 @@ class Blockchain: if curr_new.header_hash == curr_old.header_hash: return self.height_to_hash[uint32(curr_new.height)] = curr_new.header_hash + self.header_blocks[curr_new.header_hash] = curr_new fetched_new: Optional[FullBlock] = await self.store.get_block( curr_new.prev_header_hash ) diff --git a/src/database.py b/src/database.py index 625f148231a8..190131badac9 100644 --- a/src/database.py +++ b/src/database.py @@ -1,4 +1,5 @@ import asyncio +import logging from abc import ABC from typing import AsyncGenerator, Dict, List, Optional, Tuple from bson.binary import Binary @@ -15,6 +16,9 @@ from src.util.ints import uint32, uint64 from src.util.streamable import Streamable +log = logging.getLogger(__name__) + + class Database(ABC): # All databases must subclass this so that there's one client # Ensure mongod service is running @@ -24,6 +28,7 @@ class Database(ABC): ) def __init__(self, db_name): + log.info("Connecting to mongodb database") self.db = Database.client.get_database( db_name, codec_options=CodecOptions( @@ -34,6 +39,7 @@ class Database(ABC): ) ), ) + log.info("Connected to mongodb database") class FullNodeStore(Database): @@ -49,6 +55,8 @@ class FullNodeStore(Database): self.candidate_blocks = self.db.get_collection("candidate_blocks") # Blocks which are not finalized yet (no proof of time) self.unfinished_blocks = self.db.get_collection("unfinished_blocks") + # Blocks which we have received but our blockchain dose not reach + self.disconnected_blocks = self.db.get_collection("unfinished_blocks") # Stored in memory # Whether or not we are syncing @@ -80,6 +88,7 @@ class FullNodeStore(Database): await self.potential_blocks.drop() await self.candidate_blocks.drop() await self.unfinished_blocks.drop() + await self.disconnected_blocks.drop() async def save_block(self, block: FullBlock) -> None: header_hash = block.header_hash @@ -99,6 +108,22 @@ class FullNodeStore(Database): async for query in self.full_blocks.find({}): yield FullBlock.from_bytes(query["block"]) + async def save_disconnected_block(self, block: FullBlock) -> None: + prev_header_hash = block.prev_header_hash + await self.disconnected_blocks.find_one_and_update( + {"_id": prev_header_hash}, + {"$set": {"_id": prev_header_hash, "block": block}}, + upsert=True, + ) + + async def get_disconnected_block( + self, prev_header_hash: bytes32 + ) -> Optional[FullBlock]: + query = await self.disconnected_blocks.find_one({"_id": prev_header_hash}) + if query is not None: + return FullBlock.from_bytes(query["block"]) + return None + async def set_sync_mode(self, sync_mode: bool) -> None: self.sync_mode = sync_mode @@ -121,10 +146,10 @@ class FullNodeStore(Database): async def get_potential_tip(self, header_hash: bytes32) -> Optional[FullBlock]: return self.potential_tips.get(header_hash, None) - async def add_potential_header(self, block: HeaderBlock) -> None: + def add_potential_header(self, block: HeaderBlock) -> None: self.potential_headers[block.height] = block - async def get_potential_header(self, height: uint32) -> Optional[HeaderBlock]: + def get_potential_header(self, height: uint32) -> Optional[HeaderBlock]: return self.potential_headers.get(height, None) async def add_potential_block(self, block: FullBlock) -> None: @@ -138,18 +163,16 @@ class FullNodeStore(Database): query = await self.potential_blocks.find_one({"_id": height}) return FullBlock.from_bytes(query["block"]) if query else None - async def set_potential_headers_received( - self, height: uint32, event: asyncio.Event - ): + def set_potential_headers_received(self, height: uint32, event: asyncio.Event): self.potential_headers_received[height] = event - async def get_potential_headers_received(self, height: uint32) -> asyncio.Event: + def get_potential_headers_received(self, height: uint32) -> asyncio.Event: return self.potential_headers_received[height] - async def set_potential_blocks_received(self, height: uint32, event: asyncio.Event): + def set_potential_blocks_received(self, height: uint32, event: asyncio.Event): self.potential_blocks_received[height] = event - async def get_potential_blocks_received(self, height: uint32) -> asyncio.Event: + def get_potential_blocks_received(self, height: uint32) -> asyncio.Event: return self.potential_blocks_received[height] async def add_potential_future_block(self, block: FullBlock): diff --git a/src/full_node.py b/src/full_node.py index 91d057f97dc2..e28dfa05f853 100644 --- a/src/full_node.py +++ b/src/full_node.py @@ -35,7 +35,6 @@ from src.util.api_decorators import api_request from src.util.errors import ( BlockNotInBlockchain, InvalidUnfinishedBlock, - PeersDontHaveBlock, ) from src.util.ints import uint32, uint64 @@ -197,8 +196,9 @@ class FullNode: - Check which are the heaviest tips - Request headers for the heaviest - Verify the weight of the tip, using the headers - - Blacklist peers that provide invalid stuff - - Sync blockchain up to heads (request blocks in batches, and add to queue) + - Find the fork point to see where to start downloading blocks + - Blacklist peers that provide invalid blocks + - Sync blockchain up to heads (request blocks in batches) """ log.info("Starting to perform sync with peers.") log.info("Waiting to receive tips from peers.") @@ -207,7 +207,6 @@ class FullNode: highest_weight: uint64 = uint64(0) tip_block: FullBlock tip_height = 0 - caught_up = False # Based on responses from peers about the current heads, see which head is the heaviest # (similar to longest chain rule). @@ -228,72 +227,106 @@ class FullNode: [t.weight for t in self.blockchain.get_current_tips()] ): log.info("Not performing sync, already caught up.") - caught_up = True - if caught_up: - async for msg in self._finish_sync(): - yield msg - return + return + assert tip_block log.info(f"Tip block {tip_block.header_hash} tip height {tip_block.height}") - async with self.store.lock: - for height in range(0, tip_block.height + 1): - await self.store.set_potential_headers_received(uint32(height), Event()) + for height in range(0, tip_block.height + 1): + self.store.set_potential_headers_received(uint32(height), Event()) + self.store.set_potential_blocks_received(uint32(height), Event()) - # Now, we download all of the headers in order to verify the weight - timeout = 60 + # Now, we download all of the headers in order to verify the weight, in batches + timeout = 200 sleep_interval = 10 - total_headers_coming = 5 * self.config["max_headers_to_send"] headers: List[HeaderBlock] = [] - for start_height in range(0, tip_height + 1, total_headers_coming): + # Download headers in batches. We download a few batches ahead in case there are delays or peers + # that don't have the headers that we need. + last_request_time: float = 0 + highest_height_requested: uint32 = uint32(0) + request_made: bool = False + for height_checkpoint in range( + 0, tip_height + 1, self.config["max_headers_to_send"] + ): + end_height = min( + height_checkpoint + self.config["max_headers_to_send"], tip_height + 1 + ) + total_time_slept = 0 - continue_requesting = True - while continue_requesting: + while True: + if self._shut_down: + return if total_time_slept > timeout: - raise TimeoutError("Took too long to fetch headers") - for height_offset in range( - 0, total_headers_coming, self.config["max_headers_to_send"] - ): - new_start_height = start_height + height_offset - end_height = min( - new_start_height + self.config["max_headers_to_send"], - tip_height + 1, + raise TimeoutError("Took too long to fetch blocks") + + # Request batches that we don't have yet + for batch in range(0, self.config["num_sync_batches"]): + batch_start = ( + height_checkpoint + batch * self.config["max_headers_to_send"] ) - request = peer_protocol.RequestHeaderBlocks( - tip_block.header_block.header.get_hash(), - [uint32(h) for h in range(new_start_height, end_height)], + batch_end = min( + batch_start + self.config["max_headers_to_send"], tip_height + 1 ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("request_header_blocks", request), - Delivery.RANDOM, + + if batch_start > tip_height: + # We have asked for all blocks + break + + blocks_missing = any( + [ + not ( + self.store.get_potential_headers_received(uint32(h)) + ).is_set() + for h in range(batch_start, batch_end) + ] ) - end_height = min(tip_height + 1, start_height + total_headers_coming) + if ( + time.time() - last_request_time > sleep_interval + and blocks_missing + ) or (batch_end - 1) > highest_height_requested: + # If we are missing header blocks in this batch, and we haven't made a request in a while, + # Make a request for this batch. Also, if we have never requested this batch, make + # the request + highest_height_requested = batch_end - 1 + request_made = True + request = peer_protocol.RequestHeaderBlocks( + tip_block.header_block.header.get_hash(), + [uint32(h) for h in range(batch_start, batch_end)], + ) + yield OutboundMessage( + NodeType.FULL_NODE, + Message("request_header_blocks", request), + Delivery.RANDOM, + ) + if request_made: + # Reset the timer for requests, so we don't overload other peers with requests + last_request_time = time.time() + request_made = False + + # Wait for the first batch (the next "max_blocks_to_send" blocks to arrive) awaitables = [ - ( - await self.store.get_potential_headers_received(uint32(height)) - ).wait() - for height in range(start_height, end_height) + (self.store.get_potential_headers_received(uint32(height))).wait() + for height in range(height_checkpoint, end_height) ] try: await asyncio.wait_for( asyncio.gather(*awaitables), timeout=sleep_interval ) - continue_requesting = False + break except concurrent.futures.TimeoutError: total_time_slept += sleep_interval - log.info("Did not receive desired headers") + log.info("Did not receive desired header blocks") + pass async with self.store.lock: for h in range(0, tip_height + 1): - header = await self.store.get_potential_header(uint32(h)) + header = self.store.get_potential_header(uint32(h)) assert header is not None headers.append(header) log.error(f"Downloaded headers up to tip height: {tip_height}") if not verify_weight(tip_block.header_block, headers): - # TODO: ban peers that provided the invalid heads or proofs raise errors.InvalidWeight( f"Weight of {tip_block.header_block.header.get_hash()} not valid." ) @@ -303,88 +336,122 @@ class FullNode: ) assert tip_height + 1 == len(headers) + # Finding the fork point allows us to only download blocks from the fork point async with self.store.lock: fork_point: HeaderBlock = self.blockchain.find_fork_point(headers) - # TODO: optimize, send many requests at once, and for more blocks - for height in range(fork_point.height + 1, tip_height + 1): - # Only download from fork point (what we don't have) - async with self.store.lock: - have_block = ( - await self.store.get_potential_tip( - headers[height].header.get_hash() - ) - is not None - ) + # Download blocks in batches, and verify them as they come in. We download a few batches ahead, + # in case there are delays. + last_request_time = 0 + highest_height_requested = uint32(0) + request_made = False + for height_checkpoint in range( + fork_point.height + 1, tip_height + 1, self.config["max_blocks_to_send"] + ): + end_height = min( + height_checkpoint + self.config["max_blocks_to_send"], tip_height + 1 + ) - if not have_block: - request_sync = peer_protocol.RequestSyncBlocks( - tip_block.header_block.header.header_hash, [uint32(height)] - ) - async with self.store.lock: - await self.store.set_potential_blocks_received( - uint32(height), Event() + total_time_slept = 0 + while True: + if self._shut_down: + return + if total_time_slept > timeout: + raise TimeoutError("Took too long to fetch blocks") + + # Request batches that we don't have yet + for batch in range(0, self.config["num_sync_batches"]): + batch_start = ( + height_checkpoint + batch * self.config["max_blocks_to_send"] ) - found = False - for _ in range(30): - if self._shut_down: - return - log.info(f"Requesting blocks {request_sync.heights}") - yield OutboundMessage( - NodeType.FULL_NODE, - Message("request_sync_blocks", request_sync), - Delivery.RANDOM, + batch_end = min( + batch_start + self.config["max_blocks_to_send"], tip_height + 1 ) - try: - await asyncio.wait_for( - ( - await self.store.get_potential_blocks_received( - uint32(height) - ) - ).wait(), - timeout=10, - ) - found = True + + if batch_start > tip_height: + # We have asked for all blocks break - except concurrent.futures.TimeoutError: - log.info("Did not receive desired block") - if not found: - raise PeersDontHaveBlock( - f"Did not receive desired block at height {height}" - ) - async with self.store.lock: - # TODO: ban peers that provide bad blocks - if have_block: - block = await self.store.get_potential_tip( - headers[height].header.get_hash() - ) - else: - block = await self.store.get_potential_block(uint32(height)) + blocks_missing = any( + [ + not ( + self.store.get_potential_blocks_received(uint32(h)) + ).is_set() + for h in range(batch_start, batch_end) + ] + ) + if ( + time.time() - last_request_time > sleep_interval + and blocks_missing + ) or (batch_end - 1) > highest_height_requested: + # If we are missing blocks in this batch, and we haven't made a request in a while, + # Make a request for this batch. Also, if we have never requested this batch, make + # the request + log.info( + f"Requesting sync blocks {[i for i in range(batch_start, batch_end)]}" + ) + highest_height_requested = batch_end - 1 + request_made = True + request_sync = peer_protocol.RequestSyncBlocks( + tip_block.header_block.header.header_hash, + [ + uint32(height) + for height in range(batch_start, batch_end) + ], + ) + yield OutboundMessage( + NodeType.FULL_NODE, + Message("request_sync_blocks", request_sync), + Delivery.RANDOM, + ) + if request_made: + # Reset the timer for requests, so we don't overload other peers with requests + last_request_time = time.time() + request_made = False + + # Wait for the first batch (the next "max_blocks_to_send" blocks to arrive) + awaitables = [ + (self.store.get_potential_blocks_received(uint32(height))).wait() + for height in range(height_checkpoint, end_height) + ] + try: + await asyncio.wait_for( + asyncio.gather(*awaitables), timeout=sleep_interval + ) + break + except concurrent.futures.TimeoutError: + total_time_slept += sleep_interval + log.info("Did not receive desired blocks") + pass + + # Verifies this batch, which we are guaranteed to have (since we broke from the above loop) + for height in range(height_checkpoint, end_height): + if self._shut_down: + return + block = await self.store.get_potential_block(uint32(height)) assert block is not None - start = time.time() - result = await self.blockchain.receive_block(block) - if ( - result == ReceiveBlockResult.INVALID_BLOCK - or result == ReceiveBlockResult.DISCONNECTED_BLOCK - ): - raise RuntimeError(f"Invalid block {block.header_hash}") - log.info( - f"Took {time.time() - start} seconds to validate and add block {block.height}." - ) - assert ( - max([h.height for h in self.blockchain.get_current_tips()]) - >= height - ) - await self.store.set_proof_of_time_estimate_ips( - await self.blockchain.get_next_ips(block.header_hash) - ) + async with self.store.lock: + # The block gets permanantly added to the blockchain + result = await self.blockchain.receive_block(block) + if ( + result == ReceiveBlockResult.INVALID_BLOCK + or result == ReceiveBlockResult.DISCONNECTED_BLOCK + ): + raise RuntimeError(f"Invalid block {block.header_hash}") + log.info( + f"Took {time.time() - start} seconds to validate and add block {block.height}." + ) + assert ( + max([h.height for h in self.blockchain.get_current_tips()]) + >= height + ) + await self.store.set_proof_of_time_estimate_ips( + await self.blockchain.get_next_ips(block.header_hash) + ) + assert max([h.height for h in self.blockchain.get_current_tips()]) == tip_height log.info(f"Finished sync up to height {tip_height}") - async for msg in self._finish_sync(): - yield msg - async def _finish_sync(self): """ Finalize sync by setting sync mode to False, clearing all sync information, and adding any final @@ -398,6 +465,8 @@ class FullNode: await self.store.clear_sync_info() for block in potential_fut_blocks: + if self._shut_down: + return async for msg in self.block(peer_protocol.Block(block)): yield msg @@ -414,6 +483,7 @@ class FullNode: """ A peer requests a list of header blocks, by height. Used for syncing or light clients. """ + start = time.time() if len(request.heights) > self.config["max_headers_to_send"]: raise errors.TooManyheadersRequested( f"The max number of headers is {self.config['max_headers_to_send']},\ @@ -421,11 +491,10 @@ class FullNode: ) try: - headers: List[ - HeaderBlock - ] = await self.blockchain.get_header_blocks_by_height( + headers: List[HeaderBlock] = self.blockchain.get_header_blocks_by_height( request.heights, request.tip_header_hash ) + log.info(f"Got header blocks by height {time.time() - start}") except KeyError: return except BlockNotInBlockchain as e: @@ -446,10 +515,8 @@ class FullNode: """ async with self.store.lock: for header_block in request.header_blocks: - await self.store.add_potential_header(header_block) - ( - await self.store.get_potential_headers_received(header_block.height) - ).set() + self.store.add_potential_header(header_block) + (self.store.get_potential_headers_received(header_block.height)).set() for _ in []: # Yields nothing yield _ @@ -475,7 +542,7 @@ class FullNode: try: header_blocks: List[ HeaderBlock - ] = await self.blockchain.get_header_blocks_by_height( + ] = self.blockchain.get_header_blocks_by_height( request.heights, request.tip_header_hash ) for header_block in header_blocks: @@ -514,7 +581,7 @@ class FullNode: for block in request.blocks: await self.store.add_potential_block(block) - (await self.store.get_potential_blocks_received(block.height)).set() + (self.store.get_potential_blocks_received(block.height)).set() for _ in []: # Yields nothing yield _ @@ -744,43 +811,38 @@ class FullNode: We can validate it and if it's a good block, propagate it to other peers and timelords. """ - async with self.store.lock: - if not self.blockchain.is_child_of_head(unfinished_block.block): - return + if not self.blockchain.is_child_of_head(unfinished_block.block): + return - if not await self.blockchain.validate_unfinished_block( - unfinished_block.block - ): - raise InvalidUnfinishedBlock() + if not await self.blockchain.validate_unfinished_block(unfinished_block.block): + raise InvalidUnfinishedBlock() - prev_block: Optional[HeaderBlock] = await self.blockchain.get_header_block( - unfinished_block.block.prev_header_hash - ) - assert prev_block - assert prev_block.challenge + prev_block: Optional[HeaderBlock] = await self.blockchain.get_header_block( + unfinished_block.block.prev_header_hash + ) + assert prev_block + assert prev_block.challenge - challenge_hash: bytes32 = prev_block.challenge.get_hash() - difficulty: uint64 = await self.blockchain.get_next_difficulty( - unfinished_block.block.header_block.prev_header_hash - ) - vdf_ips: uint64 = await self.blockchain.get_next_ips( - unfinished_block.block.header_block.prev_header_hash - ) + challenge_hash: bytes32 = prev_block.challenge.get_hash() + difficulty: uint64 = await self.blockchain.get_next_difficulty( + unfinished_block.block.header_block.prev_header_hash + ) + vdf_ips: uint64 = await self.blockchain.get_next_ips( + unfinished_block.block.header_block.prev_header_hash + ) - iterations_needed: uint64 = calculate_iterations( - unfinished_block.block.header_block.proof_of_space, - difficulty, - vdf_ips, - constants["MIN_BLOCK_TIME"], - ) + iterations_needed: uint64 = calculate_iterations( + unfinished_block.block.header_block.proof_of_space, + difficulty, + vdf_ips, + constants["MIN_BLOCK_TIME"], + ) - if ( - await self.store.get_unfinished_block( - (challenge_hash, iterations_needed) - ) - is not None - ): - return + if ( + await self.store.get_unfinished_block((challenge_hash, iterations_needed)) + is not None + ): + return expected_time: uint64 = uint64( int(iterations_needed / (await self.store.get_proof_of_time_estimate_ips())) @@ -899,24 +961,36 @@ class FullNode: except BaseException as e: log.warning(f"Error {type(e)}{e} with syncing") finally: - async with self.store.lock: - await self.store.set_sync_mode(False) - await self.store.clear_sync_info() - return + async for msg in self._finish_sync(): + yield msg elif block.block.height > tip_height + 1: log.info( f"We are a few blocks behind, our height is {tip_height} and block is at " - f"{block.block.height} so we will request these blocks." + f"{block.block.height} so we will request the previous block." ) - while True: - # TODO: download a few blocks and add them to chain - # prev_block_hash = block.block.header_block.header.data.prev_header_hash - break + msg = Message( + "request_block", + peer_protocol.RequestBlock(block.block.prev_header_hash), + ) + async with self.store.lock: + # If we have already requested the prev block, don't do so again + if ( + await self.store.get_disconnected_block( + block.block.prev_header_hash + ) + is not None + ): + log.info( + f"Have already requested block {block.block.prev_header_hash}" + ) + return + await self.store.save_disconnected_block(block.block) + yield OutboundMessage(NodeType.FULL_NODE, msg, Delivery.RANDOM) return elif added == ReceiveBlockResult.ADDED_TO_HEAD: # Only propagate blocks which extend the blockchain (becomes one of the heads) - # A deep reorg happens can be deteceted when we add a block to the heads, that has a worse + # A deep reorg happens can be detected when we add a block to the heads, that has a worse # height than the worst one (assuming we had a full set of heads). deep_reorg: bool = (block.block.height < least_height) and full_heads ips_changed: bool = False @@ -987,6 +1061,15 @@ class FullNode: Message("proof_of_space_finalized", farmer_request), Delivery.BROADCAST, ) + + # Recursively process the next block if we have it + async with self.store.lock: + next_block: Optional[ + FullBlock + ] = await self.store.get_disconnected_block(block.block.header_hash) + if next_block is not None: + async for msg in self.block(peer_protocol.Block(next_block)): + yield msg elif added == ReceiveBlockResult.ADDED_AS_ORPHAN: assert block.block.header_block.proof_of_time assert block.block.header_block.challenge @@ -997,6 +1080,26 @@ class FullNode: # Should never reach here, all the cases are covered assert False + async with self.store.lock: + pass + # TODO: Remove all unfinished old blocks + # TODO: Remove all candidate old blocks + # TODO: Remove all disconnected old blocks + + @api_request + async def request_block( + self, request_block: peer_protocol.RequestBlock + ) -> AsyncGenerator[OutboundMessage, None]: + block: Optional[FullBlock] = await self.store.get_block( + request_block.header_hash + ) + if block is not None: + yield OutboundMessage( + NodeType.FULL_NODE, + Message("block", peer_protocol.Block(block)), + Delivery.RESPOND, + ) + @api_request async def peers( self, request: peer_protocol.Peers diff --git a/src/server/repl_client.py b/src/server/repl_client.py deleted file mode 100644 index 3061a0b78bb0..000000000000 --- a/src/server/repl_client.py +++ /dev/null @@ -1,27 +0,0 @@ -# import asyncio -# import logging -# import sys -# from src.full_node import FullNode -# from src.server.server import ChiaServer -# from src.util.network import parse_host_port, create_node_id -# from src.server.outbound_message import NodeType, OutboundMessage, Message, Delivery -# from src.types.peer_info import PeerInfo -# from src.database import FullNodeStore -# from src.blockchain import Blockchain - - -# async def start_client_to_full_node(host, port): -# store = FullNodeStore() -# await store.initialize() -# blockchain = Blockchain(store) -# await blockchain.initialize() -# full_node = FullNode(store, blockchain) -# server = ChiaServer(9000, full_node, NodeType.FULL_NODE) -# res = await server.start_client(PeerInfo(host, port), None) -# print(res) -# m = Message("block", {}) -# server.push_message(OutboundMessage(NodeType.FULL_NODE, m, Delivery.BROADCAST)) -# await server.await_closed() - - -# asyncio.run(start_client_to_full_node("beast.44monty.chia.net", 8444)) diff --git a/src/server/server.py b/src/server/server.py index 3db562807f0d..c0ddfa87bdb4 100644 --- a/src/server/server.py +++ b/src/server/server.py @@ -120,6 +120,7 @@ class ChiaServer: return False start_time = time.time() succeeded: bool = False + last_error = None while time.time() - start_time < TOTAL_RETRY_SECONDS and not succeeded: if self._pipeline_task.done(): return False @@ -136,14 +137,14 @@ class ChiaServer: OSError, asyncio.TimeoutError, ) as e: - log.warning( - f"Could not connct to {target_node}. {type(e)}{e}. Retrying." - ) + last_error = e open_con_task.cancel() await asyncio.sleep(RETRY_INTERVAL) if not succeeded: - if self.global_connections.peers.remove(target_node): - log.info(f"Removed {target_node} from peer list") + log.warning( + f"Could not connect to {target_node}. {type(last_error)}{last_error}. Aborting and removing peer." + ) + self.global_connections.peers.remove(target_node) return False if on_connect is not None: self._on_connect_callbacks[target_node] = on_connect diff --git a/src/server/start_full_node.py b/src/server/start_full_node.py index 89847fcab43a..b923505ea648 100644 --- a/src/server/start_full_node.py +++ b/src/server/start_full_node.py @@ -7,8 +7,12 @@ import miniupnpc from src.blockchain import Blockchain from src.database import FullNodeStore from src.full_node import FullNode + +# from src.util.ints import uint16 from src.server.outbound_message import NodeType from src.server.server import ChiaServer + +# from src.server.local_api_server import FullNodeLocalApi from src.types.peer_info import PeerInfo from src.util.network import parse_host_port @@ -30,6 +34,7 @@ async def main(): store = FullNodeStore(f"fndb_{db_id}") blockchain = Blockchain(store) + log.info("Initializing blockchain from disk") await blockchain.initialize() full_node = FullNode(store, blockchain) @@ -37,9 +42,10 @@ async def main(): host, port = parse_host_port(full_node) if full_node.config["enable_upnp"]: + log.info(f"Attempting to enable UPnP (open up port {port})") try: upnp = miniupnpc.UPnP() - upnp.discoverdelay = 10 + upnp.discoverdelay = 5 upnp.discover() upnp.selectigd() upnp.addportmapping(port, "TCP", upnp.lanaddr, port, "chia", "") diff --git a/src/ui/prompt_ui.py b/src/ui/prompt_ui.py index 958435359350..7283852fe8d4 100644 --- a/src/ui/prompt_ui.py +++ b/src/ui/prompt_ui.py @@ -332,7 +332,7 @@ class FullNodeUI: self.latest_blocks_labels[i].text = ( f"{b.height}:{b.header_hash}" f" {'LCA' if b.header_hash == lca_block.header_hash else ''}" - f" {'HEAD' if b.header_hash in [h.header_hash for h in heads] else ''}" + f" {'TIP' if b.header_hash in [h.header_hash for h in heads] else ''}" ) self.latest_blocks_labels[i].handler = self.change_route_handler( f"block/{b.header_hash}" @@ -340,7 +340,7 @@ class FullNodeUI: new_labels.append(self.latest_blocks_labels[i]) self.lca_label.text = f"Current least common ancestor {lca_block.header_hash} height {lca_block.height}" - self.current_heads_label.text = "Heights of heads: " + str( + self.current_heads_label.text = "Heights of tips: " + str( [h.height for h in heads] ) self.difficulty_label.text = f"Current difficuty: {difficulty}" diff --git a/tests/test_blockchain.py b/tests/test_blockchain.py index 6d83cbc2eaa2..3aa05784dce4 100644 --- a/tests/test_blockchain.py +++ b/tests/test_blockchain.py @@ -14,7 +14,7 @@ from src.types.full_block import FullBlock from src.types.header import Header, HeaderData from src.types.header_block import HeaderBlock from src.types.proof_of_space import ProofOfSpace -from src.util.ints import uint32, uint64 +from src.util.ints import uint32, uint64, uint8 from tests.block_tools import BlockTools bt = BlockTools() @@ -52,9 +52,7 @@ class TestGenesisBlock: assert genesis_block.height == 0 assert genesis_block.challenge assert ( - await bc1.get_header_blocks_by_height( - [uint32(0)], genesis_block.header_hash - ) + bc1.get_header_blocks_by_height([uint32(0)], genesis_block.header_hash) )[0] == genesis_block assert ( await bc1.get_next_difficulty(genesis_block.header_hash) @@ -200,8 +198,8 @@ class TestBlockValidation: async def test_invalid_pos(self, initial_blockchain): blocks, b = initial_blockchain - bad_pos = blocks[9].header_block.proof_of_space.proof - bad_pos[0] = (bad_pos[0] + 1) % 256 + bad_pos = [i for i in blocks[9].header_block.proof_of_space.proof] + bad_pos[0] = uint8((bad_pos[0] + 1) % 256) # Proof of space invalid block_bad = FullBlock( HeaderBlock( diff --git a/tests/test_database.py b/tests/test_database.py index 314cac2015b5..46d2b5688726 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -38,8 +38,8 @@ class TestDatabase: # add/get potential trunk header = genesis.header_block - await db.add_potential_header(header) - assert await db.get_potential_header(genesis.height) == header + db.add_potential_header(header) + assert db.get_potential_header(genesis.height) == header # Add potential block await db.add_potential_block(genesis)