Start removing global connections

This commit is contained in:
Mariano Sorgente 2019-11-13 18:14:46 -05:00
parent 1ff2667a56
commit 9010911177
5 changed files with 57 additions and 75 deletions

View File

@ -1,7 +1,7 @@
from asyncio import StreamReader, StreamWriter
import logging
import time
from typing import List, Any, Optional
from typing import Any, Optional
from src.util import cbor
from src.server.outbound_message import Message, NodeType
@ -61,35 +61,3 @@ class Connection:
def __str__(self) -> str:
return f"Connection({self.get_peername()})"
class PeerConnections:
def __init__(self, all_connections: List[Connection] = []):
self._all_connections = all_connections
def add(self, connection: Connection) -> bool:
for c in self._all_connections:
if c.node_id == connection.node_id:
return False
self._all_connections.append(connection)
return True
def have_connection(self, connection: Connection) -> bool:
for c in self._all_connections:
if c.node_id == connection.node_id:
return True
return False
def close(self, connection: Connection):
if connection in self._all_connections:
connection.close()
self._all_connections.remove(connection)
return
def close_all_connections(self):
for connection in self._all_connections:
connection.close()
self._all_connections = []
def get_connections(self):
return self._all_connections

View File

@ -1,27 +1,32 @@
import asyncio
import logging
import sys
from src.full_node import FullNode
from src.server.server import ChiaServer
from src.util.network import parse_host_port, create_node_id
from src.server.outbound_message import NodeType, OutboundMessage, Message, Delivery
from src.types.peer_info import PeerInfo
from src.store.full_node_store import FullNodeStore
from src.blockchain import Blockchain
# import asyncio
# import logging
# import sys
# from src.full_node import FullNode
# from src.server.server import ChiaServer
# from src.util.network import parse_host_port, create_node_id
# from src.server.outbound_message import NodeType, OutboundMessage, Message, Delivery
# from src.types.peer_info import PeerInfo
# from src.store.full_node_store import FullNodeStore
# from src.blockchain import Blockchain
# logging.basicConfig(format='Farmer %(name)-25s: %(levelname)-8s %(asctime)s.%(msecs)03d %(message)s',
# level=logging.INFO,
# datefmt='%H:%M:%S'
# )
# log = logging.getLogger(__name__)
# async def start_client_to_full_node(host, port):
# store = FullNodeStore()
# await store.initialize()
# blockchain = Blockchain(store)
# await blockchain.initialize()
# full_node = FullNode(store, blockchain)
# server = ChiaServer(9000, full_node, NodeType.FULL_NODE)
# res = await server.start_client(PeerInfo(host, port), None)
# log.info("ASFd")
# m = Message("block", {})
# server.push_message(OutboundMessage(NodeType.FULL_NODE, m, Delivery.BROADCAST))
# await server.await_closed()
async def start_client_to_full_node(host, port):
store = FullNodeStore()
await store.initialize()
blockchain = Blockchain(store)
await blockchain.initialize()
full_node = FullNode(store, blockchain)
server = ChiaServer(9000, full_node, NodeType.FULL_NODE)
res = await server.start_client(PeerInfo(host, port), None)
print(res)
m = Message("block", {})
server.push_message(OutboundMessage(NodeType.FULL_NODE, m, Delivery.BROADCAST))
await server.await_closed()
asyncio.run(start_client_to_full_node("beast.44monty.chia.net", 8444))
# asyncio.run(start_client_to_full_node("127.0.0.1", 8444))

View File

