atari - transactions for offers

This commit is contained in:
Kyle Altendorf 2022-08-19 18:36:22 -04:00
parent 97ef1ca152
commit 52f518e26d
No known key found for this signature in database
GPG Key ID: 5715D880FF005192
2 changed files with 279 additions and 247 deletions

View File

@ -143,30 +143,32 @@ class DataLayer:
self,
tree_id: bytes32,
changelist: List[Dict[str, Any]],
lock: bool = True,
) -> bytes32:
# Make sure we update based on the latest confirmed root.
async with self.lock:
await self._update_confirmation_status(tree_id=tree_id)
pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is not None:
raise Exception("Already have a pending root waiting for confirmation.")
async with self.data_store.transaction(lock=lock):
# Make sure we update based on the latest confirmed root.
async with self.lock:
await self._update_confirmation_status(tree_id=tree_id, lock=False)
pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id, lock=False)
if pending_root is not None:
raise Exception("Already have a pending root waiting for confirmation.")
# check before any DL changes that this singleton is currently owned by this wallet
singleton_records: List[SingletonRecord] = await self.get_owned_stores()
if not any(tree_id == singleton.launcher_id for singleton in singleton_records):
raise ValueError(f"Singleton with launcher ID {tree_id} is not owned by DL Wallet")
# check before any DL changes that this singleton is currently owned by this wallet
singleton_records: List[SingletonRecord] = await self.get_owned_stores()
if not any(tree_id == singleton.launcher_id for singleton in singleton_records):
raise ValueError(f"Singleton with launcher ID {tree_id} is not owned by DL Wallet")
t1 = time.monotonic()
batch_hash = await self.data_store.insert_batch(tree_id, changelist, lock=True)
t2 = time.monotonic()
self.log.info(f"Data store batch update process time: {t2 - t1}.")
# todo return empty node hash from get_tree_root
if batch_hash is not None:
node_hash = batch_hash
else:
node_hash = self.none_bytes # todo change
t1 = time.monotonic()
batch_hash = await self.data_store.insert_batch(tree_id, changelist, lock=False)
t2 = time.monotonic()
self.log.info(f"Data store batch update process time: {t2 - t1}.")
# todo return empty node hash from get_tree_root
if batch_hash is not None:
node_hash = batch_hash
else:
node_hash = self.none_bytes # todo change
return node_hash
return node_hash
async def publish_update(
self,
@ -194,20 +196,23 @@ class DataLayer:
store_id: bytes32,
key: bytes,
root_hash: Optional[bytes32] = None,
lock: bool = True,
) -> bytes32:
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
node = await self.data_store.get_node_by_key(tree_id=store_id, key=key, root_hash=root_hash)
return node.hash
async with self.data_store.transaction(lock=lock):
async with self.lock:
await self._update_confirmation_status(tree_id=store_id, lock=False)
node = await self.data_store.get_node_by_key(tree_id=store_id, key=key, root_hash=root_hash, lock=False)
return node.hash
async def get_value(self, store_id: bytes32, key: bytes) -> Optional[bytes]:
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
res = await self.data_store.get_node_by_key(tree_id=store_id, key=key)
if res is None:
self.log.error("Failed to fetch key")
return None
return res.value
async def get_value(self, store_id: bytes32, key: bytes, lock: bool = True) -> Optional[bytes]:
async with self.data_store.transaction(lock=lock):
async with self.lock:
await self._update_confirmation_status(tree_id=store_id, lock=False)
res = await self.data_store.get_node_by_key(tree_id=store_id, key=key, lock=False)
if res is None:
self.log.error("Failed to fetch key")
return None
return res.value
async def get_keys_values(self, store_id: bytes32, root_hash: Optional[bytes32]) -> List[TerminalNode]:
async with self.lock:
@ -236,6 +241,8 @@ class DataLayer:
return latest
async def get_local_root(self, store_id: bytes32) -> Optional[bytes32]:
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
res = await self.data_store.get_tree_root(tree_id=store_id)
if res is None:
self.log.error(f"Failed to get root for {store_id.hex()}")
@ -254,60 +261,61 @@ class DataLayer:
prev = record
return root_history
async def _update_confirmation_status(self, tree_id: bytes32) -> None:
try:
root = await self.data_store.get_tree_root(tree_id=tree_id)
except asyncio.CancelledError:
raise
except Exception:
root = None
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None:
return
if root is None:
pending_root = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is not None:
if pending_root.generation == 0 and pending_root.node_hash is None:
await self.data_store.change_root_status(pending_root, Status.COMMITTED)
await self.data_store.clear_pending_roots(tree_id=tree_id)
return
else:
root = None
if root is None:
self.log.info(f"Don't have pending root for {tree_id}.")
return
if root.generation == singleton_record.generation:
return
if root.generation > singleton_record.generation:
self.log.info(
f"Local root ahead of chain root: {root.generation} {singleton_record.generation}. "
"Maybe we're doing a batch update."
async def _update_confirmation_status(self, tree_id: bytes32, lock: bool = True) -> None:
async with self.data_store.transaction(lock=lock):
try:
root = await self.data_store.get_tree_root(tree_id=tree_id, lock=False)
except asyncio.CancelledError:
raise
except Exception:
root = None
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None:
return
if root is None:
pending_root = await self.data_store.get_pending_root(tree_id=tree_id, lock=False)
if pending_root is not None:
if pending_root.generation == 0 and pending_root.node_hash is None:
await self.data_store.change_root_status(pending_root, Status.COMMITTED, lock=False)
await self.data_store.clear_pending_roots(tree_id=tree_id, lock=False)
return
else:
root = None
if root is None:
self.log.info(f"Don't have pending root for {tree_id}.")
return
if root.generation == singleton_record.generation:
return
if root.generation > singleton_record.generation:
self.log.info(
f"Local root ahead of chain root: {root.generation} {singleton_record.generation}. "
"Maybe we're doing a batch update."
)
return
wallet_history = await self.wallet_rpc.dl_history(
launcher_id=tree_id,
min_generation=uint32(root.generation + 1),
max_generation=singleton_record.generation,
)
return
wallet_history = await self.wallet_rpc.dl_history(
launcher_id=tree_id,
min_generation=uint32(root.generation + 1),
max_generation=singleton_record.generation,
)
new_hashes = [record.root for record in reversed(wallet_history)]
root_hash = self.none_bytes if root.node_hash is None else root.node_hash
generation_shift = 0
while len(new_hashes) > 0 and new_hashes[0] == root_hash:
generation_shift += 1
new_hashes.pop(0)
if generation_shift > 0:
await self.data_store.shift_root_generations(tree_id=tree_id, shift_size=generation_shift)
else:
expected_root_hash = None if new_hashes[0] == self.none_bytes else new_hashes[0]
pending_root = await self.data_store.get_pending_root(tree_id=tree_id)
if (
pending_root is not None
and pending_root.generation == root.generation + 1
and pending_root.node_hash == expected_root_hash
):
await self.data_store.change_root_status(pending_root, Status.COMMITTED)
await self.data_store.build_ancestor_table_for_latest_root(tree_id=tree_id)
await self.data_store.clear_pending_roots(tree_id=tree_id)
new_hashes = [record.root for record in reversed(wallet_history)]
root_hash = self.none_bytes if root.node_hash is None else root.node_hash
generation_shift = 0
while len(new_hashes) > 0 and new_hashes[0] == root_hash:
generation_shift += 1
new_hashes.pop(0)
if generation_shift > 0:
await self.data_store.shift_root_generations(tree_id=tree_id, shift_size=generation_shift, lock=False)
else:
expected_root_hash = None if new_hashes[0] == self.none_bytes else new_hashes[0]
pending_root = await self.data_store.get_pending_root(tree_id=tree_id, lock=False)
if (
pending_root is not None
and pending_root.generation == root.generation + 1
and pending_root.node_hash == expected_root_hash
):
await self.data_store.change_root_status(pending_root, Status.COMMITTED, lock=False)
await self.data_store.build_ancestor_table_for_latest_root(tree_id=tree_id, lock=False)
await self.data_store.clear_pending_roots(tree_id=tree_id, lock=False)
async def fetch_and_validate(self, tree_id: bytes32) -> None:
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
@ -520,86 +528,101 @@ class DataLayer:
except asyncio.CancelledError:
raise
async def build_offer_changelist(self, store_id: bytes32, inclusions: Tuple[KeyValue, ...]) -> List[Dict[str, Any]]:
changelist: List[Dict[str, Any]] = []
for entry in inclusions:
try:
existing_value = await self.get_value(store_id=store_id, key=entry.key)
except KeyNotFoundError:
existing_value = None
async def build_offer_changelist(
self,
store_id: bytes32,
inclusions: Tuple[KeyValue, ...],
lock: bool = True,
) -> List[Dict[str, Any]]:
async with self.data_store.transaction(lock=lock):
changelist: List[Dict[str, Any]] = []
for entry in inclusions:
try:
existing_value = await self.get_value(store_id=store_id, key=entry.key, lock=False)
except KeyNotFoundError:
existing_value = None
if existing_value == entry.value:
# already present, nothing needed
continue
if existing_value == entry.value:
# already present, nothing needed
continue
if existing_value is not None:
# upsert, delete the existing key and value
changelist.append(
{
"action": "delete",
"key": entry.key,
}
)
if existing_value is not None:
# upsert, delete the existing key and value
changelist.append(
{
"action": "delete",
"action": "insert",
"key": entry.key,
"value": entry.value,
}
)
changelist.append(
{
"action": "insert",
"key": entry.key,
"value": entry.value,
}
)
return changelist
return changelist
async def process_offered_stores(self, offer_stores: Tuple[OfferStore, ...]) -> Dict[bytes32, StoreProofs]:
our_store_proofs: Dict[bytes32, StoreProofs] = {}
for offer_store in offer_stores:
changelist = await self.build_offer_changelist(
store_id=offer_store.store_id,
inclusions=offer_store.inclusions,
)
if len(changelist) > 0:
new_root_hash = await self.batch_insert(
tree_id=offer_store.store_id,
changelist=changelist,
async def process_offered_stores(
self, offer_stores: Tuple[OfferStore, ...], lock: bool = True
) -> Dict[bytes32, StoreProofs]:
async with self.data_store.transaction(lock=lock):
our_store_proofs: Dict[bytes32, StoreProofs] = {}
for offer_store in offer_stores:
changelist = await self.build_offer_changelist(
store_id=offer_store.store_id,
inclusions=offer_store.inclusions,
lock=False,
)
else:
existing_root = await self.get_root(store_id=offer_store.store_id)
if existing_root is None:
raise Exception(f"store id not available: {offer_store.store_id.hex()}")
new_root_hash = existing_root.root
if new_root_hash is None:
raise Exception("only inserts are supported so a None root hash should not be possible")
if len(changelist) > 0:
new_root_hash = await self.batch_insert(
tree_id=offer_store.store_id,
changelist=changelist,
lock=False,
)
else:
existing_root = await self.get_root(store_id=offer_store.store_id)
if existing_root is None:
raise Exception(f"store id not available: {offer_store.store_id.hex()}")
new_root_hash = existing_root.root
proofs: List[Proof] = []
for entry in offer_store.inclusions:
node_hash = await self.get_key_value_hash(
store_id=offer_store.store_id, key=entry.key, root_hash=new_root_hash
)
proof_of_inclusion = await self.data_store.get_proof_of_inclusion_by_hash(
node_hash=node_hash,
tree_id=offer_store.store_id,
root_hash=new_root_hash,
)
proof = Proof(
key=entry.key,
value=entry.value,
node_hash=proof_of_inclusion.node_hash,
layers=tuple(
Layer(
other_hash_side=layer.other_hash_side,
other_hash=layer.other_hash,
combined_hash=layer.combined_hash,
)
for layer in proof_of_inclusion.layers
),
)
proofs.append(proof)
store_proof = StoreProofs(store_id=offer_store.store_id, proofs=tuple(proofs))
our_store_proofs[offer_store.store_id] = store_proof
return our_store_proofs
if new_root_hash is None:
raise Exception("only inserts are supported so a None root hash should not be possible")
proofs: List[Proof] = []
for entry in offer_store.inclusions:
node_hash = await self.get_key_value_hash(
store_id=offer_store.store_id,
key=entry.key,
root_hash=new_root_hash,
lock=False,
)
proof_of_inclusion = await self.data_store.get_proof_of_inclusion_by_hash(
node_hash=node_hash,
tree_id=offer_store.store_id,
root_hash=new_root_hash,
lock=False,
)
proof = Proof(
key=entry.key,
value=entry.value,
node_hash=proof_of_inclusion.node_hash,
layers=tuple(
Layer(
other_hash_side=layer.other_hash_side,
other_hash=layer.other_hash,
combined_hash=layer.combined_hash,
)
for layer in proof_of_inclusion.layers
),
)
proofs.append(proof)
store_proof = StoreProofs(store_id=offer_store.store_id, proofs=tuple(proofs))
our_store_proofs[offer_store.store_id] = store_proof
return our_store_proofs
async def make_offer(
self,
@ -607,55 +630,56 @@ class DataLayer:
taker: Tuple[OfferStore, ...],
fee: uint64,
) -> Offer:
our_store_proofs = await self.process_offered_stores(offer_stores=maker)
async with self.data_store.transaction():
our_store_proofs = await self.process_offered_stores(offer_stores=maker, lock=False)
offer_dict: Dict[Union[uint32, str], int] = {
**{offer_store.store_id.hex(): -1 for offer_store in maker},
**{offer_store.store_id.hex(): 1 for offer_store in taker},
}
solver: Dict[str, Any] = {
"0x"
+ our_offer_store.store_id.hex(): {
"new_root": "0x" + our_store_proofs[our_offer_store.store_id].proofs[0].root().hex(),
"dependencies": [
{
"launcher_id": "0x" + their_offer_store.store_id.hex(),
"values_to_prove": [
"0x" + leaf_hash(key=entry.key, value=entry.value).hex()
for entry in their_offer_store.inclusions
],
}
for their_offer_store in taker
],
offer_dict: Dict[Union[uint32, str], int] = {
**{offer_store.store_id.hex(): -1 for offer_store in maker},
**{offer_store.store_id.hex(): 1 for offer_store in taker},
}
for our_offer_store in maker
}
wallet_offer, trade_record = await self.wallet_rpc.create_offer_for_ids(
offer_dict=offer_dict,
solver=solver,
driver_dict={},
fee=fee,
validate_only=False,
)
if wallet_offer is None:
raise Exception("offer is None despite validate_only=False")
solver: Dict[str, Any] = {
"0x"
+ our_offer_store.store_id.hex(): {
"new_root": "0x" + our_store_proofs[our_offer_store.store_id].proofs[0].root().hex(),
"dependencies": [
{
"launcher_id": "0x" + their_offer_store.store_id.hex(),
"values_to_prove": [
"0x" + leaf_hash(key=entry.key, value=entry.value).hex()
for entry in their_offer_store.inclusions
],
}
for their_offer_store in taker
],
}
for our_offer_store in maker
}
offer = Offer(
offer_id=trade_record.trade_id,
offer=bytes(wallet_offer),
taker=taker,
maker=tuple(our_store_proofs.values()),
)
wallet_offer, trade_record = await self.wallet_rpc.create_offer_for_ids(
offer_dict=offer_dict,
solver=solver,
driver_dict={},
fee=fee,
validate_only=False,
)
if wallet_offer is None:
raise Exception("offer is None despite validate_only=False")
# being extra careful and verifying the offer before returning it
trading_offer = TradingOffer.from_bytes(offer.offer)
summary = await DataLayerWallet.get_offer_summary(offer=trading_offer)
offer = Offer(
offer_id=trade_record.trade_id,
offer=bytes(wallet_offer),
taker=taker,
maker=tuple(our_store_proofs.values()),
)
verify_offer(maker=offer.maker, taker=offer.taker, summary=summary)
# being extra careful and verifying the offer before returning it
trading_offer = TradingOffer.from_bytes(offer.offer)
summary = await DataLayerWallet.get_offer_summary(offer=trading_offer)
return offer
verify_offer(maker=offer.maker, taker=offer.taker, summary=summary)
return offer
async def take_offer(
self,
@ -664,59 +688,61 @@ class DataLayer:
maker: Tuple[StoreProofs, ...],
fee: uint64,
) -> TradeRecord:
our_store_proofs = await self.process_offered_stores(offer_stores=taker)
async with self.data_store.transaction():
our_store_proofs = await self.process_offered_stores(offer_stores=taker, lock=False)
offer = TradingOffer.from_bytes(offer_bytes)
summary = await DataLayerWallet.get_offer_summary(offer=offer)
offer = TradingOffer.from_bytes(offer_bytes)
summary = await DataLayerWallet.get_offer_summary(offer=offer)
verify_offer(maker=maker, taker=taker, summary=summary)
verify_offer(maker=maker, taker=taker, summary=summary)
all_store_proofs: Dict[bytes32, StoreProofs] = {
store_proofs.proofs[0].root(): store_proofs for store_proofs in [*maker, *our_store_proofs.values()]
}
proofs_of_inclusion: List[Tuple[str, str, List[str]]] = []
for root, store_proofs in all_store_proofs.items():
for proof in store_proofs.proofs:
layers = [
ProofOfInclusionLayer(
combined_hash=layer.combined_hash,
other_hash_side=layer.other_hash_side,
other_hash=layer.other_hash,
all_store_proofs: Dict[bytes32, StoreProofs] = {
store_proofs.proofs[0].root(): store_proofs for store_proofs in [*maker, *our_store_proofs.values()]
}
proofs_of_inclusion: List[Tuple[str, str, List[str]]] = []
for root, store_proofs in all_store_proofs.items():
for proof in store_proofs.proofs:
layers = [
ProofOfInclusionLayer(
combined_hash=layer.combined_hash,
other_hash_side=layer.other_hash_side,
other_hash=layer.other_hash,
)
for layer in proof.layers
]
proof_of_inclusion = ProofOfInclusion(node_hash=proof.node_hash, layers=layers)
sibling_sides_integer = proof_of_inclusion.sibling_sides_integer()
proofs_of_inclusion.append(
(
root.hex(),
str(sibling_sides_integer),
["0x" + sibling_hash.hex() for sibling_hash in proof_of_inclusion.sibling_hashes()],
)
)
for layer in proof.layers
]
proof_of_inclusion = ProofOfInclusion(node_hash=proof.node_hash, layers=layers)
sibling_sides_integer = proof_of_inclusion.sibling_sides_integer()
proofs_of_inclusion.append(
(
root.hex(),
str(sibling_sides_integer),
["0x" + sibling_hash.hex() for sibling_hash in proof_of_inclusion.sibling_hashes()],
)
)
solver: Dict[str, Any] = {
"proofs_of_inclusion": proofs_of_inclusion,
**{
"0x"
+ our_offer_store.store_id.hex(): {
"new_root": "0x" + root.hex(),
"dependencies": [
{
"launcher_id": "0x" + their_offer_store.store_id.hex(),
"values_to_prove": ["0x" + entry.node_hash.hex() for entry in their_offer_store.proofs],
}
for their_offer_store in maker
],
}
for our_offer_store in taker
},
}
solver: Dict[str, Any] = {
"proofs_of_inclusion": proofs_of_inclusion,
**{
"0x"
+ our_offer_store.store_id.hex(): {
"new_root": "0x" + root.hex(),
"dependencies": [
{
"launcher_id": "0x" + their_offer_store.store_id.hex(),
"values_to_prove": ["0x" + entry.node_hash.hex() for entry in their_offer_store.proofs],
}
for their_offer_store in maker
],
}
for our_offer_store in taker
},
}
trade_record = await self.wallet_rpc.take_offer(
offer=offer,
solver=solver,
fee=fee,
)
# TODO: what if this fails after the offer is taken?
trade_record = await self.wallet_rpc.take_offer(
offer=offer,
solver=solver,
fee=fee,
)
return trade_record
return trade_record

View File

@ -1,7 +1,8 @@
import logging
from collections import defaultdict
from contextlib import asynccontextmanager
from dataclasses import dataclass, replace
from typing import Any, Awaitable, BinaryIO, Callable, Dict, List, Optional, Set, Tuple
from typing import Any, AsyncIterator, Awaitable, BinaryIO, Callable, Dict, List, Optional, Set, Tuple
import aiosqlite
@ -145,6 +146,11 @@ class DataStore:
return self
@asynccontextmanager
async def transaction(self, lock: bool = True) -> AsyncIterator[None]:
async with self.db_wrapper.locked_transaction(lock=lock):
yield
async def _insert_root(
self,
tree_id: bytes32,
@ -337,8 +343,8 @@ class DataStore:
for _ in range(shift_size):
await self._insert_root(tree_id=tree_id, node_hash=root.node_hash, status=Status.COMMITTED)
async def change_root_status(self, root: Root, status: Status = Status.PENDING) -> None:
async with self.db_wrapper.locked_transaction(lock=True):
async def change_root_status(self, root: Root, status: Status = Status.PENDING, lock: bool = True) -> None:
async with self.db_wrapper.locked_transaction(lock=lock):
await self.db.execute(
"UPDATE root SET status = ? WHERE tree_id=? and generation = ?",
(