diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py index 2a5f1631891f..2b4ed306ab9c 100644 --- a/chia/data_layer/data_store.py +++ b/chia/data_layer/data_store.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from enum import IntEnum import io import logging +import random # from typing import Dict, List, Optional, Tuple from typing import Iterable, List, Tuple @@ -85,6 +86,20 @@ class TableRow: return sexp_self == other + def __hash__(self) -> int: + # TODO: this is dirty, consider the TODOs in .__eq__() + return object.__hash__(dataclasses.replace(self, clvm_object=SExp.to(self.clvm_object))) + + +# # TODO: remove or formalize this +# async def _debug_dump(db, description=""): +# cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table';") +# print("-" * 50, description, flush=True) +# for [name] in await cursor.fetchall(): +# cursor = await db.execute(f"SELECT * FROM {name}") +# x = await cursor.fetchall() +# print(f"\n -- {name} ------", x, flush=True) + @dataclass(frozen=True) class Action: @@ -125,36 +140,45 @@ class DataStore: self.db_wrapper = db_wrapper self.db = db_wrapper.db - # TODO: what pragmas do we want? + # TODO: what pragmas do we want? maybe foreign_keys? # await self.db.execute("pragma journal_mode=wal") # await self.db.execute("pragma synchronous=2") # TODO: make this handle multiple data layer tables # TODO: do we need to handle multiple equal rows - # Just a raw collection of all ChiaLisp lists that are used + await self.db.execute("CREATE TABLE IF NOT EXISTS tables(id TEXT PRIMARY KEY, name STRING)") + await self.db.execute("CREATE TABLE IF NOT EXISTS keys_values(key TEXT PRIMARY KEY, value BLOB)") await self.db.execute( - "CREATE TABLE IF NOT EXISTS raw_rows(row_hash TEXT PRIMARY KEY, table_id TEXT, clvm_object BLOB)" + "CREATE TABLE IF NOT EXISTS table_values(" + "table_id TEXT," + " key STRING," + " PRIMARY KEY(table_id, key)," + " FOREIGN KEY(table_id) REFERENCES tables(id)," + " FOREIGN KEY(key) REFERENCES keys_values(key)" + ")" ) - # The present properly ordered collection of rows. await self.db.execute( - "CREATE TABLE IF NOT EXISTS data_rows(" - "row_hash TEXT PRIMARY KEY," - " FOREIGN KEY(row_hash) REFERENCES raw_rows(row_hash))" + "CREATE TABLE IF NOT EXISTS commits(" + "id TEXT PRIMARY KEY," + " table_id TEXT," + " state INTEGER," + " FOREIGN KEY(table_id) REFERENCES tables(id)" + ")" ) - # TODO: needs a key + await self.db.execute( + "CREATE TABLE IF NOT EXISTS actions(" + "commit_id TEXT," + " idx INTEGER," + " operation INTEGER," + " key TEXT," + " PRIMARY KEY(commit_id, idx)" + ")" + ) + # TODO: As operations are reverted do they get deleted? Or perhaps we track # a reference into the table and only remove when a non-matching forward # step is taken? Or reverts are just further actions? - await self.db.execute( - "CREATE TABLE IF NOT EXISTS actions(" - "row_hash TEXT," - " operation INTEGER," - " FOREIGN KEY(row_hash) REFERENCES raw_rows(row_hash))" - ) - # TODO: Could also be structured such that the action table has a reference from - # each action to the commit it is part of. - await self.db.execute("CREATE TABLE IF NOT EXISTS commits(changelist_hash TEXT, actions_index INTEGER)") await self.db.commit() @@ -165,30 +189,46 @@ class DataStore: # chia.util.merkle_set.TerminalNode requires 32 bytes so I think that's applicable here - # TODO: added as the core was adjusted to be `get_rows` (plural). API can be - # discussed more. + # TODO: should be Set[TableRow] but our stuff isn't super hashable yet... + async def get_rows(self, table: bytes32) -> List[TableRow]: + cursor = await self.db.execute( + "SELECT value FROM keys_values INNER JOIN table_values" + " WHERE" + " keys_values.key == table_values.key" + " AND table_values.table_id == :table_id", + {"table_id": table}, + ) + + some_clvm_bytes = [value for [value] in await cursor.fetchall()] + + table_rows = [TableRow.from_clvm_bytes(clvm_bytes=clvm_bytes) for clvm_bytes in some_clvm_bytes] + + return table_rows + async def get_row_by_hash(self, table: bytes32, row_hash: bytes32) -> TableRow: cursor = await self.db.execute( - ( - "SELECT raw_rows.row_hash, raw_rows.clvm_object" - " FROM raw_rows INNER JOIN data_rows" - " WHERE raw_rows.row_hash == data_rows.row_hash AND data_rows.row_hash == ?" - ), - (row_hash,), + "SELECT value FROM keys_values INNER JOIN table_values" + " WHERE" + " keys_values.key == :key" + " AND keys_values.key == table_values.key" + " AND table_values.table_id == :table_id", + {"key": row_hash, "table_id": table}, ) - rows = await cursor.fetchall() - [table_row] = [ - TableRow( - clvm_object=sexp_from_stream(io.BytesIO(clvm_object_bytes), to_sexp=CLVMObject), - hash=row_hash, - bytes=clvm_object_bytes, - ) - for row_hash, clvm_object_bytes in rows - ] + [clvm_bytes] = [value for [value] in await cursor.fetchall()] + + table_row = TableRow( + clvm_object=sexp_from_stream(io.BytesIO(clvm_bytes), to_sexp=CLVMObject), + hash=row_hash, + bytes=clvm_bytes, + ) return table_row + async def create_table(self, id: bytes32, name: str) -> None: + await self.db.execute("INSERT INTO tables(id, name) VALUES(:id, :name)", {"id": id, "name": name}) + await self.db.commit() + async def insert_row(self, table: bytes32, clvm_object: CLVMObject) -> TableRow: """ Args: @@ -197,46 +237,66 @@ class DataStore: # TODO: Should we be using CLVMObject or SExp? row_hash = sha256_treehash(sexp=clvm_object) - - # check if this is already present in the raw_rows - cursor = await self.db.execute( - "SELECT * FROM raw_rows WHERE row_hash=:row_hash", - parameters={"row_hash": row_hash}, - ) - clvm_bytes = SExp.to(clvm_object).as_bin() - if await cursor.fetchone() is None: - # not present in raw_rows so add it - await self.db.execute( - "INSERT INTO raw_rows (row_hash, table_id, clvm_object) VALUES(?, ?, ?)", (row_hash, table, clvm_bytes) - ) - - await self.db.execute("INSERT INTO data_rows (row_hash) VALUES(?)", (row_hash,)) await self.db.execute( - "INSERT INTO actions (row_hash, operation) VALUES(?, ?)", - (row_hash, OperationType.INSERT), + "INSERT INTO keys_values(key, value) VALUES(:key, :value)", {"key": row_hash, "value": clvm_bytes} ) + await self.db.execute( + "INSERT INTO table_values(table_id, key) VALUES(:table_id, :key)", {"table_id": table, "key": row_hash} + ) + + await self.add_action(operation_type=OperationType.INSERT, key=row_hash, table=table) + # TODO: Review reentrancy on .commit() since it isn't clearly tied to this # particular task's activity. await self.db.commit() return TableRow(clvm_object=clvm_object, hash=row_hash, bytes=clvm_bytes) + async def add_action(self, operation_type: OperationType, key: bytes32, table: bytes32) -> None: + cursor = await self.db.execute( + "SELECT id, table_id FROM commits WHERE table_id == :table_id AND state == :state", + {"table_id": table, "state": CommitState.OPEN}, + ) + commits_rows: List[Tuple[bytes32, bytes32]] = [(id, table_id) for id, table_id in await cursor.fetchall()] + if len(commits_rows) == 0: + # TODO: just copied from elsewhere... reconsider + commit_id = random.randint(0, 100000000).to_bytes(32, "big") + print("table_id", repr(table)) + await self.db.execute( + "INSERT INTO commits(id, table_id, state) VALUES(:id, :table_id, :state)", + {"id": commit_id, "table_id": table, "state": CommitState.OPEN}, + ) + next_actions_index = 0 + else: + [commit_id] = [commit_id for commit_id, table_id in commits_rows] + + cursor = await self.db.execute( + "SELECT MAX(idx) FROM actions WHERE commit_id == :commit_id", {"commit_id": commit_id} + ) + [[max_actions_index]] = await cursor.fetchall() + next_actions_index = max_actions_index + 1 + await self.db.execute( + "INSERT INTO actions(idx, commit_id, operation, key) VALUES(:idx, :commit_id, :operation, :key)", + {"idx": next_actions_index, "commit_id": commit_id, "operation": operation_type, "key": key}, + ) + async def delete_row_by_hash(self, table: bytes32, row_hash: bytes32) -> TableRow: + # TODO: do we really want to bother getting just so we can have the value and bytes? table_row = await self.get_row_by_hash(table=table, row_hash=row_hash) # TODO: How do we generally handle multiple incoming requests to avoid them # trompling over each other via race conditions such as here? - await self.db.execute("DELETE FROM data_rows WHERE row_hash == ?", (row_hash,)) - await self.db.execute( - "INSERT INTO actions (row_hash, operation) VALUES(?, ?)", - (table_row.hash, OperationType.DELETE), + "DELETE FROM table_values WHERE table_id == :table_id AND key == :key", + {"table_id": table, "key": row_hash}, ) + await self.add_action(operation_type=OperationType.DELETE, key=row_hash, table=table) + await self.db.commit() return table_row @@ -245,9 +305,9 @@ class DataStore: # TODO: What needs to be done to retain proper ordering, relates to the question # at the table creation as well. cursor = await self.db.execute( - "SELECT actions.operation, raw_rows.clvm_object" - " FROM actions INNER JOIN raw_rows" - " WHERE actions.row_hash == raw_rows.row_hash" + "SELECT actions.operation, keys_values.value" + " FROM actions INNER JOIN keys_values" + " WHERE actions.key == keys_values.key" ) actions = await cursor.fetchall() diff --git a/tests/core/data_layer/test_data_store.py b/tests/core/data_layer/test_data_store.py index 4e861366084d..38d432dff54b 100644 --- a/tests/core/data_layer/test_data_store.py +++ b/tests/core/data_layer/test_data_store.py @@ -11,7 +11,7 @@ import pytest # from chia.consensus.blockchain import Blockchain from chia.data_layer.data_store import Action, DataStore, OperationType, TableRow -from chia.types.blockchain_format.tree_hash import sha256_treehash +from chia.types.blockchain_format.tree_hash import bytes32, sha256_treehash # from chia.full_node.block_store import BlockStore # from chia.full_node.coin_store import CoinStore @@ -34,6 +34,8 @@ log = logging.getLogger(__name__) # yield connection async def db_connection_fixture() -> AsyncIterable[aiosqlite.Connection]: async with aiosqlite.connect(":memory:") as connection: + # TODO: consider this... should it be used at runtime? + await connection.execute("PRAGMA foreign_keys = ON") yield connection @@ -48,9 +50,18 @@ def db_fixture(db_wrapper: DBWrapper) -> aiosqlite.Connection: return db_wrapper.db +@pytest.fixture(name="table_id", scope="function") +def table_id_fixture() -> bytes32: + base = b"a table id" + pad = b"." * (32 - len(base)) + return bytes32(pad + base) + + @pytest.fixture(name="data_store", scope="function") -async def data_store_fixture(db_wrapper: DBWrapper) -> DataStore: - return await DataStore.create(db_wrapper=db_wrapper) +async def data_store_fixture(db_wrapper: DBWrapper, table_id: bytes32) -> DataStore: + data_store = await DataStore.create(db_wrapper=db_wrapper) + await data_store.create_table(id=table_id, name="A Table") + return data_store clvm_objects = [ @@ -86,10 +97,11 @@ clvm_objects = [ table_columns: Dict[str, List[str]] = { - "raw_rows": ["row_hash", "table_id", "clvm_object"], - "data_rows": ["row_hash"], - "actions": ["row_hash", "operation"], - "commits": ["changelist_hash", "actions_index"], + "tables": ["id", "name"], + "keys_values": ["key", "value"], + "table_values": ["table_id", "key"], + "commits": ["id", "table_id", "state"], + "actions": ["commit_id", "idx", "operation", "key"], } @@ -117,118 +129,109 @@ async def test_create_creates_tables_and_columns( @pytest.mark.asyncio -async def test_get_row_by_hash_single_match(data_store: DataStore) -> None: +async def test_insert_with_invalid_table_fails(data_store: DataStore) -> None: + # TODO: If this API is retained then it should have a specific exception. + with pytest.raises(Exception): + await data_store.insert_row(table=b"non-existant table", clvm_object=clvm_objects[0]) + + +@pytest.mark.asyncio +async def test_get_row_by_hash_single_match(data_store: DataStore, table_id: bytes32) -> None: a_clvm_object, *_ = clvm_objects - await data_store.insert_row(table=b"", clvm_object=a_clvm_object) + + await data_store.insert_row(table=table_id, clvm_object=a_clvm_object) row_hash = sha256_treehash(SExp.to(a_clvm_object)) - table_row = await data_store.get_row_by_hash(table=b"", row_hash=row_hash) + table_row = await data_store.get_row_by_hash(table=table_id, row_hash=row_hash) assert table_row == TableRow.from_clvm_object(clvm_object=a_clvm_object) @pytest.mark.asyncio -async def test_get_row_by_hash_no_match(data_store: DataStore) -> None: +async def test_get_row_by_hash_no_match(data_store: DataStore, table_id: bytes32) -> None: a_clvm_object, another_clvm_object, *_ = clvm_objects - await data_store.insert_row(table=b"", clvm_object=a_clvm_object) + await data_store.insert_row(table=table_id, clvm_object=a_clvm_object) other_row_hash = sha256_treehash(SExp.to(another_clvm_object)) # TODO: If this API is retained then it should have a specific exception. with pytest.raises(Exception): - await data_store.get_row_by_hash(table=b"", row_hash=other_row_hash) + await data_store.get_row_by_hash(table=table_id, row_hash=other_row_hash) + + +def expected_keys(clvm_objects: List[CLVMObject]) -> Set[Tuple[bytes]]: + return {sha256_treehash(SExp.to(clvm_object)) for clvm_object in clvm_objects} @pytest.mark.asyncio -async def test_insert_adds_to_raw_rows(data_store: DataStore) -> None: - a_clvm_object, *_ = clvm_objects - await data_store.insert_row(table=b"", clvm_object=a_clvm_object) - - cursor = await data_store.db.execute("SELECT * FROM raw_rows") - # TODO: The runtime type is a list, maybe ask about adjusting the hint? - # https://github.com/omnilib/aiosqlite/blob/13d165656f73c3121001622253a532bdc90b2b91/aiosqlite/cursor.py#L63 - raw_rows: List[object] = await cursor.fetchall() # type: ignore[assignment] - - cursor = await data_store.db.execute("SELECT row_hash FROM data_rows") - # TODO: The runtime type is a list, maybe ask about adjusting the hint? - # https://github.com/omnilib/aiosqlite/blob/13d165656f73c3121001622253a532bdc90b2b91/aiosqlite/cursor.py#L63 - data_rows: List[object] = await cursor.fetchall() # type: ignore[assignment] - - assert [len(raw_rows), len(data_rows)] == [1, 1] - - -def expected_data_rows(clvm_objects: List[CLVMObject]) -> Set[Tuple[bytes]]: - return {(sha256_treehash(SExp.to(clvm_object)),) for clvm_object in clvm_objects} - - -@pytest.mark.asyncio -async def test_insert_does(data_store: DataStore) -> None: +async def test_insert_does(data_store: DataStore, table_id: bytes32) -> None: a_clvm_object, another_clvm_object, *_ = clvm_objects - await data_store.insert_row(table=b"", clvm_object=a_clvm_object) - await data_store.insert_row(table=b"", clvm_object=another_clvm_object) + await data_store.insert_row(table=table_id, clvm_object=a_clvm_object) + await data_store.insert_row(table=table_id, clvm_object=another_clvm_object) - cursor = await data_store.db.execute("SELECT row_hash FROM data_rows") - data_rows: Set[Tuple[bytes]] = set(await cursor.fetchall()) # type: ignore[arg-type] + table_rows = await data_store.get_rows(table=table_id) - expected = expected_data_rows([a_clvm_object, another_clvm_object]) - assert data_rows == expected + expected = [ + TableRow.from_clvm_object(clvm_object=clvm_object) for clvm_object in [a_clvm_object, another_clvm_object] + ] + assert table_rows == expected @pytest.mark.asyncio -async def test_deletes_row_by_hash(data_store: DataStore) -> None: +async def test_deletes_row_by_hash(data_store: DataStore, table_id: bytes32) -> None: a_clvm_object, another_clvm_object, *_ = clvm_objects - await data_store.insert_row(table=b"", clvm_object=a_clvm_object) - await data_store.insert_row(table=b"", clvm_object=another_clvm_object) - await data_store.delete_row_by_hash(table=b"", row_hash=sha256_treehash(SExp.to(a_clvm_object))) + await data_store.insert_row(table=table_id, clvm_object=a_clvm_object) + await data_store.insert_row(table=table_id, clvm_object=another_clvm_object) + await data_store.delete_row_by_hash(table=table_id, row_hash=sha256_treehash(SExp.to(a_clvm_object))) - cursor = await data_store.db.execute("SELECT row_hash FROM data_rows") - data_rows: Set[Tuple[bytes]] = set(await cursor.fetchall()) # type: ignore[arg-type] + table_rows = await data_store.get_rows(table=table_id) - expected = expected_data_rows([another_clvm_object]) - assert data_rows == expected + expected = [TableRow.from_clvm_object(clvm_object=clvm_object) for clvm_object in [another_clvm_object]] + + assert table_rows == expected @pytest.mark.asyncio -async def test_get_all_actions_just_inserts(data_store: DataStore) -> None: +async def test_get_all_actions_just_inserts(data_store: DataStore, table_id: bytes32) -> None: expected = [] - await data_store.insert_row(table=b"", clvm_object=clvm_objects[0]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[0]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[0]))) - await data_store.insert_row(table=b"", clvm_object=clvm_objects[1]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[1]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[1]))) - await data_store.insert_row(table=b"", clvm_object=clvm_objects[2]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[2]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[2]))) - await data_store.insert_row(table=b"", clvm_object=clvm_objects[3]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[3]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[3]))) - all_actions = await data_store.get_all_actions(table=b"") + all_actions = await data_store.get_all_actions(table=table_id) assert all_actions == expected @pytest.mark.asyncio -async def test_get_all_actions_with_a_delete(data_store: DataStore) -> None: +async def test_get_all_actions_with_a_delete(data_store: DataStore, table_id: bytes32) -> None: expected = [] - await data_store.insert_row(table=b"", clvm_object=clvm_objects[0]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[0]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[0]))) - await data_store.insert_row(table=b"", clvm_object=clvm_objects[1]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[1]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[1]))) - await data_store.insert_row(table=b"", clvm_object=clvm_objects[2]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[2]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[2]))) # note this is a delete - await data_store.delete_row_by_hash(table=b"", row_hash=sha256_treehash(sexp=clvm_objects[1])) + await data_store.delete_row_by_hash(table=table_id, row_hash=sha256_treehash(sexp=clvm_objects[1])) expected.append(Action(op=OperationType.DELETE, row=TableRow.from_clvm_object(clvm_object=clvm_objects[1]))) - await data_store.insert_row(table=b"", clvm_object=clvm_objects[3]) + await data_store.insert_row(table=table_id, clvm_object=clvm_objects[3]) expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[3]))) - all_actions = await data_store.get_all_actions(table=b"") + all_actions = await data_store.get_all_actions(table=table_id) assert all_actions == expected