switch data layer to simple key value store, no order/indexes or duplicates

This commit is contained in:
Kyle Altendorf 2021-09-24 15:58:36 -04:00
parent d03431c938
commit fd1266d708
No known key found for this signature in database
GPG Key ID: 8FA4A4D43C59271E
2 changed files with 183 additions and 120 deletions

View File

@ -4,6 +4,7 @@ from dataclasses import dataclass
from enum import IntEnum
import io
import logging
import random
# from typing import Dict, List, Optional, Tuple
from typing import Iterable, List, Tuple
@ -85,6 +86,20 @@ class TableRow:
return sexp_self == other
def __hash__(self) -> int:
# TODO: this is dirty, consider the TODOs in .__eq__()
return object.__hash__(dataclasses.replace(self, clvm_object=SExp.to(self.clvm_object)))
# # TODO: remove or formalize this
# async def _debug_dump(db, description=""):
# cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table';")
# print("-" * 50, description, flush=True)
# for [name] in await cursor.fetchall():
# cursor = await db.execute(f"SELECT * FROM {name}")
# x = await cursor.fetchall()
# print(f"\n -- {name} ------", x, flush=True)
@dataclass(frozen=True)
class Action:
@ -125,36 +140,45 @@ class DataStore:
self.db_wrapper = db_wrapper
self.db = db_wrapper.db
# TODO: what pragmas do we want?
# TODO: what pragmas do we want? maybe foreign_keys?
# await self.db.execute("pragma journal_mode=wal")
# await self.db.execute("pragma synchronous=2")
# TODO: make this handle multiple data layer tables
# TODO: do we need to handle multiple equal rows
# Just a raw collection of all ChiaLisp lists that are used
await self.db.execute("CREATE TABLE IF NOT EXISTS tables(id TEXT PRIMARY KEY, name STRING)")
await self.db.execute("CREATE TABLE IF NOT EXISTS keys_values(key TEXT PRIMARY KEY, value BLOB)")
await self.db.execute(
"CREATE TABLE IF NOT EXISTS raw_rows(row_hash TEXT PRIMARY KEY, table_id TEXT, clvm_object BLOB)"
"CREATE TABLE IF NOT EXISTS table_values("
"table_id TEXT,"
" key STRING,"
" PRIMARY KEY(table_id, key),"
" FOREIGN KEY(table_id) REFERENCES tables(id),"
" FOREIGN KEY(key) REFERENCES keys_values(key)"
")"
)
# The present properly ordered collection of rows.
await self.db.execute(
"CREATE TABLE IF NOT EXISTS data_rows("
"row_hash TEXT PRIMARY KEY,"
" FOREIGN KEY(row_hash) REFERENCES raw_rows(row_hash))"
"CREATE TABLE IF NOT EXISTS commits("
"id TEXT PRIMARY KEY,"
" table_id TEXT,"
" state INTEGER,"
" FOREIGN KEY(table_id) REFERENCES tables(id)"
")"
)
# TODO: needs a key
await self.db.execute(
"CREATE TABLE IF NOT EXISTS actions("
"commit_id TEXT,"
" idx INTEGER,"
" operation INTEGER,"
" key TEXT,"
" PRIMARY KEY(commit_id, idx)"
")"
)
# TODO: As operations are reverted do they get deleted? Or perhaps we track
# a reference into the table and only remove when a non-matching forward
# step is taken? Or reverts are just further actions?
await self.db.execute(
"CREATE TABLE IF NOT EXISTS actions("
"row_hash TEXT,"
" operation INTEGER,"
" FOREIGN KEY(row_hash) REFERENCES raw_rows(row_hash))"
)
# TODO: Could also be structured such that the action table has a reference from
# each action to the commit it is part of.
await self.db.execute("CREATE TABLE IF NOT EXISTS commits(changelist_hash TEXT, actions_index INTEGER)")
await self.db.commit()
@ -165,30 +189,46 @@ class DataStore:
# chia.util.merkle_set.TerminalNode requires 32 bytes so I think that's applicable here
# TODO: added as the core was adjusted to be `get_rows` (plural). API can be
# discussed more.
# TODO: should be Set[TableRow] but our stuff isn't super hashable yet...
async def get_rows(self, table: bytes32) -> List[TableRow]:
cursor = await self.db.execute(
"SELECT value FROM keys_values INNER JOIN table_values"
" WHERE"
" keys_values.key == table_values.key"
" AND table_values.table_id == :table_id",
{"table_id": table},
)
some_clvm_bytes = [value for [value] in await cursor.fetchall()]
table_rows = [TableRow.from_clvm_bytes(clvm_bytes=clvm_bytes) for clvm_bytes in some_clvm_bytes]
return table_rows
async def get_row_by_hash(self, table: bytes32, row_hash: bytes32) -> TableRow:
cursor = await self.db.execute(
(
"SELECT raw_rows.row_hash, raw_rows.clvm_object"
" FROM raw_rows INNER JOIN data_rows"
" WHERE raw_rows.row_hash == data_rows.row_hash AND data_rows.row_hash == ?"
),
(row_hash,),
"SELECT value FROM keys_values INNER JOIN table_values"
" WHERE"
" keys_values.key == :key"
" AND keys_values.key == table_values.key"
" AND table_values.table_id == :table_id",
{"key": row_hash, "table_id": table},
)
rows = await cursor.fetchall()
[table_row] = [
TableRow(
clvm_object=sexp_from_stream(io.BytesIO(clvm_object_bytes), to_sexp=CLVMObject),
hash=row_hash,
bytes=clvm_object_bytes,
)
for row_hash, clvm_object_bytes in rows
]
[clvm_bytes] = [value for [value] in await cursor.fetchall()]
table_row = TableRow(
clvm_object=sexp_from_stream(io.BytesIO(clvm_bytes), to_sexp=CLVMObject),
hash=row_hash,
bytes=clvm_bytes,
)
return table_row
async def create_table(self, id: bytes32, name: str) -> None:
await self.db.execute("INSERT INTO tables(id, name) VALUES(:id, :name)", {"id": id, "name": name})
await self.db.commit()
async def insert_row(self, table: bytes32, clvm_object: CLVMObject) -> TableRow:
"""
Args:
@ -197,46 +237,66 @@ class DataStore:
# TODO: Should we be using CLVMObject or SExp?
row_hash = sha256_treehash(sexp=clvm_object)
# check if this is already present in the raw_rows
cursor = await self.db.execute(
"SELECT * FROM raw_rows WHERE row_hash=:row_hash",
parameters={"row_hash": row_hash},
)
clvm_bytes = SExp.to(clvm_object).as_bin()
if await cursor.fetchone() is None:
# not present in raw_rows so add it
await self.db.execute(
"INSERT INTO raw_rows (row_hash, table_id, clvm_object) VALUES(?, ?, ?)", (row_hash, table, clvm_bytes)
)
await self.db.execute("INSERT INTO data_rows (row_hash) VALUES(?)", (row_hash,))
await self.db.execute(
"INSERT INTO actions (row_hash, operation) VALUES(?, ?)",
(row_hash, OperationType.INSERT),
"INSERT INTO keys_values(key, value) VALUES(:key, :value)", {"key": row_hash, "value": clvm_bytes}
)
await self.db.execute(
"INSERT INTO table_values(table_id, key) VALUES(:table_id, :key)", {"table_id": table, "key": row_hash}
)
await self.add_action(operation_type=OperationType.INSERT, key=row_hash, table=table)
# TODO: Review reentrancy on .commit() since it isn't clearly tied to this
# particular task's activity.
await self.db.commit()
return TableRow(clvm_object=clvm_object, hash=row_hash, bytes=clvm_bytes)
async def add_action(self, operation_type: OperationType, key: bytes32, table: bytes32) -> None:
cursor = await self.db.execute(
"SELECT id, table_id FROM commits WHERE table_id == :table_id AND state == :state",
{"table_id": table, "state": CommitState.OPEN},
)
commits_rows: List[Tuple[bytes32, bytes32]] = [(id, table_id) for id, table_id in await cursor.fetchall()]
if len(commits_rows) == 0:
# TODO: just copied from elsewhere... reconsider
commit_id = random.randint(0, 100000000).to_bytes(32, "big")
print("table_id", repr(table))
await self.db.execute(
"INSERT INTO commits(id, table_id, state) VALUES(:id, :table_id, :state)",
{"id": commit_id, "table_id": table, "state": CommitState.OPEN},
)
next_actions_index = 0
else:
[commit_id] = [commit_id for commit_id, table_id in commits_rows]
cursor = await self.db.execute(
"SELECT MAX(idx) FROM actions WHERE commit_id == :commit_id", {"commit_id": commit_id}
)
[[max_actions_index]] = await cursor.fetchall()
next_actions_index = max_actions_index + 1
await self.db.execute(
"INSERT INTO actions(idx, commit_id, operation, key) VALUES(:idx, :commit_id, :operation, :key)",
{"idx": next_actions_index, "commit_id": commit_id, "operation": operation_type, "key": key},
)
async def delete_row_by_hash(self, table: bytes32, row_hash: bytes32) -> TableRow:
# TODO: do we really want to bother getting just so we can have the value and bytes?
table_row = await self.get_row_by_hash(table=table, row_hash=row_hash)
# TODO: How do we generally handle multiple incoming requests to avoid them
# trampling over each other via race conditions such as here?
await self.db.execute("DELETE FROM data_rows WHERE row_hash == ?", (row_hash,))
await self.db.execute(
"INSERT INTO actions (row_hash, operation) VALUES(?, ?)",
(table_row.hash, OperationType.DELETE),
"DELETE FROM table_values WHERE table_id == :table_id AND key == :key",
{"table_id": table, "key": row_hash},
)
await self.add_action(operation_type=OperationType.DELETE, key=row_hash, table=table)
await self.db.commit()
return table_row
@ -245,9 +305,9 @@ class DataStore:
# TODO: What needs to be done to retain proper ordering, relates to the question
# at the table creation as well.
cursor = await self.db.execute(
"SELECT actions.operation, raw_rows.clvm_object"
" FROM actions INNER JOIN raw_rows"
" WHERE actions.row_hash == raw_rows.row_hash"
"SELECT actions.operation, keys_values.value"
" FROM actions INNER JOIN keys_values"
" WHERE actions.key == keys_values.key"
)
actions = await cursor.fetchall()

View File

@ -11,7 +11,7 @@ import pytest
# from chia.consensus.blockchain import Blockchain
from chia.data_layer.data_store import Action, DataStore, OperationType, TableRow
from chia.types.blockchain_format.tree_hash import sha256_treehash
from chia.types.blockchain_format.tree_hash import bytes32, sha256_treehash
# from chia.full_node.block_store import BlockStore
# from chia.full_node.coin_store import CoinStore
@ -34,6 +34,8 @@ log = logging.getLogger(__name__)
# yield connection
async def db_connection_fixture() -> AsyncIterable[aiosqlite.Connection]:
async with aiosqlite.connect(":memory:") as connection:
# TODO: consider this... should it be used at runtime?
await connection.execute("PRAGMA foreign_keys = ON")
yield connection
@ -48,9 +50,18 @@ def db_fixture(db_wrapper: DBWrapper) -> aiosqlite.Connection:
return db_wrapper.db
@pytest.fixture(name="table_id", scope="function")
def table_id_fixture() -> bytes32:
base = b"a table id"
pad = b"." * (32 - len(base))
return bytes32(pad + base)
@pytest.fixture(name="data_store", scope="function")
async def data_store_fixture(db_wrapper: DBWrapper) -> DataStore:
return await DataStore.create(db_wrapper=db_wrapper)
async def data_store_fixture(db_wrapper: DBWrapper, table_id: bytes32) -> DataStore:
data_store = await DataStore.create(db_wrapper=db_wrapper)
await data_store.create_table(id=table_id, name="A Table")
return data_store
clvm_objects = [
@ -86,10 +97,11 @@ clvm_objects = [
table_columns: Dict[str, List[str]] = {
"raw_rows": ["row_hash", "table_id", "clvm_object"],
"data_rows": ["row_hash"],
"actions": ["row_hash", "operation"],
"commits": ["changelist_hash", "actions_index"],
"tables": ["id", "name"],
"keys_values": ["key", "value"],
"table_values": ["table_id", "key"],
"commits": ["id", "table_id", "state"],
"actions": ["commit_id", "idx", "operation", "key"],
}
@ -117,118 +129,109 @@ async def test_create_creates_tables_and_columns(
@pytest.mark.asyncio
async def test_get_row_by_hash_single_match(data_store: DataStore) -> None:
async def test_insert_with_invalid_table_fails(data_store: DataStore) -> None:
# TODO: If this API is retained then it should have a specific exception.
with pytest.raises(Exception):
await data_store.insert_row(table=b"non-existant table", clvm_object=clvm_objects[0])
@pytest.mark.asyncio
async def test_get_row_by_hash_single_match(data_store: DataStore, table_id: bytes32) -> None:
a_clvm_object, *_ = clvm_objects
await data_store.insert_row(table=b"", clvm_object=a_clvm_object)
await data_store.insert_row(table=table_id, clvm_object=a_clvm_object)
row_hash = sha256_treehash(SExp.to(a_clvm_object))
table_row = await data_store.get_row_by_hash(table=b"", row_hash=row_hash)
table_row = await data_store.get_row_by_hash(table=table_id, row_hash=row_hash)
assert table_row == TableRow.from_clvm_object(clvm_object=a_clvm_object)
@pytest.mark.asyncio
async def test_get_row_by_hash_no_match(data_store: DataStore) -> None:
async def test_get_row_by_hash_no_match(data_store: DataStore, table_id: bytes32) -> None:
a_clvm_object, another_clvm_object, *_ = clvm_objects
await data_store.insert_row(table=b"", clvm_object=a_clvm_object)
await data_store.insert_row(table=table_id, clvm_object=a_clvm_object)
other_row_hash = sha256_treehash(SExp.to(another_clvm_object))
# TODO: If this API is retained then it should have a specific exception.
with pytest.raises(Exception):
await data_store.get_row_by_hash(table=b"", row_hash=other_row_hash)
await data_store.get_row_by_hash(table=table_id, row_hash=other_row_hash)
def expected_keys(clvm_objects: List[CLVMObject]) -> Set[Tuple[bytes]]:
return {sha256_treehash(SExp.to(clvm_object)) for clvm_object in clvm_objects}
@pytest.mark.asyncio
async def test_insert_adds_to_raw_rows(data_store: DataStore) -> None:
a_clvm_object, *_ = clvm_objects
await data_store.insert_row(table=b"", clvm_object=a_clvm_object)
cursor = await data_store.db.execute("SELECT * FROM raw_rows")
# TODO: The runtime type is a list, maybe ask about adjusting the hint?
# https://github.com/omnilib/aiosqlite/blob/13d165656f73c3121001622253a532bdc90b2b91/aiosqlite/cursor.py#L63
raw_rows: List[object] = await cursor.fetchall() # type: ignore[assignment]
cursor = await data_store.db.execute("SELECT row_hash FROM data_rows")
# TODO: The runtime type is a list, maybe ask about adjusting the hint?
# https://github.com/omnilib/aiosqlite/blob/13d165656f73c3121001622253a532bdc90b2b91/aiosqlite/cursor.py#L63
data_rows: List[object] = await cursor.fetchall() # type: ignore[assignment]
assert [len(raw_rows), len(data_rows)] == [1, 1]
def expected_data_rows(clvm_objects: List[CLVMObject]) -> Set[Tuple[bytes]]:
return {(sha256_treehash(SExp.to(clvm_object)),) for clvm_object in clvm_objects}
@pytest.mark.asyncio
async def test_insert_does(data_store: DataStore) -> None:
async def test_insert_does(data_store: DataStore, table_id: bytes32) -> None:
a_clvm_object, another_clvm_object, *_ = clvm_objects
await data_store.insert_row(table=b"", clvm_object=a_clvm_object)
await data_store.insert_row(table=b"", clvm_object=another_clvm_object)
await data_store.insert_row(table=table_id, clvm_object=a_clvm_object)
await data_store.insert_row(table=table_id, clvm_object=another_clvm_object)
cursor = await data_store.db.execute("SELECT row_hash FROM data_rows")
data_rows: Set[Tuple[bytes]] = set(await cursor.fetchall()) # type: ignore[arg-type]
table_rows = await data_store.get_rows(table=table_id)
expected = expected_data_rows([a_clvm_object, another_clvm_object])
assert data_rows == expected
expected = [
TableRow.from_clvm_object(clvm_object=clvm_object) for clvm_object in [a_clvm_object, another_clvm_object]
]
assert table_rows == expected
@pytest.mark.asyncio
async def test_deletes_row_by_hash(data_store: DataStore) -> None:
async def test_deletes_row_by_hash(data_store: DataStore, table_id: bytes32) -> None:
a_clvm_object, another_clvm_object, *_ = clvm_objects
await data_store.insert_row(table=b"", clvm_object=a_clvm_object)
await data_store.insert_row(table=b"", clvm_object=another_clvm_object)
await data_store.delete_row_by_hash(table=b"", row_hash=sha256_treehash(SExp.to(a_clvm_object)))
await data_store.insert_row(table=table_id, clvm_object=a_clvm_object)
await data_store.insert_row(table=table_id, clvm_object=another_clvm_object)
await data_store.delete_row_by_hash(table=table_id, row_hash=sha256_treehash(SExp.to(a_clvm_object)))
cursor = await data_store.db.execute("SELECT row_hash FROM data_rows")
data_rows: Set[Tuple[bytes]] = set(await cursor.fetchall()) # type: ignore[arg-type]
table_rows = await data_store.get_rows(table=table_id)
expected = expected_data_rows([another_clvm_object])
assert data_rows == expected
expected = [TableRow.from_clvm_object(clvm_object=clvm_object) for clvm_object in [another_clvm_object]]
assert table_rows == expected
@pytest.mark.asyncio
async def test_get_all_actions_just_inserts(data_store: DataStore) -> None:
async def test_get_all_actions_just_inserts(data_store: DataStore, table_id: bytes32) -> None:
expected = []
await data_store.insert_row(table=b"", clvm_object=clvm_objects[0])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[0])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[0])))
await data_store.insert_row(table=b"", clvm_object=clvm_objects[1])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[1])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[1])))
await data_store.insert_row(table=b"", clvm_object=clvm_objects[2])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[2])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[2])))
await data_store.insert_row(table=b"", clvm_object=clvm_objects[3])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[3])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[3])))
all_actions = await data_store.get_all_actions(table=b"")
all_actions = await data_store.get_all_actions(table=table_id)
assert all_actions == expected
@pytest.mark.asyncio
async def test_get_all_actions_with_a_delete(data_store: DataStore) -> None:
async def test_get_all_actions_with_a_delete(data_store: DataStore, table_id: bytes32) -> None:
expected = []
await data_store.insert_row(table=b"", clvm_object=clvm_objects[0])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[0])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[0])))
await data_store.insert_row(table=b"", clvm_object=clvm_objects[1])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[1])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[1])))
await data_store.insert_row(table=b"", clvm_object=clvm_objects[2])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[2])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[2])))
# note this is a delete
await data_store.delete_row_by_hash(table=b"", row_hash=sha256_treehash(sexp=clvm_objects[1]))
await data_store.delete_row_by_hash(table=table_id, row_hash=sha256_treehash(sexp=clvm_objects[1]))
expected.append(Action(op=OperationType.DELETE, row=TableRow.from_clvm_object(clvm_object=clvm_objects[1])))
await data_store.insert_row(table=b"", clvm_object=clvm_objects[3])
await data_store.insert_row(table=table_id, clvm_object=clvm_objects[3])
expected.append(Action(op=OperationType.INSERT, row=TableRow.from_clvm_object(clvm_object=clvm_objects[3])))
all_actions = await data_store.get_all_actions(table=b"")
all_actions = await data_store.get_all_actions(table=table_id)
assert all_actions == expected