@ -5,15 +5,13 @@ from typing import Tuple, AsyncGenerator, Callable, Optional, List, Any, Dict
from aiter.server import start_server_aiter
from aiter import push_aiter, map_aiter, join_aiters, iter_to_aiter, aiter_forker
from src.types.peer_info import PeerInfo
from src.server.connection import Connection, PeerConnections
from src.server.connection import Connection
from src.server.outbound_message import OutboundMessage, Delivery, Message, NodeType
from src.protocols.shared_protocol import Handshake, HandshakeAck, protocol_version
from src.util import partial_func
from src.util.errors import InvalidHandshake, IncompatibleProtocolVersion, InvalidAck, InvalidProtocolMessage
from src.util.network import create_node_id
exited = False
# Each message is prepended with LENGTH_BYTES bytes specifying the length
TOTAL_RETRY_SECONDS: int = 10
RETRY_INTERVAL: int = 2
@ -22,7 +20,7 @@ log = logging.getLogger(__name__)
class ChiaServer:
# Keeps track of all connections to and from this node.
global_connections: PeerConnections = PeerConnections([])
global_connections: List[Connection] = []
# Optional listening server. You can also use this class without starting one.
_server: Optional[asyncio.AbstractServer] = None
@ -133,7 +131,9 @@ class ChiaServer:
"""
Starts closing all the clients and servers, by stopping the server and stopping the aiters.
"""
self.global_connections.close_all_connections()
for connection in self.global_connections:
connection.close()
self.global_connections = []
self._server.close()
if not self._outbound_aiter.is_stopped():
self._outbound_aiter.stop()
@ -240,11 +240,12 @@ class ChiaServer:
# Makes sure that we only start one connection with each peer
connection.node_id = inbound_handshake.node_id
connection.connection_type = inbound_handshake.node_type
if self.global_connections.have_connection(connection):
log.warning(f"Duplicate connection to {connection}")
return
for c in self.global_connections:
if c.node_id == connection.node_id:
log.warning(f"Duplicate connection to {connection}")
return
self.global_connections.add(connection)
self.global_connections.append(connection)
# Send Ack message
await connection.send(Message("handshake_ack", HandshakeAck()))
@ -286,7 +287,9 @@ class ChiaServer:
log.warning(f"Connection error by peer {connection.get_peername()}, closing connection.")
finally:
# Removes the connection from the global list, so we don't try to send things to it
self.global_connections.close(connection)
connection.close()
if connection in self.global_connections:
self.global_connections.remove(connection)
async def handle_message(self, pair: Tuple[Connection, Message], api: Any) -> AsyncGenerator[
Tuple[Connection, OutboundMessage], None]:
@ -313,7 +316,9 @@ class ChiaServer:
await result
except Exception as e:
log.error(f"Error {type(e)} {e}, closing connection {connection}")
self.global_connections.close(connection)
connection.close()
if connection in self.global_connections:
self.global_connections.remove(connection)
async def expand_outbound_messages(self, pair: Tuple[Connection, OutboundMessage]) -> AsyncGenerator[
Tuple[Connection, Message], None]:
@ -329,7 +334,7 @@ class ChiaServer:
elif outbound_message.delivery_method == Delivery.RANDOM:
# Select a random peer.
to_yield_single: Tuple[Connection, Message]
typed_peers: List[Connection] = [peer for peer in self.global_connections.get_connections()
typed_peers: List[Connection] = [peer for peer in self.global_connections
if peer.connection_type == outbound_message.peer_type]
if len(typed_peers) == 0:
return
@ -337,7 +342,7 @@ class ChiaServer:
elif (outbound_message.delivery_method == Delivery.BROADCAST or
outbound_message.delivery_method == Delivery.BROADCAST_TO_OTHERS):
# Broadcast to all peers.
for peer in self.global_connections.get_connections():
for peer in self.global_connections:
if peer.connection_type == outbound_message.peer_type:
if peer == connection:
if outbound_message.delivery_method == Delivery.BROADCAST:

View File

@ -69,7 +69,7 @@ async def main():
log.info("Waiting to connect to some peers...")
await asyncio.sleep(3)
log.info(f"Connected to {len(server.global_connections.get_connections())} peers.")
log.info(f"Connected to {len(server.global_connections)} peers.")
if not server_closed:
try:
async for msg in full_node._sync():

View File

@ -28,7 +28,7 @@ from src.types.sized_bytes import bytes32
from src.types.peer_info import PeerInfo
from src.util.ints import uint32
from src.server.server import ChiaServer
from src.server.connection import PeerConnections, NodeType
from src.server.connection import Connection, NodeType
log = logging.getLogger(__name__)
@ -87,7 +87,7 @@ class FullNodeUI:
self.store: FullNodeStore = store
self.blockchain: Blockchain = blockchain
self.node_server: ChiaServer = server
self.connections: PeerConnections = server.global_connections
self.connections: List[Connection] = server.global_connections
self.logs: List[logging.LogRecord] = []
self.app: Optional[Application] = None
self.closed: bool = False
@ -230,7 +230,8 @@ class FullNodeUI:
async def draw_home(self):
con_strs = []
for con in self.connections.get_connections():
print("Num connections:", len(self.connections))
for con in self.connections:
con_str = f"{NodeType(con.connection_type).name} {con.get_peername()} {con.node_id.hex()[:10]}..."
con_strs.append(con_str)
labels = [row.children[0].content.text() for row in self.con_rows]
@ -238,7 +239,10 @@ class FullNodeUI:
con_label = Label(text=con_str)
def disconnect():
print("Called disconnect on", con)
con.close()
if con in self.connections:
self.connections.remove(con)
self.layout.focus(self.quit_button)
disconnect_button = Button("Disconnect", handler=disconnect)