Improve wallet consistency (#3305)

* Improve wallet consistency

* Improve CLI significantly, and fix self-tx balances

* Fix await

* Fix deadlock and test

* Remove spam.sh
This commit is contained in:
Mariano Sorgente 2021-05-02 04:13:04 +09:00 committed by GitHub
parent d044cbb763
commit fe257cdfe1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 302 additions and 259 deletions

View File

@ -104,6 +104,7 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint
address_prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"]
print(f"Wallet height: {await wallet_client.get_height_info()}")
print(f"Sync status: {'Synced' if (await wallet_client.get_synced()) else 'Not synced'}")
print(f"Balances, fingerprint: {fingerprint}")
for summary in summaries_response:
wallet_id = summary["id"]
@ -111,39 +112,33 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint
typ = WalletType(int(summary["type"])).name
if typ != "STANDARD_WALLET":
print(f"Wallet ID {wallet_id} type {typ} {summary['name']}")
print(f" -Confirmed: " f"{balances['confirmed_wallet_balance']/units['colouredcoin']}")
print(f" -Confirmed - Pending Outgoing: {balances['unconfirmed_wallet_balance']/units['colouredcoin']}")
print(f" -Spendable: {balances['spendable_balance']/units['colouredcoin']}")
print(f" -Pending change: {balances['pending_change']/units['colouredcoin']}")
print(f" -Total Balance: " f"{balances['confirmed_wallet_balance']/units['colouredcoin']}")
print(f" -Pending Total Balance: {balances['unconfirmed_wallet_balance']/units['colouredcoin']}")
print(f" -Spendable Balance: {balances['spendable_balance']/units['colouredcoin']}")
else:
print(f"Wallet ID {wallet_id} type {typ}")
print(
f" -Confirmed: {balances['confirmed_wallet_balance']} mojo "
f"({balances['confirmed_wallet_balance']/units['chia']} {address_prefix})"
f" -Total Balance: {balances['confirmed_wallet_balance']/units['chia']} {address_prefix} "
f"({balances['confirmed_wallet_balance']} mojo)"
)
print(
f" -Unconfirmed: {balances['unconfirmed_wallet_balance']} mojo "
f"({balances['unconfirmed_wallet_balance']/units['chia']} {address_prefix})"
f" -Pending Total Balance: {balances['unconfirmed_wallet_balance']/units['chia']} {address_prefix} "
f"({balances['unconfirmed_wallet_balance']} mojo)"
)
print(
f" -Spendable: {balances['spendable_balance']} mojo "
f"({balances['spendable_balance']/units['chia']} {address_prefix})"
)
print(
f" -Pending change: {balances['pending_change']} mojo "
f"({balances['pending_change']/units['chia']} {address_prefix})"
f" -Spendable: {balances['spendable_balance']/units['chia']} {address_prefix} "
f"({balances['spendable_balance']} mojo)"
)
async def get_wallet(wallet_client: WalletRpcClient, fingerprint: int = None) -> Optional[Tuple[WalletRpcClient, int]]:
fingerprints = await wallet_client.get_public_keys()
if fingerprint is not None:
fingerprints = [fingerprint]
else:
fingerprints = await wallet_client.get_public_keys()
if len(fingerprints) == 0:
print("No keys loaded. Run 'chia keys generate' or import a key")
return None
if fingerprint is not None:
if fingerprint not in fingerprints:
print(f"Fingerprint {fingerprint} does not exist")
return None
if len(fingerprints) == 1:
fingerprint = fingerprints[0]
if fingerprint is not None:

View File

@ -335,30 +335,37 @@ class WalletRpcApi:
host = request["host"]
if request["wallet_type"] == "cc_wallet":
if request["mode"] == "new":
cc_wallet: CCWallet = await CCWallet.create_new_cc(wallet_state_manager, main_wallet, request["amount"])
colour = cc_wallet.get_colour()
asyncio.create_task(self._create_backup_and_upload(host))
async with self.service.wallet_state_manager.lock:
cc_wallet: CCWallet = await CCWallet.create_new_cc(
wallet_state_manager, main_wallet, request["amount"]
)
colour = cc_wallet.get_colour()
asyncio.create_task(self._create_backup_and_upload(host))
return {
"type": cc_wallet.type(),
"colour": colour,
"wallet_id": cc_wallet.id(),
}
elif request["mode"] == "existing":
cc_wallet = await CCWallet.create_wallet_for_cc(wallet_state_manager, main_wallet, request["colour"])
asyncio.create_task(self._create_backup_and_upload(host))
async with self.service.wallet_state_manager.lock:
cc_wallet = await CCWallet.create_wallet_for_cc(
wallet_state_manager, main_wallet, request["colour"]
)
asyncio.create_task(self._create_backup_and_upload(host))
return {"type": cc_wallet.type()}
elif request["wallet_type"] == "rl_wallet":
if request["rl_type"] == "admin":
log.info("Create rl admin wallet")
rl_admin: RLWallet = await RLWallet.create_rl_admin(wallet_state_manager)
success = await rl_admin.admin_create_coin(
uint64(int(request["interval"])),
uint64(int(request["limit"])),
request["pubkey"],
uint64(int(request["amount"])),
uint64(int(request["fee"])) if "fee" in request else uint64(0),
)
asyncio.create_task(self._create_backup_and_upload(host))
async with self.service.wallet_state_manager.lock:
rl_admin: RLWallet = await RLWallet.create_rl_admin(wallet_state_manager)
success = await rl_admin.admin_create_coin(
uint64(int(request["interval"])),
uint64(int(request["limit"])),
request["pubkey"],
uint64(int(request["amount"])),
uint64(int(request["fee"])) if "fee" in request else uint64(0),
)
asyncio.create_task(self._create_backup_and_upload(host))
assert rl_admin.rl_info.admin_pubkey is not None
return {
"success": success,
@ -369,8 +376,9 @@ class WalletRpcApi:
}
elif request["rl_type"] == "user":
log.info("Create rl user wallet")
rl_user: RLWallet = await RLWallet.create_rl_user(wallet_state_manager)
asyncio.create_task(self._create_backup_and_upload(host))
async with self.service.wallet_state_manager.lock:
rl_user: RLWallet = await RLWallet.create_rl_user(wallet_state_manager)
asyncio.create_task(self._create_backup_and_upload(host))
assert rl_user.rl_info.user_pubkey is not None
return {
"id": rl_user.id(),
@ -385,13 +393,14 @@ class WalletRpcApi:
backup_dids.append(hexstr_to_bytes(d))
if len(backup_dids) > 0:
num_needed = uint64(request["num_of_backup_ids_needed"])
did_wallet: DIDWallet = await DIDWallet.create_new_did_wallet(
wallet_state_manager,
main_wallet,
int(request["amount"]),
backup_dids,
uint64(num_needed),
)
async with self.service.wallet_state_manager.lock:
did_wallet: DIDWallet = await DIDWallet.create_new_did_wallet(
wallet_state_manager,
main_wallet,
int(request["amount"]),
backup_dids,
uint64(num_needed),
)
my_did = did_wallet.get_my_DID()
return {
"success": True,
@ -400,9 +409,10 @@ class WalletRpcApi:
"wallet_id": did_wallet.id(),
}
elif request["did_type"] == "recovery":
did_wallet = await DIDWallet.create_new_did_wallet_from_recovery(
wallet_state_manager, main_wallet, request["filename"]
)
async with self.service.wallet_state_manager.lock:
did_wallet = await DIDWallet.create_new_did_wallet_from_recovery(
wallet_state_manager, main_wallet, request["filename"]
)
assert did_wallet.did_info.temp_coin is not None
assert did_wallet.did_info.temp_puzhash is not None
assert did_wallet.did_info.temp_pubkey is not None
@ -542,7 +552,7 @@ class WalletRpcApi:
fee = uint64(request["fee"])
else:
fee = uint64(0)
async with self.service.wallet_state_manager.tx_lock:
async with self.service.wallet_state_manager.lock:
tx: TransactionRecord = await wallet.generate_signed_transaction(amount, puzzle_hash, fee)
await wallet.push_transaction(tx)
@ -594,7 +604,7 @@ class WalletRpcApi:
fee = uint64(request["fee"])
else:
fee = uint64(0)
async with self.service.wallet_state_manager.tx_lock:
async with self.service.wallet_state_manager.lock:
tx: TransactionRecord = await wallet.generate_signed_transaction([amount], [puzzle_hash], fee)
await wallet.push_transaction(tx)
@ -615,11 +625,12 @@ class WalletRpcApi:
offer = request["ids"]
file_name = request["filename"]
(
success,
spend_bundle,
error,
) = await self.service.wallet_state_manager.trade_manager.create_offer_for_ids(offer, file_name)
async with self.service.wallet_state_manager.lock:
(
success,
spend_bundle,
error,
) = await self.service.wallet_state_manager.trade_manager.create_offer_for_ids(offer, file_name)
if success:
self.service.wallet_state_manager.trade_manager.write_offer_to_disk(Path(file_name), spend_bundle)
return {}
@ -629,11 +640,12 @@ class WalletRpcApi:
assert self.service.wallet_state_manager is not None
file_name = request["filename"]
file_path = Path(file_name)
(
success,
discrepancies,
error,
) = await self.service.wallet_state_manager.trade_manager.get_discrepancies_for_offer(file_path)
async with self.service.wallet_state_manager.lock:
(
success,
discrepancies,
error,
) = await self.service.wallet_state_manager.trade_manager.get_discrepancies_for_offer(file_path)
if success:
return {"discrepancies": discrepancies}
@ -642,11 +654,12 @@ class WalletRpcApi:
async def respond_to_offer(self, request):
assert self.service.wallet_state_manager is not None
file_path = Path(request["filename"])
(
success,
trade_record,
error,
) = await self.service.wallet_state_manager.trade_manager.respond_to_offer(file_path)
async with self.service.wallet_state_manager.lock:
(
success,
trade_record,
error,
) = await self.service.wallet_state_manager.trade_manager.respond_to_offer(file_path)
if not success:
raise ValueError(error)
return {}
@ -683,10 +696,11 @@ class WalletRpcApi:
secure = request["secure"]
trade_id = hexstr_to_bytes(request["trade_id"])
if secure:
await wsm.trade_manager.cancel_pending_offer_safely(trade_id)
else:
await wsm.trade_manager.cancel_pending_offer(trade_id)
async with self.service.wallet_state_manager.lock:
if secure:
await wsm.trade_manager.cancel_pending_offer_safely(trade_id)
else:
await wsm.trade_manager.cancel_pending_offer(trade_id)
return {}
async def get_backup_info(self, request: Dict):
@ -725,18 +739,20 @@ class WalletRpcApi:
new_amount_verifications_required = uint64(request["num_verifications_required"])
else:
new_amount_verifications_required = len(recovery_list)
success = await wallet.update_recovery_list(recovery_list, new_amount_verifications_required)
# Update coin with new ID info
updated_puz = await wallet.get_new_puzzle()
spend_bundle = await wallet.create_spend(updated_puz.get_tree_hash())
async with self.service.wallet_state_manager.lock:
success = await wallet.update_recovery_list(recovery_list, new_amount_verifications_required)
# Update coin with new ID info
updated_puz = await wallet.get_new_puzzle()
spend_bundle = await wallet.create_spend(updated_puz.get_tree_hash())
if spend_bundle is not None and success:
return {"success": True}
return {"success": False}
async def did_spend(self, request):
wallet_id = int(request["wallet_id"])
wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id]
spend_bundle = await wallet.create_spend(request["puzzlehash"])
async with self.service.wallet_state_manager.lock:
wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id]
spend_bundle = await wallet.create_spend(request["puzzlehash"])
if spend_bundle is not None:
return {"success": True}
return {"success": False}
@ -745,7 +761,8 @@ class WalletRpcApi:
wallet_id = int(request["wallet_id"])
wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id]
my_did: str = wallet.get_my_DID()
coins = await wallet.select_coins(1)
async with self.service.wallet_state_manager.lock:
coins = await wallet.select_coins(1)
if coins is None or coins == set():
return {"success": True, "wallet_id": wallet_id, "my_did": my_did}
else:
@ -772,30 +789,31 @@ class WalletRpcApi:
if len(request["attest_filenames"]) < wallet.did_info.num_of_backup_ids_needed:
return {"success": False, "reason": "insufficient messages"}
(
info_list,
message_spend_bundle,
) = await wallet.load_attest_files_for_recovery_spend(request["attest_filenames"])
async with self.service.wallet_state_manager.lock:
(
info_list,
message_spend_bundle,
) = await wallet.load_attest_files_for_recovery_spend(request["attest_filenames"])
if "pubkey" in request:
pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"]))
else:
assert wallet.did_info.temp_pubkey is not None
pubkey = wallet.did_info.temp_pubkey
if "pubkey" in request:
pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"]))
else:
assert wallet.did_info.temp_pubkey is not None
pubkey = wallet.did_info.temp_pubkey
if "puzhash" in request:
puzhash = hexstr_to_bytes(request["puzhash"])
else:
assert wallet.did_info.temp_puzhash is not None
puzhash = wallet.did_info.temp_puzhash
if "puzhash" in request:
puzhash = hexstr_to_bytes(request["puzhash"])
else:
assert wallet.did_info.temp_puzhash is not None
puzhash = wallet.did_info.temp_puzhash
success = await wallet.recovery_spend(
wallet.did_info.temp_coin,
puzhash,
info_list,
pubkey,
message_spend_bundle,
)
success = await wallet.recovery_spend(
wallet.did_info.temp_coin,
puzhash,
info_list,
pubkey,
message_spend_bundle,
)
return {"success": success}
async def did_get_pubkey(self, request):
@ -807,12 +825,13 @@ class WalletRpcApi:
async def did_create_attest(self, request):
wallet_id = int(request["wallet_id"])
wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id]
info = await wallet.get_info_for_recovery()
coin = hexstr_to_bytes(request["coin_name"])
pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"]))
spend_bundle = await wallet.create_attestment(
coin, hexstr_to_bytes(request["puzhash"]), pubkey, request["filename"]
)
async with self.service.wallet_state_manager.lock:
info = await wallet.get_info_for_recovery()
coin = hexstr_to_bytes(request["coin_name"])
pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"]))
spend_bundle = await wallet.create_attestment(
coin, hexstr_to_bytes(request["puzhash"]), pubkey, request["filename"]
)
if spend_bundle is not None:
return {
"success": True,
@ -856,14 +875,15 @@ class WalletRpcApi:
wallet_id = uint32(int(request["wallet_id"]))
rl_user = self.service.wallet_state_manager.wallets[wallet_id]
origin = request["origin"]
await rl_user.set_user_info(
uint64(request["interval"]),
uint64(request["limit"]),
origin["parent_coin_info"],
origin["puzzle_hash"],
origin["amount"],
request["admin_pubkey"],
)
async with self.service.wallet_state_manager.lock:
await rl_user.set_user_info(
uint64(request["interval"]),
uint64(request["limit"]),
origin["parent_coin_info"],
origin["puzzle_hash"],
origin["amount"],
request["admin_pubkey"],
)
return {}
async def send_clawback_transaction(self, request):
@ -873,7 +893,7 @@ class WalletRpcApi:
wallet: RLWallet = self.service.wallet_state_manager.wallets[wallet_id]
fee = int(request["fee"])
async with self.service.wallet_state_manager.tx_lock:
async with self.service.wallet_state_manager.lock:
tx = await wallet.clawback_rl_coin_transaction(fee)
await wallet.push_transaction(tx)
@ -889,7 +909,8 @@ class WalletRpcApi:
puzzle_hash = wallet.rl_get_aggregation_puzzlehash(wallet.rl_info.rl_puzzle_hash)
request["wallet_id"] = 1
request["puzzle_hash"] = puzzle_hash
await wallet.rl_add_funds(request["amount"], puzzle_hash, request["fee"])
async with self.service.wallet_state_manager.lock:
await wallet.rl_add_funds(request["amount"], puzzle_hash, request["fee"])
return {"status": "SUCCESS"}
async def get_farmed_amount(self, request):
@ -948,7 +969,8 @@ class WalletRpcApi:
if "coins" in request and len(request["coins"]) > 0:
coins = set([Coin.from_json_dict(coin_json) for coin_json in request["coins"]])
signed_tx = await self.service.wallet_state_manager.main_wallet.generate_signed_transaction(
amount_0, puzzle_hash_0, fee, coins=coins, ignore_max_send_amount=True, primaries=additional_outputs
)
async with self.service.wallet_state_manager.lock:
signed_tx = await self.service.wallet_state_manager.main_wallet.generate_signed_transaction(
amount_0, puzzle_hash_0, fee, coins=coins, ignore_max_send_amount=True, primaries=additional_outputs
)
return {"signed_tx": signed_tx}

View File

@ -74,6 +74,9 @@ class WalletRpcClient(RpcClient):
async def get_sync_status(self) -> bool:
return (await self.fetch("get_sync_status", {}))["syncing"]
async def get_synced(self) -> bool:
return (await self.fetch("get_sync_status", {}))["synced"]
async def get_height_info(self) -> uint32:
return (await self.fetch("get_height_info", {}))["height"]

View File

@ -281,7 +281,7 @@ class CCWallet:
return bytes(self.cc_info.my_genesis_checker).hex()
async def coin_added(self, coin: Coin, header_hash: bytes32, removals: List[Coin], height: uint32):
""" Notification from wallet state manager that wallet has been received. """
"""Notification from wallet state manager that wallet has been received."""
self.log.info(f"CC wallet has been notified that {coin} was added")
search_for_parent: bool = True
@ -493,47 +493,50 @@ class CCWallet:
return result
async def select_coins(self, amount: uint64) -> Set[Coin]:
""" Returns a set of coins that can be used for generating a new transaction. """
async with self.wallet_state_manager.lock:
spendable_am = await self.get_confirmed_balance()
"""
Returns a set of coins that can be used for generating a new transaction.
Note: Must be called under wallet state manager lock
"""
if amount > spendable_am:
error_msg = f"Can't select amount higher than our spendable balance {amount}, spendable {spendable_am}"
self.log.warning(error_msg)
raise ValueError(error_msg)
spendable_am = await self.get_confirmed_balance()
self.log.info(f"About to select coins for amount {amount}")
spendable: List[WalletCoinRecord] = await self.get_cc_spendable_coins()
if amount > spendable_am:
error_msg = f"Can't select amount higher than our spendable balance {amount}, spendable {spendable_am}"
self.log.warning(error_msg)
raise ValueError(error_msg)
sum = 0
used_coins: Set = set()
self.log.info(f"About to select coins for amount {amount}")
spendable: List[WalletCoinRecord] = await self.get_cc_spendable_coins()
# Use older coins first
spendable.sort(key=lambda r: r.confirmed_block_height)
sum = 0
used_coins: Set = set()
# Try to use coins from the store, if there isn't enough of "unused"
# coins use change coins that are not confirmed yet
unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet(
self.id()
# Use older coins first
spendable.sort(key=lambda r: r.confirmed_block_height)
# Try to use coins from the store, if there isn't enough of "unused"
# coins use change coins that are not confirmed yet
unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet(
self.id()
)
for coinrecord in spendable:
if sum >= amount and len(used_coins) > 0:
break
if coinrecord.coin.name() in unconfirmed_removals:
continue
sum += coinrecord.coin.amount
used_coins.add(coinrecord.coin)
self.log.info(f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!")
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum < amount:
raise ValueError(
"Can't make this transaction at the moment. Waiting for the change from the previous transaction."
)
for coinrecord in spendable:
if sum >= amount and len(used_coins) > 0:
break
if coinrecord.coin.name() in unconfirmed_removals:
continue
sum += coinrecord.coin.amount
used_coins.add(coinrecord.coin)
self.log.info(f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!")
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum < amount:
raise ValueError(
"Can't make this transaction at the moment. Waiting for the change from the previous transaction."
)
self.log.info(f"Successfully selected coins: {used_coins}")
return used_coins
self.log.info(f"Successfully selected coins: {used_coins}")
return used_coins
async def get_sigs(self, innerpuz: Program, innersol: Program, coin_name: bytes32) -> List[G2Element]:
puzzle_hash = innerpuz.get_tree_hash()

View File

@ -49,6 +49,9 @@ class DIDWallet:
num_of_backup_ids_needed: uint64 = None,
name: str = None,
):
"""
This must be called under the wallet state manager lock
"""
self = DIDWallet()
self.base_puzzle_program = None
self.base_inner_puzzle_hash = None
@ -238,54 +241,53 @@ class DIDWallet:
return uint64(result)
async def select_coins(self, amount, exclude: List[Coin] = None) -> Optional[Set[Coin]]:
""" Returns a set of coins that can be used for generating a new transaction. """
async with self.wallet_state_manager.lock:
if exclude is None:
exclude = []
"""Returns a set of coins that can be used for generating a new transaction."""
if exclude is None:
exclude = []
spendable_amount = await self.get_spendable_balance()
if amount > spendable_amount:
self.log.warning(f"Can't select {amount}, from spendable {spendable_amount} for wallet id {self.id()}")
return None
spendable_amount = await self.get_spendable_balance()
if amount > spendable_amount:
self.log.warning(f"Can't select {amount}, from spendable {spendable_amount} for wallet id {self.id()}")
return None
self.log.info(f"About to select coins for amount {amount}")
unspent: List[WalletCoinRecord] = list(
await self.wallet_state_manager.get_spendable_coins_for_wallet(self.wallet_info.id)
self.log.info(f"About to select coins for amount {amount}")
unspent: List[WalletCoinRecord] = list(
await self.wallet_state_manager.get_spendable_coins_for_wallet(self.wallet_info.id)
)
sum_value = 0
used_coins: Set = set()
# Use older coins first
unspent.sort(key=lambda r: r.confirmed_block_height)
# Try to use coins from the store, if there isn't enough of "unused"
# coins use change coins that are not confirmed yet
unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet(
self.wallet_info.id
)
for coinrecord in unspent:
if sum_value >= amount and len(used_coins) > 0:
break
if coinrecord.coin.name() in unconfirmed_removals:
continue
if coinrecord.coin in exclude:
continue
sum_value += coinrecord.coin.amount
used_coins.add(coinrecord.coin)
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum_value < amount:
raise ValueError(
"Can't make this transaction at the moment. Waiting for the change from the previous transaction."
)
sum_value = 0
used_coins: Set = set()
# Use older coins first
unspent.sort(key=lambda r: r.confirmed_block_height)
# Try to use coins from the store, if there isn't enough of "unused"
# coins use change coins that are not confirmed yet
unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet(
self.wallet_info.id
)
for coinrecord in unspent:
if sum_value >= amount and len(used_coins) > 0:
break
if coinrecord.coin.name() in unconfirmed_removals:
continue
if coinrecord.coin in exclude:
continue
sum_value += coinrecord.coin.amount
used_coins.add(coinrecord.coin)
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum_value < amount:
raise ValueError(
"Can't make this transaction at the moment. Waiting for the change from the previous transaction."
)
self.log.info(f"Successfully selected coins: {used_coins}")
return used_coins
# This will be used in the recovery case where we don't have the parent info already
async def coin_added(self, coin: Coin, header_hash: bytes32, removals: List[Coin], height: int):
""" Notification from wallet state manager that wallet has been received. """
"""Notification from wallet state manager that wallet has been received."""
self.log.info("DID wallet has been notified that coin was added")
inner_puzzle = await self.inner_puzzle_for_did_puzzle(coin.puzzle_hash)
new_info = DIDInfo(
@ -752,6 +754,9 @@ class DIDWallet:
return parent_info
async def generate_new_decentralised_id(self, amount: uint64) -> Optional[SpendBundle]:
"""
This must be called under the wallet state manager lock
"""
coins = await self.standard_wallet.select_coins(amount)
if coins is None:

View File

@ -127,6 +127,7 @@ class Wallet:
for record in unconfirmed_tx:
if not record.is_in_mempool():
self.log.warning(f"Record: {record} not in mempool")
continue
our_spend = False
for coin in record.removals:
@ -229,60 +230,60 @@ class Wallet:
return solution_for_conditions(condition_list)
async def select_coins(self, amount, exclude: List[Coin] = None) -> Set[Coin]:
"""Returns a set of coins that can be used for generating a new transaction."""
async with self.wallet_state_manager.lock:
if exclude is None:
exclude = []
"""
Returns a set of coins that can be used for generating a new transaction.
Note: This must be called under a wallet state manager lock
"""
if exclude is None:
exclude = []
spendable_amount = await self.get_spendable_balance()
spendable_amount = await self.get_spendable_balance()
if amount > spendable_amount:
error_msg = (
f"Can't select amount higher than our spendable balance. Amount: {amount}, spendable: "
f" {spendable_amount}"
)
self.log.warning(error_msg)
raise ValueError(error_msg)
self.log.info(f"About to select coins for amount {amount}")
unspent: List[WalletCoinRecord] = list(
await self.wallet_state_manager.get_spendable_coins_for_wallet(self.id())
if amount > spendable_amount:
error_msg = (
f"Can't select amount higher than our spendable balance. Amount: {amount}, spendable: "
f" {spendable_amount}"
)
sum_value = 0
used_coins: Set = set()
self.log.warning(error_msg)
raise ValueError(error_msg)
# Use older coins first
unspent.sort(reverse=True, key=lambda r: r.coin.amount)
self.log.info(f"About to select coins for amount {amount}")
unspent: List[WalletCoinRecord] = list(
await self.wallet_state_manager.get_spendable_coins_for_wallet(self.id())
)
sum_value = 0
used_coins: Set = set()
# Try to use coins from the store, if there isn't enough of "unused"
# coins use change coins that are not confirmed yet
unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet(
self.id()
# Use older coins first
unspent.sort(reverse=True, key=lambda r: r.coin.amount)
# Try to use coins from the store, if there isn't enough of "unused"
# coins use change coins that are not confirmed yet
unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet(
self.id()
)
for coinrecord in unspent:
if sum_value >= amount and len(used_coins) > 0:
break
if coinrecord.coin.name() in unconfirmed_removals:
continue
if coinrecord.coin in exclude:
continue
sum_value += coinrecord.coin.amount
used_coins.add(coinrecord.coin)
self.log.debug(f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!")
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum_value < amount:
raise ValueError(
"Can't make this transaction at the moment. Waiting for the change from the previous transaction."
)
for coinrecord in unspent:
if sum_value >= amount and len(used_coins) > 0:
break
if coinrecord.coin.name() in unconfirmed_removals:
continue
if coinrecord.coin in exclude:
continue
sum_value += coinrecord.coin.amount
used_coins.add(coinrecord.coin)
self.log.debug(
f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!"
)
# This happens when we couldn't use one of the coins because it's already used
# but unconfirmed, and we are waiting for the change. (unconfirmed_additions)
if sum_value < amount:
raise ValueError(
"Can't make this transaction at the moment. Waiting for the change from the previous transaction."
)
self.log.debug(f"Successfully selected coins: {used_coins}")
return used_coins
async def generate_unsigned_transaction(
async def _generate_unsigned_transaction(
self,
amount: uint64,
newpuzzlehash: bytes32,
@ -294,6 +295,7 @@ class Wallet:
) -> List[CoinSolution]:
"""
Generates a unsigned transaction in form of List(Puzzle, Solutions)
Note: this must be called under a wallet state manager lock
"""
if primaries_input is None:
primaries: Optional[List[Dict]] = None
@ -377,13 +379,16 @@ class Wallet:
primaries: Optional[List[Dict[str, bytes32]]] = None,
ignore_max_send_amount: bool = False,
) -> TransactionRecord:
"""Use this to generate transaction."""
"""
Use this to generate transaction.
Note: this must be called under a wallet state manager lock
"""
if primaries is None:
non_change_amount = amount
else:
non_change_amount = uint64(amount + sum(p["amount"] for p in primaries))
transaction = await self.generate_unsigned_transaction(
transaction = await self._generate_unsigned_transaction(
amount, puzzle_hash, fee, origin_id, coins, primaries, ignore_max_send_amount
)
assert len(transaction) > 0

View File

@ -65,6 +65,7 @@ class WalletBlockchain(BlockchainInterface):
coins_of_interest_received: Any
reorg_rollback: Any
wallet_state_manager_lock: asyncio.Lock
# Whether blockchain is shut down or not
_shut_down: bool
@ -79,6 +80,7 @@ class WalletBlockchain(BlockchainInterface):
consensus_constants: ConsensusConstants,
coins_of_interest_received: Callable, # f(removals: List[Coin], additions: List[Coin], height: uint32)
reorg_rollback: Callable,
lock: asyncio.Lock,
):
"""
Initializes a blockchain with the BlockRecords from disk, assuming they have all been
@ -100,6 +102,7 @@ class WalletBlockchain(BlockchainInterface):
self.coins_of_interest_received = coins_of_interest_received
self.reorg_rollback = reorg_rollback
self.log = logging.getLogger(__name__)
self.wallet_state_manager_lock = lock
await self._load_chain_from_store()
return self
@ -211,24 +214,27 @@ class WalletBlockchain(BlockchainInterface):
)
# Always add the block to the database
async with self.block_store.db_wrapper.lock:
try:
await self.block_store.db_wrapper.begin_transaction()
await self.block_store.add_block_record(header_block_record, block_record)
self.add_block_record(block_record)
self.clean_block_record(block_record.height - self.constants.BLOCKS_CACHE_SIZE)
async with self.wallet_state_manager_lock:
async with self.block_store.db_wrapper.lock:
try:
await self.block_store.db_wrapper.begin_transaction()
await self.block_store.add_block_record(header_block_record, block_record)
self.add_block_record(block_record)
self.clean_block_record(block_record.height - self.constants.BLOCKS_CACHE_SIZE)
fork_height: Optional[uint32] = await self._reconsider_peak(block_record, genesis, fork_point_with_peak)
await self.block_store.db_wrapper.commit_transaction()
except BaseException as e:
self.log.error(f"Error during db transaction: {e}")
await self.block_store.db_wrapper.rollback_transaction()
raise
if fork_height is not None:
self.log.info(f"💰 Updated wallet peak to height {block_record.height}, weight {block_record.weight}, ")
return ReceiveBlockResult.NEW_PEAK, None, fork_height
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None
fork_height: Optional[uint32] = await self._reconsider_peak(
block_record, genesis, fork_point_with_peak
)
await self.block_store.db_wrapper.commit_transaction()
except BaseException as e:
self.log.error(f"Error during db transaction: {e}")
await self.block_store.db_wrapper.rollback_transaction()
raise
if fork_height is not None:
self.log.info(f"💰 Updated wallet peak to height {block_record.height}, weight {block_record.weight}, ")
return ReceiveBlockResult.NEW_PEAK, None, fork_height
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None
async def _reconsider_peak(
self, block_record: BlockRecord, genesis: bool, fork_point_with_peak: Optional[uint32]

View File

@ -121,7 +121,6 @@ class WalletStateManager:
else:
self.log = logging.getLogger(__name__)
self.lock = asyncio.Lock()
self.tx_lock = asyncio.Lock()
self.log.debug(f"Starting in db path: {db_path}")
self.db_connection = await aiosqlite.connect(db_path)
@ -141,6 +140,7 @@ class WalletStateManager:
self.constants,
self.coins_of_interest_received,
self.reorg_rollback,
self.lock,
)
self.weight_proof_handler = WeightProofHandler(self.constants, self.blockchain)
@ -498,14 +498,18 @@ class WalletStateManager:
"""
confirmed = await self.get_confirmed_balance_for_wallet(wallet_id, unspent_coin_records)
unconfirmed_tx: List[TransactionRecord] = await self.tx_store.get_unconfirmed_for_wallet(wallet_id)
removal_amount = 0
removal_amount: int = 0
addition_amount: int = 0
for record in unconfirmed_tx:
for removal in record.removals:
removal_amount += removal.amount
for addition in record.additions:
# This change or a self transaction
if await self.does_coin_belong_to_wallet(addition, wallet_id):
addition_amount += addition.amount
removal_amount += record.amount
removal_amount += record.fee_amount
result = confirmed - removal_amount
result = confirmed - removal_amount + addition_amount
return uint128(result)
async def unconfirmed_additions_for_wallet(self, wallet_id: int) -> Dict[bytes32, Coin]:
@ -785,7 +789,7 @@ class WalletStateManager:
async def get_filter_additions_removals(
self, new_block: HeaderBlock, transactions_filter: bytes, fork_point_with_peak: Optional[uint32]
) -> Tuple[List[bytes32], List[bytes32]]:
""" Returns a list of our coin ids, and a list of puzzle_hashes that positively match with provided filter. """
"""Returns a list of our coin ids, and a list of puzzle_hashes that positively match with provided filter."""
# assert new_block.prev_header_hash in self.blockchain.blocks
tx_filter = PyBIP158([b for b in transactions_filter])
@ -863,7 +867,7 @@ class WalletStateManager:
return additions_of_interest, removals_of_interest
async def get_relevant_additions(self, additions: List[Coin]) -> List[Coin]:
""" Returns the list of coins that are relevant to us.(We can spend them) """
"""Returns the list of coins that are relevant to us.(We can spend them)"""
result: List[Coin] = []
my_puzzle_hashes: Set[bytes32] = self.puzzle_store.all_puzzle_hashes
@ -891,7 +895,7 @@ class WalletStateManager:
return wallet
async def get_relevant_removals(self, removals: List[Coin]) -> List[Coin]:
""" Returns a list of our unspent coins that are in the passed list. """
"""Returns a list of our unspent coins that are in the passed list."""
result: List[Coin] = []
wallet_coin_records = await self.coin_store.get_unspent_coins_at_height()