farmer just manage (#16754)

This commit is contained in:
Kyle Altendorf 2023-11-21 18:13:08 -05:00 committed by GitHub
parent eff655518c
commit 06e680d421
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -172,12 +172,34 @@ class Farmer:
@contextlib.asynccontextmanager
async def manage(self) -> AsyncIterator[None]:
await self._start()
async def start_task() -> None:
# `Farmer.setup_keys` returns `False` if there are no keys setup yet. In this case we just try until it
# succeeds or until we need to shut down.
while not self._shut_down:
if await self.setup_keys():
self.update_pool_state_task = asyncio.create_task(self._periodically_update_pool_state_task())
self.cache_clear_task = asyncio.create_task(self._periodically_clear_cache_and_refresh_task())
log.debug("start_task: initialized")
self.started = True
return
await asyncio.sleep(1)
asyncio.create_task(start_task())
try:
yield
finally:
self._close()
await self._await_closed()
self._shut_down = True
if self.cache_clear_task is not None:
await self.cache_clear_task
if self.update_pool_state_task is not None:
await self.update_pool_state_task
if self.keychain_proxy is not None:
proxy = self.keychain_proxy
self.keychain_proxy = None
await proxy.close()
await asyncio.sleep(0.5) # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
self.started = False
def get_connections(self, request_node_type: Optional[NodeType]) -> List[Dict[str, Any]]:
return default_get_connections(server=self.server, request_node_type=request_node_type)
@ -239,36 +261,6 @@ class Farmer:
return True
async def _start(self) -> None:
async def start_task() -> None:
# `Farmer.setup_keys` returns `False` if there are no keys setup yet. In this case we just try until it
# succeeds or until we need to shut down.
while not self._shut_down:
if await self.setup_keys():
self.update_pool_state_task = asyncio.create_task(self._periodically_update_pool_state_task())
self.cache_clear_task = asyncio.create_task(self._periodically_clear_cache_and_refresh_task())
log.debug("start_task: initialized")
self.started = True
return
await asyncio.sleep(1)
asyncio.create_task(start_task())
def _close(self) -> None:
self._shut_down = True
async def _await_closed(self, shutting_down: bool = True) -> None:
if self.cache_clear_task is not None:
await self.cache_clear_task
if self.update_pool_state_task is not None:
await self.update_pool_state_task
if shutting_down and self.keychain_proxy is not None:
proxy = self.keychain_proxy
self.keychain_proxy = None
await proxy.close()
await asyncio.sleep(0.5) # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
self.started = False
def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None:
self.state_changed_callback = callback