Merge branch 'master' into rpc/more-examples

This commit is contained in:
Matthew Yacavone 2021-04-27 15:55:43 -04:00
commit fe62c14735
35 changed files with 746 additions and 606 deletions

5
.github/ci.sh vendored
View File

@ -134,10 +134,7 @@ install_system_deps() {
test_dist() {
setup_dist_bins
setup_external_tools
if $IS_WIN; then
echo "Warning: janky hacky workaround to #764"
sed -i 's!/!\\!g' tests/modsys/T14.icry.stdout
fi
echo "test-runner version: $($BIN/test-runner --version)"
$BIN/test-runner --ext=.icry -F -b --exe=dist/bin/cryptol tests
}

View File

@ -183,7 +183,7 @@ constraints: any.Cabal ==3.2.0.0,
any.test-framework ==0.8.2.0,
any.test-framework-hunit ==0.3.0.2,
test-framework-hunit -base3 +base4,
any.test-lib ==0.2.2,
any.test-lib ==0.3,
any.text ==1.2.4.1,
any.text-short ==0.1.3,
text-short -asserts,

View File

@ -223,7 +223,7 @@ constraints: any.Cabal ==3.2.1.0,
any.test-framework ==0.8.2.0,
any.test-framework-hunit ==0.3.0.2,
test-framework-hunit -base3 +base4,
any.test-lib ==0.2.2,
any.test-lib ==0.3,
any.text ==1.2.4.1,
any.text-short ==0.1.3,
text-short -asserts,

View File

@ -184,7 +184,7 @@ constraints: any.Cabal ==2.4.0.1,
any.test-framework ==0.8.2.0,
any.test-framework-hunit ==0.3.0.2,
test-framework-hunit -base3 +base4,
any.test-lib ==0.2.1,
any.test-lib ==0.3,
any.text ==1.2.4.0,
any.tf-random ==0.5,
any.th-abstraction ==0.3.2.0,

View File

@ -181,7 +181,7 @@ constraints: any.Cabal ==3.0.1.0,
any.test-framework ==0.8.2.0,
any.test-framework-hunit ==0.3.0.2,
test-framework-hunit -base3 +base4,
any.test-lib ==0.2.1,
any.test-lib ==0.3,
any.text ==1.2.4.0,
any.tf-random ==0.5,
any.th-abstraction ==0.3.2.0,

View File

@ -16,599 +16,17 @@ from argo_client.connection import DynamicSocketProcess, ServerConnection, Serve
from . import cryptoltypes
from . import solver
from .bitvector import BV
from .commands import *
from .connection import *
__all__ = ['cryptoltypes', 'solver']
__all__ = ['bitvector', 'commands', 'connections', 'cryptoltypes', 'solver']
def extend_hex(string : str) -> str:
if len(string) % 2 == 1:
return '0' + string
else:
return string
def fail_with(x : Exception) -> NoReturn:
"Raise an exception. This is valid in expression positions."
raise x
def from_cryptol_arg(val : Any) -> Any:
"""Return the canonical Python value for a Cryptol JSON value."""
if isinstance(val, bool):
return val
elif isinstance(val, int):
return val
elif 'expression' in val.keys():
tag = val['expression']
if tag == 'unit':
return ()
elif tag == 'tuple':
return tuple(from_cryptol_arg(x) for x in val['data'])
elif tag == 'record':
return {k : from_cryptol_arg(val[k]) for k in val['data']}
elif tag == 'sequence':
return [from_cryptol_arg(v) for v in val['data']]
elif tag == 'bits':
enc = val['encoding']
size = val['width']
if enc == 'base64':
n = int.from_bytes(
base64.b64decode(val['data'].encode('ascii')),
byteorder='big')
elif enc == 'hex':
n = int.from_bytes(
bytes.fromhex(extend_hex(val['data'])),
byteorder='big')
else:
raise ValueError("Unknown encoding " + str(enc))
return BV(size, n)
else:
raise ValueError("Unknown expression tag " + tag)
else:
raise TypeError("Unsupported value " + str(val))
class CryptolLoadModule(argo.Command):
def __init__(self, connection : HasProtocolState, mod_name : str) -> None:
super(CryptolLoadModule, self).__init__('load module', {'module name': mod_name}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolLoadFile(argo.Command):
def __init__(self, connection : HasProtocolState, filename : str) -> None:
super(CryptolLoadFile, self).__init__('load file', {'file': filename}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolExtendSearchPath(argo.Command):
def __init__(self, connection : HasProtocolState, dirs : List[str]) -> None:
super(CryptolExtendSearchPath, self).__init__('extend search path', {'paths': dirs}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolEvalExpr(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any) -> None:
super(CryptolEvalExpr, self).__init__(
'evaluate expression',
{'expression': expr},
connection
)
def process_result(self, res : Any) -> Any:
return from_cryptol_arg(res['value'])
class CryptolCall(argo.Command):
def __init__(self, connection : HasProtocolState, fun : str, args : List[Any]) -> None:
super(CryptolCall, self).__init__(
'call',
{'function': fun, 'arguments': args},
connection
)
def process_result(self, res : Any) -> Any:
return from_cryptol_arg(res['value'])
@dataclass
class CheckReport:
"""Class for describing ``check`` test results."""
success: bool
args: List[Any]
error_msg: Optional[str]
tests_run: int
tests_possible: Optional[int]
class CryptolCheck(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any, num_tests : Union[Literal['all'],int, None]) -> None:
if num_tests:
args = {'expression': expr, 'number of tests':num_tests}
else:
args = {'expression': expr}
super(CryptolCheck, self).__init__(
'check',
args,
connection
)
def process_result(self, res : Any) -> 'CheckReport':
if res['result'] == 'pass':
return CheckReport(
success=True,
args=[],
error_msg = None,
tests_run=res['tests run'],
tests_possible=res['tests possible'])
elif res['result'] == 'fail':
return CheckReport(
success=False,
args=[from_cryptol_arg(arg['expr']) for arg in res['arguments']],
error_msg = None,
tests_run=res['tests run'],
tests_possible=res['tests possible'])
elif res['result'] == 'error':
return CheckReport(
success=False,
args=[from_cryptol_arg(arg['expr']) for arg in res['arguments']],
error_msg = res['error message'],
tests_run=res['tests run'],
tests_possible=res['tests possible'])
else:
raise ValueError("Unknown check result " + str(res))
class CryptolCheckType(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any) -> None:
super(CryptolCheckType, self).__init__(
'check type',
{'expression': expr},
connection
)
def process_result(self, res : Any) -> Any:
return res['type schema']
class SmtQueryType(str, Enum):
PROVE = 'prove'
SAFE = 'safe'
SAT = 'sat'
class CryptolProveSat(argo.Command):
def __init__(self, connection : HasProtocolState, qtype : SmtQueryType, expr : Any, solver : solver.Solver, count : Optional[int]) -> None:
super(CryptolProveSat, self).__init__(
'prove or satisfy',
{'query type': qtype,
'expression': expr,
'prover': solver,
'result count': 'all' if count is None else count},
connection
)
self.qtype = qtype
def process_result(self, res : Any) -> Any:
if res['result'] == 'unsatisfiable':
if self.qtype == SmtQueryType.SAT:
return False
elif self.qtype == SmtQueryType.PROVE or self.qtype == SmtQueryType.SAFE:
return True
else:
raise ValueError("Unknown SMT query type: " + self.qtype)
elif res['result'] == 'invalid':
return [from_cryptol_arg(arg['expr'])
for arg in res['counterexample']]
elif res['result'] == 'satisfied':
return [from_cryptol_arg(arg['expr'])
for m in res['models']
for arg in m]
else:
raise ValueError("Unknown SMT result: " + str(res))
class CryptolProve(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : solver.Solver) -> None:
super(CryptolProve, self).__init__(connection, SmtQueryType.PROVE, expr, solver, 1)
class CryptolSat(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : solver.Solver, count : int) -> None:
super(CryptolSat, self).__init__(connection, SmtQueryType.SAT, expr, solver, count)
class CryptolSafe(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : solver.Solver) -> None:
super(CryptolSafe, self).__init__(connection, SmtQueryType.SAFE, expr, solver, 1)
class CryptolNames(argo.Command):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolNames, self).__init__('visible names', {}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolFocusedModule(argo.Command):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolFocusedModule, self).__init__(
'focused module',
{},
connection
)
def process_result(self, res : Any) -> Any:
return res
class CryptolReset(argo.Notification):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolReset, self).__init__(
'clear state',
{'state to clear': connection.protocol_state()},
connection
)
class CryptolResetServer(argo.Notification):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolResetServer, self).__init__(
'clear all states',
{},
connection
)
def connect(command : Optional[str]=None,
*,
cryptol_path : Optional[str] = None,
url : Optional[str] = None,
reset_server : bool = False) -> CryptolConnection:
"""
Connect to a (possibly new) Cryptol server process.
:param command: A command to launch a new Cryptol server in socket mode (if provided).
:param cryptol_path: A replacement for the contents of
the ``CRYPTOLPATH`` environment variable (if provided).
:param url: A URL at which to connect to an already running Cryptol
HTTP server.
:param reset_server: If ``True``, the server that is connected to will be
reset. (This ensures any states from previous server usages have been
cleared.)
If no ``command`` or ``url`` parameters are provided, the following are attempted in order:
1. If the environment variable ``CRYPTOL_SERVER`` is set and referse to an executable,
it is assumed to be a Cryptol server and will be used for a new ``socket`` connection.
2. If the environment variable ``CRYPTOL_SERVER_URL`` is set, it is assumed to be
the URL for a running Cryptol server in ``http`` mode and will be connected to.
3. If an executable ``cryptol-remote-api`` is available on the ``PATH``
it is assumed to be a Cryptol server and will be used for a new ``socket`` connection.
"""
c = None
if command is not None:
if url is not None:
raise ValueError("A Cryptol server URL cannot be specified with a command currently.")
c = CryptolConnection(command, cryptol_path)
elif url is not None:
c = CryptolConnection(ServerConnection(HttpProcess(url)), cryptol_path)
elif (command := os.getenv('CRYPTOL_SERVER')) is not None and (command := find_executable(command)) is not None:
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path)
elif (url := os.getenv('CRYPTOL_SERVER_URL')) is not None:
c = CryptolConnection(ServerConnection(HttpProcess(url)), cryptol_path)
elif (command := find_executable('cryptol-remote-api')) is not None:
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path)
else:
raise ValueError(
"""cryptol.connect requires one of the following:",
1) a command to launch a cryptol server is the first positional argument,
2) a URL to connect to a running cryptol server is provided via the `url` keyword argument,
3) the environment variable `CRYPTOL_SERVER` must refer to a valid server executable, or
4) the environment variable `CRYPTOL_SERVER_URL` must refer to the URL of a running cryptol server.""")
if reset_server:
CryptolResetServer(c)
return c
def connect_stdio(command : str, cryptol_path : Optional[str] = None) -> CryptolConnection:
"""Start a new connection to a new Cryptol server process.
:param command: The command to launch the Cryptol server.
:param cryptol_path: An optional replacement for the contents of
the ``CRYPTOLPATH`` environment variable.
"""
conn = CryptolStdIOProcess(command, cryptol_path=cryptol_path)
return CryptolConnection(conn)
class CryptolConnection:
"""Instances of ``CryptolConnection`` represent a particular point of
time in an interaction with Cryptol. Issuing a command through a
``CryptolConnection`` causes it to change its state into one in
which the command has been carried out.
``CryptolConnection`` is in the middle of the abstraction
hierarchy. It relieves users from thinking about explicitly
encoding state and protocol messages, but it provides a
non-blocking interface in which answers from Cryptol must be
explicitly requested. Note that blocking may occur in the case of
sequential state dependencies between commands.
"""
most_recent_result : Optional[argo.Interaction]
proc: ServerProcess
def __init__(self,
command_or_connection : Union[str, ServerConnection, ServerProcess],
cryptol_path : Optional[str] = None) -> None:
self.most_recent_result = None
if isinstance(command_or_connection, ServerProcess):
self.proc = command_or_connection
self.server_connection = ServerConnection(self.proc)
elif isinstance(command_or_connection, str):
self.proc = CryptolDynamicSocketProcess(command_or_connection, cryptol_path=cryptol_path)
self.server_connection = ServerConnection(self.proc)
else:
self.server_connection = command_or_connection
# Currently disabled in (overly?) conservative effort to not accidentally
# duplicate and share mutable state.
# def snapshot(self) -> CryptolConnection:
# """Create a lightweight snapshot of this connection. The snapshot
# shares the underlying server process, but can have different
# application state.
# """
# copy = CryptolConnection(self.server_connection)
# copy.most_recent_result = self.most_recent_result
# return copy
def protocol_state(self) -> Any:
if self.most_recent_result is None:
return None
else:
return self.most_recent_result.state()
# Protocol messages
def load_file(self, filename : str) -> argo.Command:
"""Load a filename as a Cryptol module, like ``:load`` at the Cryptol
REPL.
"""
self.most_recent_result = CryptolLoadFile(self, filename)
return self.most_recent_result
def load_module(self, module_name : str) -> argo.Command:
"""Load a Cryptol module, like ``:module`` at the Cryptol REPL."""
self.most_recent_result = CryptolLoadModule(self, module_name)
return self.most_recent_result
def eval(self, expression : Any) -> argo.Command:
"""Evaluate a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing
for their JSON equivalents.
"""
self.most_recent_result = CryptolEvalExpr(self, expression)
return self.most_recent_result
def evaluate_expression(self, expression : Any) -> argo.Command:
"""Synonym for member method ``eval``.
"""
return self.eval(expression)
def extend_search_path(self, *dir : str) -> argo.Command:
"""Load a Cryptol module, like ``:module`` at the Cryptol REPL."""
self.most_recent_result = CryptolExtendSearchPath(self, list(dir))
return self.most_recent_result
def call(self, fun : str, *args : List[Any]) -> argo.Command:
encoded_args = [cryptoltypes.CryptolType().from_python(a) for a in args]
self.most_recent_result = CryptolCall(self, fun, encoded_args)
return self.most_recent_result
def check(self, expr : Any, *, num_tests : Union[Literal['all'], int, None] = None) -> argo.Command:
"""Tests the validity of a Cryptol expression with random inputs. The expression must be a function with
return type ``Bit``.
If ``num_tests`` is ``"all"`` then the expression is tested exhaustively (i.e., against all possible inputs).
If ``num_tests`` is omitted, Cryptol defaults to running 100 tests.
"""
if num_tests == "all" or isinstance(num_tests, int) or num_tests is None:
self.most_recent_result = CryptolCheck(self, expr, num_tests)
return self.most_recent_result
else:
raise ValueError('``num_tests`` must be an integer, ``None``, or the string literall ``"all"``')
def check_type(self, code : Any) -> argo.Command:
"""Check the type of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents.
"""
self.most_recent_result = CryptolCheckType(self, code)
return self.most_recent_result
def sat(self, expr : Any, solver : solver.Solver = solver.Z3, count : int = 1) -> argo.Command:
"""Check the satisfiability of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents. Use the solver named `solver`, and return up to
`count` solutions.
"""
self.most_recent_result = CryptolSat(self, expr, solver, count)
return self.most_recent_result
def prove(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
"""Check the validity of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents. Use the solver named `solver`.
"""
self.most_recent_result = CryptolProve(self, expr, solver)
return self.most_recent_result
def safe(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
"""Check via an external SMT solver that the given term is safe for all inputs,
which means it cannot encounter a run-time error.
"""
self.most_recent_result = CryptolSafe(self, expr, solver)
return self.most_recent_result
def names(self) -> argo.Command:
"""Discover the list of names currently in scope in the current context."""
self.most_recent_result = CryptolNames(self)
return self.most_recent_result
def focused_module(self) -> argo.Command:
"""Return the name of the currently-focused module."""
self.most_recent_result = CryptolFocusedModule(self)
return self.most_recent_result
def reset(self) -> None:
"""Resets the connection, causing its unique state on the server to be freed (if applicable).
After a reset a connection may be treated as if it were a fresh connection with the server if desired."""
CryptolReset(self)
self.most_recent_result = None
def reset_server(self) -> None:
"""Resets the Cryptol server, clearing all states."""
CryptolResetServer(self)
self.most_recent_result = None
def __del__(self) -> None:
# when being deleted, ensure we don't have a lingering state on the server
if self.most_recent_result is not None:
try:
CryptolReset(self)
except Exception:
pass
class CryptolDynamicSocketProcess(DynamicSocketProcess):
def __init__(self, command: str, *,
persist: bool=False,
cryptol_path: Optional[str]=None):
environment = os.environ.copy()
if cryptol_path is not None:
environment["CRYPTOLPATH"] = str(cryptol_path)
super().__init__(command, persist=persist, environment=environment)
class CryptolStdIOProcess(StdIOProcess):
def __init__(self, command: str, *,
cryptol_path: Optional[str]=None):
environment = os.environ.copy()
if cryptol_path is not None:
environment["CRYPTOLPATH"] = str(cryptol_path)
super().__init__(command, environment=environment)
def cry(string : str) -> cryptoltypes.CryptolCode:
return cryptoltypes.CryptolLiteral(string)
# The below code relies on `connection.snapshot()` which duplicates
# a connection and points to the same underlying server state. We
# should come back to this and reevaluate if/how to do this, given
# that some of our servers have varying degrees of mutable state.
# class CryptolFunctionHandle:
# def __init__(self,
# connection : CryptolConnection,
# name : str,
# ty : Any,
# schema : Any,
# docs : Optional[str] = None) -> None:
# self.connection = connection.snapshot()
# self.name = name
# self.ty = ty
# self.schema = schema
# self.docs = docs
# self.__doc__ = "Cryptol type: " + ty
# if self.docs is not None and self.__doc__ is not None:
# self.__doc__ += "\n" + self.docs
# def __call__(self, *args : List[Any]) -> Any:
# current_type = self.schema
# remaining_args = args
# arg_types = cryptoltypes.argument_types(current_type)
# Call = TypedDict('Call', {'expression': str, 'function': str, 'arguments': List[Any]})
# current_expr : Union[str, Call]
# current_expr = self.name
# found_args = []
# while len(arg_types) > 0 and len(remaining_args) > 0:
# found_args.append(arg_types[0].from_python(remaining_args[0]))
# current_expr = {'expression': 'call', 'function': self.name, 'arguments': found_args}
# ty = self.connection.check_type(current_expr).result()
# current_type = cryptoltypes.to_schema(ty)
# arg_types = cryptoltypes.argument_types(current_type)
# remaining_args = remaining_args[1:]
# return from_cryptol_arg(self.connection.evaluate_expression(current_expr).result()['value'])
# class CryptolModule(types.ModuleType):
# def __init__(self, connection : CryptolConnection) -> None:
# self.connection = connection.snapshot()
# name = connection.focused_module().result()
# if name["module"] is None:
# raise ValueError("Provided connection is not in a module")
# super(CryptolModule, self).__init__(name["module"])
# for x in self.connection.names().result():
# if 'documentation' in x:
# setattr(self, x['name'],
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type']),
# x['documentation']))
# else:
# setattr(self, x['name'],
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type'])))
# def add_cryptol_module(name : str, connection : CryptolConnection) -> None:
# """Given a name for a Python module and a Cryptol connection,
# establish a Python module with the given name in which all the
# Cryptol names are in scope as Python proxy objects.
# """
# sys.modules[name] = CryptolModule(connection)
# class CryptolContext:
# _defined : Dict[str, CryptolFunctionHandle]
# def __init__(self, connection : CryptolConnection) -> None:
# self.connection = connection.snapshot()
# self._defined = {}
# for x in self.connection.names().result():
# if 'documentation' in x:
# self._defined[x['name']] = \
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type']),
# x['documentation'])
# else:
# self._defined[x['name']] = \
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type']))
# def __dir__(self) -> Iterable[str]:
# return self._defined.keys()
# def __getattr__(self, name : str) -> Any:
# if name in self._defined:
# return self._defined[name]
# else:
# raise AttributeError()

View File

@ -0,0 +1,240 @@
from __future__ import annotations
import base64
from enum import Enum
from dataclasses import dataclass
from typing import Any, List, Optional, Union
from typing_extensions import Literal
import argo_client.interaction as argo
from argo_client.interaction import HasProtocolState
from . import solver
from .bitvector import BV
def extend_hex(string : str) -> str:
if len(string) % 2 == 1:
return '0' + string
else:
return string
def from_cryptol_arg(val : Any) -> Any:
"""Return the canonical Python value for a Cryptol JSON value."""
if isinstance(val, bool):
return val
elif isinstance(val, int):
return val
elif 'expression' in val.keys():
tag = val['expression']
if tag == 'unit':
return ()
elif tag == 'tuple':
return tuple(from_cryptol_arg(x) for x in val['data'])
elif tag == 'record':
return {k : from_cryptol_arg(val[k]) for k in val['data']}
elif tag == 'sequence':
return [from_cryptol_arg(v) for v in val['data']]
elif tag == 'bits':
enc = val['encoding']
size = val['width']
if enc == 'base64':
n = int.from_bytes(
base64.b64decode(val['data'].encode('ascii')),
byteorder='big')
elif enc == 'hex':
n = int.from_bytes(
bytes.fromhex(extend_hex(val['data'])),
byteorder='big')
else:
raise ValueError("Unknown encoding " + str(enc))
return BV(size, n)
else:
raise ValueError("Unknown expression tag " + tag)
else:
raise TypeError("Unsupported value " + str(val))
class CryptolLoadModule(argo.Command):
def __init__(self, connection : HasProtocolState, mod_name : str) -> None:
super(CryptolLoadModule, self).__init__('load module', {'module name': mod_name}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolLoadFile(argo.Command):
def __init__(self, connection : HasProtocolState, filename : str) -> None:
super(CryptolLoadFile, self).__init__('load file', {'file': filename}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolExtendSearchPath(argo.Command):
def __init__(self, connection : HasProtocolState, dirs : List[str]) -> None:
super(CryptolExtendSearchPath, self).__init__('extend search path', {'paths': dirs}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolEvalExpr(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any) -> None:
super(CryptolEvalExpr, self).__init__(
'evaluate expression',
{'expression': expr},
connection
)
def process_result(self, res : Any) -> Any:
return from_cryptol_arg(res['value'])
class CryptolCall(argo.Command):
def __init__(self, connection : HasProtocolState, fun : str, args : List[Any]) -> None:
super(CryptolCall, self).__init__(
'call',
{'function': fun, 'arguments': args},
connection
)
def process_result(self, res : Any) -> Any:
return from_cryptol_arg(res['value'])
@dataclass
class CheckReport:
"""Class for describing ``check`` test results."""
success: bool
args: List[Any]
error_msg: Optional[str]
tests_run: int
tests_possible: Optional[int]
class CryptolCheck(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any, num_tests : Union[Literal['all'],int, None]) -> None:
if num_tests:
args = {'expression': expr, 'number of tests':num_tests}
else:
args = {'expression': expr}
super(CryptolCheck, self).__init__(
'check',
args,
connection
)
def process_result(self, res : Any) -> 'CheckReport':
if res['result'] == 'pass':
return CheckReport(
success=True,
args=[],
error_msg = None,
tests_run=res['tests run'],
tests_possible=res['tests possible'])
elif res['result'] == 'fail':
return CheckReport(
success=False,
args=[from_cryptol_arg(arg['expr']) for arg in res['arguments']],
error_msg = None,
tests_run=res['tests run'],
tests_possible=res['tests possible'])
elif res['result'] == 'error':
return CheckReport(
success=False,
args=[from_cryptol_arg(arg['expr']) for arg in res['arguments']],
error_msg = res['error message'],
tests_run=res['tests run'],
tests_possible=res['tests possible'])
else:
raise ValueError("Unknown check result " + str(res))
class CryptolCheckType(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any) -> None:
super(CryptolCheckType, self).__init__(
'check type',
{'expression': expr},
connection
)
def process_result(self, res : Any) -> Any:
return res['type schema']
class SmtQueryType(str, Enum):
PROVE = 'prove'
SAFE = 'safe'
SAT = 'sat'
class CryptolProveSat(argo.Command):
def __init__(self, connection : HasProtocolState, qtype : SmtQueryType, expr : Any, solver : solver.Solver, count : Optional[int]) -> None:
super(CryptolProveSat, self).__init__(
'prove or satisfy',
{'query type': qtype,
'expression': expr,
'prover': solver,
'result count': 'all' if count is None else count},
connection
)
self.qtype = qtype
def process_result(self, res : Any) -> Any:
if res['result'] == 'unsatisfiable':
if self.qtype == SmtQueryType.SAT:
return False
elif self.qtype == SmtQueryType.PROVE or self.qtype == SmtQueryType.SAFE:
return True
else:
raise ValueError("Unknown SMT query type: " + self.qtype)
elif res['result'] == 'invalid':
return [from_cryptol_arg(arg['expr'])
for arg in res['counterexample']]
elif res['result'] == 'satisfied':
return [from_cryptol_arg(arg['expr'])
for m in res['models']
for arg in m]
else:
raise ValueError("Unknown SMT result: " + str(res))
class CryptolProve(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : solver.Solver) -> None:
super(CryptolProve, self).__init__(connection, SmtQueryType.PROVE, expr, solver, 1)
class CryptolSat(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : solver.Solver, count : int) -> None:
super(CryptolSat, self).__init__(connection, SmtQueryType.SAT, expr, solver, count)
class CryptolSafe(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : solver.Solver) -> None:
super(CryptolSafe, self).__init__(connection, SmtQueryType.SAFE, expr, solver, 1)
class CryptolNames(argo.Command):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolNames, self).__init__('visible names', {}, connection)
def process_result(self, res : Any) -> Any:
return res
class CryptolFocusedModule(argo.Command):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolFocusedModule, self).__init__(
'focused module',
{},
connection
)
def process_result(self, res : Any) -> Any:
return res
class CryptolReset(argo.Notification):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolReset, self).__init__(
'clear state',
{'state to clear': connection.protocol_state()},
connection
)
class CryptolResetServer(argo.Notification):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolResetServer, self).__init__(
'clear all states',
{},
connection
)

View File

@ -0,0 +1,370 @@
from __future__ import annotations
import os
from distutils.spawn import find_executable
from typing import Any, List, Optional, Union
from typing_extensions import Literal
import argo_client.interaction as argo
from argo_client.connection import DynamicSocketProcess, ServerConnection, ServerProcess, StdIOProcess, HttpProcess
from . import cryptoltypes
from . import solver
from .commands import *
def connect(command : Optional[str]=None,
*,
cryptol_path : Optional[str] = None,
url : Optional[str] = None,
reset_server : bool = False) -> CryptolConnection:
"""
Connect to a (possibly new) Cryptol server process.
:param command: A command to launch a new Cryptol server in socket mode (if provided).
:param cryptol_path: A replacement for the contents of
the ``CRYPTOLPATH`` environment variable (if provided).
:param url: A URL at which to connect to an already running Cryptol
HTTP server.
:param reset_server: If ``True``, the server that is connected to will be
reset. (This ensures any states from previous server usages have been
cleared.)
If no ``command`` or ``url`` parameters are provided, the following are attempted in order:
1. If the environment variable ``CRYPTOL_SERVER`` is set and referse to an executable,
it is assumed to be a Cryptol server and will be used for a new ``socket`` connection.
2. If the environment variable ``CRYPTOL_SERVER_URL`` is set, it is assumed to be
the URL for a running Cryptol server in ``http`` mode and will be connected to.
3. If an executable ``cryptol-remote-api`` is available on the ``PATH``
it is assumed to be a Cryptol server and will be used for a new ``socket`` connection.
"""
c = None
if command is not None:
if url is not None:
raise ValueError("A Cryptol server URL cannot be specified with a command currently.")
c = CryptolConnection(command, cryptol_path)
elif url is not None:
c = CryptolConnection(ServerConnection(HttpProcess(url)), cryptol_path)
elif (command := os.getenv('CRYPTOL_SERVER')) is not None and (command := find_executable(command)) is not None:
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path)
elif (url := os.getenv('CRYPTOL_SERVER_URL')) is not None:
c = CryptolConnection(ServerConnection(HttpProcess(url)), cryptol_path)
elif (command := find_executable('cryptol-remote-api')) is not None:
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path)
else:
raise ValueError(
"""cryptol.connect requires one of the following:",
1) a command to launch a cryptol server is the first positional argument,
2) a URL to connect to a running cryptol server is provided via the `url` keyword argument,
3) the environment variable `CRYPTOL_SERVER` must refer to a valid server executable, or
4) the environment variable `CRYPTOL_SERVER_URL` must refer to the URL of a running cryptol server.""")
if reset_server:
CryptolResetServer(c)
return c
def connect_stdio(command : str, cryptol_path : Optional[str] = None) -> CryptolConnection:
"""Start a new connection to a new Cryptol server process.
:param command: The command to launch the Cryptol server.
:param cryptol_path: An optional replacement for the contents of
the ``CRYPTOLPATH`` environment variable.
"""
conn = CryptolStdIOProcess(command, cryptol_path=cryptol_path)
return CryptolConnection(conn)
class CryptolConnection:
"""Instances of ``CryptolConnection`` represent a particular point of
time in an interaction with Cryptol. Issuing a command through a
``CryptolConnection`` causes it to change its state into one in
which the command has been carried out.
``CryptolConnection`` is in the middle of the abstraction
hierarchy. It relieves users from thinking about explicitly
encoding state and protocol messages, but it provides a
non-blocking interface in which answers from Cryptol must be
explicitly requested. Note that blocking may occur in the case of
sequential state dependencies between commands.
"""
most_recent_result : Optional[argo.Interaction]
proc: ServerProcess
def __init__(self,
command_or_connection : Union[str, ServerConnection, ServerProcess],
cryptol_path : Optional[str] = None) -> None:
self.most_recent_result = None
if isinstance(command_or_connection, ServerProcess):
self.proc = command_or_connection
self.server_connection = ServerConnection(self.proc)
elif isinstance(command_or_connection, str):
self.proc = CryptolDynamicSocketProcess(command_or_connection, cryptol_path=cryptol_path)
self.server_connection = ServerConnection(self.proc)
else:
self.server_connection = command_or_connection
# Currently disabled in (overly?) conservative effort to not accidentally
# duplicate and share mutable state.
# def snapshot(self) -> CryptolConnection:
# """Create a lightweight snapshot of this connection. The snapshot
# shares the underlying server process, but can have different
# application state.
# """
# copy = CryptolConnection(self.server_connection)
# copy.most_recent_result = self.most_recent_result
# return copy
def protocol_state(self) -> Any:
if self.most_recent_result is None:
return None
else:
return self.most_recent_result.state()
# Protocol messages
def load_file(self, filename : str) -> argo.Command:
"""Load a filename as a Cryptol module, like ``:load`` at the Cryptol
REPL.
"""
self.most_recent_result = CryptolLoadFile(self, filename)
return self.most_recent_result
def load_module(self, module_name : str) -> argo.Command:
"""Load a Cryptol module, like ``:module`` at the Cryptol REPL."""
self.most_recent_result = CryptolLoadModule(self, module_name)
return self.most_recent_result
def eval(self, expression : Any) -> argo.Command:
"""Evaluate a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing
for their JSON equivalents.
"""
self.most_recent_result = CryptolEvalExpr(self, expression)
return self.most_recent_result
def evaluate_expression(self, expression : Any) -> argo.Command:
"""Synonym for member method ``eval``.
"""
return self.eval(expression)
def extend_search_path(self, *dir : str) -> argo.Command:
"""Load a Cryptol module, like ``:module`` at the Cryptol REPL."""
self.most_recent_result = CryptolExtendSearchPath(self, list(dir))
return self.most_recent_result
def call(self, fun : str, *args : List[Any]) -> argo.Command:
encoded_args = [cryptoltypes.CryptolType().from_python(a) for a in args]
self.most_recent_result = CryptolCall(self, fun, encoded_args)
return self.most_recent_result
def check(self, expr : Any, *, num_tests : Union[Literal['all'], int, None] = None) -> argo.Command:
"""Tests the validity of a Cryptol expression with random inputs. The expression must be a function with
return type ``Bit``.
If ``num_tests`` is ``"all"`` then the expression is tested exhaustively (i.e., against all possible inputs).
If ``num_tests`` is omitted, Cryptol defaults to running 100 tests.
"""
if num_tests == "all" or isinstance(num_tests, int) or num_tests is None:
self.most_recent_result = CryptolCheck(self, expr, num_tests)
return self.most_recent_result
else:
raise ValueError('``num_tests`` must be an integer, ``None``, or the string literall ``"all"``')
def check_type(self, code : Any) -> argo.Command:
"""Check the type of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents.
"""
self.most_recent_result = CryptolCheckType(self, code)
return self.most_recent_result
def sat(self, expr : Any, solver : solver.Solver = solver.Z3, count : int = 1) -> argo.Command:
"""Check the satisfiability of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents. Use the solver named `solver`, and return up to
`count` solutions.
"""
self.most_recent_result = CryptolSat(self, expr, solver, count)
return self.most_recent_result
def prove(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
"""Check the validity of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents. Use the solver named `solver`.
"""
self.most_recent_result = CryptolProve(self, expr, solver)
return self.most_recent_result
def safe(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
"""Check via an external SMT solver that the given term is safe for all inputs,
which means it cannot encounter a run-time error.
"""
self.most_recent_result = CryptolSafe(self, expr, solver)
return self.most_recent_result
def names(self) -> argo.Command:
"""Discover the list of names currently in scope in the current context."""
self.most_recent_result = CryptolNames(self)
return self.most_recent_result
def focused_module(self) -> argo.Command:
"""Return the name of the currently-focused module."""
self.most_recent_result = CryptolFocusedModule(self)
return self.most_recent_result
def reset(self) -> None:
"""Resets the connection, causing its unique state on the server to be freed (if applicable).
After a reset a connection may be treated as if it were a fresh connection with the server if desired."""
CryptolReset(self)
self.most_recent_result = None
def reset_server(self) -> None:
"""Resets the Cryptol server, clearing all states."""
CryptolResetServer(self)
self.most_recent_result = None
def __del__(self) -> None:
# when being deleted, ensure we don't have a lingering state on the server
if self.most_recent_result is not None:
try:
CryptolReset(self)
except Exception:
pass
class CryptolDynamicSocketProcess(DynamicSocketProcess):
def __init__(self, command: str, *,
persist: bool=False,
cryptol_path: Optional[str]=None):
environment = os.environ.copy()
if cryptol_path is not None:
environment["CRYPTOLPATH"] = str(cryptol_path)
super().__init__(command, persist=persist, environment=environment)
class CryptolStdIOProcess(StdIOProcess):
def __init__(self, command: str, *,
cryptol_path: Optional[str]=None):
environment = os.environ.copy()
if cryptol_path is not None:
environment["CRYPTOLPATH"] = str(cryptol_path)
super().__init__(command, environment=environment)
# The below code relies on `connection.snapshot()` which duplicates
# a connection and points to the same underlying server state. We
# should come back to this and reevaluate if/how to do this, given
# that some of our servers have varying degrees of mutable state.
# class CryptolFunctionHandle:
# def __init__(self,
# connection : CryptolConnection,
# name : str,
# ty : Any,
# schema : Any,
# docs : Optional[str] = None) -> None:
# self.connection = connection.snapshot()
# self.name = name
# self.ty = ty
# self.schema = schema
# self.docs = docs
# self.__doc__ = "Cryptol type: " + ty
# if self.docs is not None and self.__doc__ is not None:
# self.__doc__ += "\n" + self.docs
# def __call__(self, *args : List[Any]) -> Any:
# current_type = self.schema
# remaining_args = args
# arg_types = cryptoltypes.argument_types(current_type)
# Call = TypedDict('Call', {'expression': str, 'function': str, 'arguments': List[Any]})
# current_expr : Union[str, Call]
# current_expr = self.name
# found_args = []
# while len(arg_types) > 0 and len(remaining_args) > 0:
# found_args.append(arg_types[0].from_python(remaining_args[0]))
# current_expr = {'expression': 'call', 'function': self.name, 'arguments': found_args}
# ty = self.connection.check_type(current_expr).result()
# current_type = cryptoltypes.to_schema(ty)
# arg_types = cryptoltypes.argument_types(current_type)
# remaining_args = remaining_args[1:]
# return from_cryptol_arg(self.connection.evaluate_expression(current_expr).result()['value'])
# class CryptolModule(types.ModuleType):
# def __init__(self, connection : CryptolConnection) -> None:
# self.connection = connection.snapshot()
# name = connection.focused_module().result()
# if name["module"] is None:
# raise ValueError("Provided connection is not in a module")
# super(CryptolModule, self).__init__(name["module"])
# for x in self.connection.names().result():
# if 'documentation' in x:
# setattr(self, x['name'],
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type']),
# x['documentation']))
# else:
# setattr(self, x['name'],
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type'])))
# def add_cryptol_module(name : str, connection : CryptolConnection) -> None:
# """Given a name for a Python module and a Cryptol connection,
# establish a Python module with the given name in which all the
# Cryptol names are in scope as Python proxy objects.
# """
# sys.modules[name] = CryptolModule(connection)
# class CryptolContext:
# _defined : Dict[str, CryptolFunctionHandle]
# def __init__(self, connection : CryptolConnection) -> None:
# self.connection = connection.snapshot()
# self._defined = {}
# for x in self.connection.names().result():
# if 'documentation' in x:
# self._defined[x['name']] = \
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type']),
# x['documentation'])
# else:
# self._defined[x['name']] = \
# CryptolFunctionHandle(self.connection,
# x['name'],
# x['type string'],
# cryptoltypes.to_schema(x['type']))
# def __dir__(self) -> Iterable[str]:
# return self._defined.keys()
# def __getattr__(self, name : str) -> Any:
# if name in self._defined:
# return self._defined[name]
# else:
# raise AttributeError()

View File

@ -2,6 +2,11 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $DIR
cabal v2-build exe:cryptol-remote-api
cabal v2-build exe:cryptol-eval-server
pushd $DIR/python
NUM_FAILS=0
@ -45,6 +50,8 @@ else
fi
popd
popd
if [ $NUM_FAILS -eq 0 ]
then
echo "All RPC tests passed"

View File

@ -16,6 +16,7 @@
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE OverloadedStrings #-}
module Cryptol.ModuleSystem.NamingEnv where
import Data.List (nub)
@ -106,6 +107,11 @@ merge :: [Name] -> [Name] -> [Name]
merge xs ys | xs == ys = xs
| otherwise = nub (xs ++ ys)
instance PP NamingEnv where
ppPrec _ (NamingEnv mps) = vcat $ map ppNS $ Map.toList mps
where ppNS (ns,xs) = pp ns $$ nest 2 (vcat (map ppNm (Map.toList xs)))
ppNm (x,as) = pp x <+> "->" <+> hsep (punctuate comma (map pp as))
-- | Generate a mapping from 'PrimIdent' to 'Name' for a
-- given naming environment.
toPrimMap :: NamingEnv -> PrimMap

View File

@ -161,8 +161,10 @@ renameModule' thisNested env mpath m =
allImps = openLoop allNested env openDs imps
(inScope,decls') <-
shadowNames allImps $
shadowNames' CheckNone allImps $
shadowNames' CheckOverlap env $
-- maybe we should allow for a warning
-- if a local name shadows an imported one?
do inScope <- getNamingEnv
ds <- renameTopDecls' (allNested,mpath) (mDecls m)
pure (inScope, ds)

View File

@ -250,10 +250,16 @@ checkEnv check (NamingEnv lenv) r rw0
where
newEnv = NamingEnv newMap
(rwFin,newMap) = Map.mapAccumWithKey doNS rw0 lenv
(rwFin,newMap) = Map.mapAccumWithKey doNS rw0 lenv -- lenv 1 ns at a time
doNS rw ns = Map.mapAccumWithKey (step ns) rw
step ns acc k xs = (acc', [head xs])
-- namespace, current state, k : parse name, xs : possible entities for k
step ns acc k xs = (acc', case check of
CheckNone -> xs
_ -> [head xs]
-- we've already reported an overlap error,
-- so resolve arbitrarily to the first entry
)
where
acc' = acc
{ rwWarnings =

View File

@ -16,7 +16,7 @@ flag static
executable cryptol-test-runner
Main-is: Main.hs
build-depends: base,filepath,test-lib
build-depends: base,filepath,test-lib >= 0.3
GHC-options: -Wall -O2
Default-language: Haskell2010

View File

@ -0,0 +1,4 @@
Loading module Cryptol
Parse error at .\T14\Main.cry:3:9,
unexpected: MalformedUtf8

1
tests/modsys/T15.icry Normal file
View File

@ -0,0 +1 @@
:module T15::B

View File

@ -0,0 +1,4 @@
Loading module Cryptol
Loading module Cryptol
Loading module T15::A
Loading module T15::B

5
tests/modsys/T15/A.cry Normal file
View File

@ -0,0 +1,5 @@
module T15::A where
update = 0x02

3
tests/modsys/T15/B.cry Normal file
View File

@ -0,0 +1,3 @@
module T15::B where
import T15::A

1
tests/modsys/T16.icry Normal file
View File

@ -0,0 +1 @@
:module T16::B

View File

@ -0,0 +1,9 @@
Loading module Cryptol
Loading module Cryptol
Loading module T16::A
Loading module T16::B
[error] at ./T16/B.cry:5:5--5:11
Multiple definitions for symbol: update
(at Cryptol:844:11--844:17, update)
(at ./T16/A.cry:3:1--3:7, T16::A::update)

View File

@ -0,0 +1,9 @@
Loading module Cryptol
Loading module Cryptol
Loading module T16::A
Loading module T16::B
[error] at .\T16\B.cry:5:5--5:11
Multiple definitions for symbol: update
(at Cryptol:844:11--844:17, update)
(at .\T16\A.cry:3:1--3:7, T16::A::update)

5
tests/modsys/T16/A.cry Normal file
View File

@ -0,0 +1,5 @@
module T16::A where
update = 0x02

5
tests/modsys/T16/B.cry Normal file
View File

@ -0,0 +1,5 @@
module T16::B where
import T16::A
f = update

1
tests/modsys/T17.icry Normal file
View File

@ -0,0 +1 @@
:module T17::B

View File

@ -0,0 +1,5 @@
Loading module Cryptol
Loading module Cryptol
Loading module T17::A
Loading module T17::A1
Loading module T17::B

5
tests/modsys/T17/A.cry Normal file
View File

@ -0,0 +1,5 @@
module T17::A where
u = 0x02

5
tests/modsys/T17/A1.cry Normal file
View File

@ -0,0 +1,5 @@
module T17::A1 where
u = 0x03

4
tests/modsys/T17/B.cry Normal file
View File

@ -0,0 +1,4 @@
module T17::B where
import T17::A
import T17::A1

1
tests/modsys/T18.icry Normal file
View File

@ -0,0 +1 @@
:module T18::B

View File

@ -0,0 +1,10 @@
Loading module Cryptol
Loading module Cryptol
Loading module T18::A
Loading module T18::A1
Loading module T18::B
[error] at ./T18/B.cry:6:5--6:6
Multiple definitions for symbol: u
(at ./T18/A.cry:3:1--3:2, T18::A::u)
(at ./T18/A1.cry:3:1--3:2, T18::A1::u)

View File

@ -0,0 +1,10 @@
Loading module Cryptol
Loading module Cryptol
Loading module T18::A
Loading module T18::A1
Loading module T18::B
[error] at .\T18\B.cry:6:5--6:6
Multiple definitions for symbol: u
(at .\T18\A.cry:3:1--3:2, T18::A::u)
(at .\T18\A1.cry:3:1--3:2, T18::A1::u)

5
tests/modsys/T18/A.cry Normal file
View File

@ -0,0 +1,5 @@
module T18::A where
u = 0x02

5
tests/modsys/T18/A1.cry Normal file
View File

@ -0,0 +1,5 @@
module T18::A1 where
u = 0x03

6
tests/modsys/T18/B.cry Normal file
View File

@ -0,0 +1,6 @@
module T18::B where
import T18::A
import T18::A1
f = u

View File

@ -1,15 +1,6 @@
Loading module Cryptol
Loading module Cryptol
Loading module T15
[warning] at T15.cry:5:13--5:14
This binding for `A` shadows the existing binding at
T15.cry:3:11--3:12
[warning] at T15.cry:7:15--7:16
This binding for `A` shadows the existing binding at
T15.cry:5:13--5:14
[warning] at T15.cry:7:15--7:16
This binding for `A::A` shadows the existing binding at
T15.cry:5:13--5:14
Showing a specific instance of polymorphic result:
* Using 'Integer' for 1st type argument of 'T15::A::A::y'
2