feat: timeout for cryptol python client methods (#1282)

interrupt and timeout methods for cryptol python client
This commit is contained in:
Andrew Kent 2021-09-21 20:23:28 -07:00 committed by GitHub
parent fe5aa6642a
commit e256346f83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 356 additions and 105 deletions

View File

@ -22,6 +22,8 @@ import CryptolServer.Check ( check, checkDescr )
import CryptolServer.ClearState
( clearState, clearStateDescr, clearAllStates, clearAllStatesDescr)
import Cryptol.Eval (EvalOpts(..), defaultPPOpts)
import CryptolServer.Interrupt
( interruptServer, interruptServerDescr )
import Cryptol.ModuleSystem (ModuleInput(..), loadModuleByPath, loadModuleByName)
import Cryptol.ModuleSystem.Monad (runModuleM, setFocusedModule)
import Cryptol.TypeCheck.AST (mName)
@ -166,4 +168,8 @@ cryptolEvalMethods =
"clear all states"
clearAllStatesDescr
clearAllStates
, notification
"interrupt"
interruptServerDescr
interruptServer
]

View File

@ -74,6 +74,7 @@ library
CryptolServer.ExtendSearchPath
CryptolServer.Exceptions
CryptolServer.FocusedModule
CryptolServer.Interrupt
CryptolServer.LoadModule
CryptolServer.Options
CryptolServer.Names

View File

@ -26,6 +26,8 @@ import CryptolServer.ExtendSearchPath
( extSearchPath, extSearchPathDescr )
import CryptolServer.FocusedModule
( focusedModule, focusedModuleDescr )
import CryptolServer.Interrupt
( interruptServer, interruptServerDescr )
import CryptolServer.LoadModule
( loadFile, loadFileDescr, loadModule, loadModuleDescr )
import CryptolServer.Names ( visibleNames, visibleNamesDescr )
@ -113,4 +115,8 @@ cryptolMethods =
"prove or satisfy"
proveSatDescr
proveSat
, notification
"interrupt"
interruptServerDescr
interruptServer
]

View File

@ -611,6 +611,24 @@ Return fields
interrupt (notification)
~~~~~~~~~~~~~~~~~~~~~~~~
Interrupt the server, terminating it's current task (if one exists).
Parameter fields
++++++++++++++++
No parameters
Return fields
+++++++++++++
No return fields

View File

@ -1,5 +1,14 @@
# Revision history for `cryptol` Python package
## 2.11.6 -- 2021-09-10
* Add a `timeout` field to the `CryptolConnection` class which can be used
to set a default timeout for all RPC interactions.
* Add an optional `timeout` keyword parameter to each `CryptolConnection` method
which can specify a timeout for a particular method call.
* Add an `interrupt` method to the `CryptolConnection` class which interrupts
any active requests on the server.
## 2.11.5 -- 2021-08-25
* From argo: Change the behavior of the `Command` `state` method so that after

View File

@ -62,33 +62,34 @@ def from_cryptol_arg(val : Any) -> CryptolValue:
class CryptolLoadModule(argo.Command):
def __init__(self, connection : HasProtocolState, mod_name : str) -> None:
super(CryptolLoadModule, self).__init__('load module', {'module name': mod_name}, connection)
def __init__(self, connection : HasProtocolState, mod_name : str, timeout: Optional[float]) -> None:
super(CryptolLoadModule, self).__init__('load module', {'module name': mod_name}, connection, timeout=timeout)
def process_result(self, res : Any) -> Any:
return res
class CryptolLoadFile(argo.Command):
def __init__(self, connection : HasProtocolState, filename : str) -> None:
super(CryptolLoadFile, self).__init__('load file', {'file': filename}, connection)
def __init__(self, connection : HasProtocolState, filename : str, timeout: Optional[float]) -> None:
super(CryptolLoadFile, self).__init__('load file', {'file': filename}, connection, timeout=timeout)
def process_result(self, res : Any) -> Any:
return res
class CryptolExtendSearchPath(argo.Command):
def __init__(self, connection : HasProtocolState, dirs : List[str]) -> None:
super(CryptolExtendSearchPath, self).__init__('extend search path', {'paths': dirs}, connection)
def __init__(self, connection : HasProtocolState, dirs : List[str], timeout: Optional[float]) -> None:
super(CryptolExtendSearchPath, self).__init__('extend search path', {'paths': dirs}, connection, timeout=timeout)
def process_result(self, res : Any) -> Any:
return res
class CryptolEvalExprRaw(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any) -> None:
def __init__(self, connection : HasProtocolState, expr : Any, timeout: Optional[float]) -> None:
super(CryptolEvalExprRaw, self).__init__(
'evaluate expression',
{'expression': expr},
connection
connection,
timeout=timeout
)
def process_result(self, res : Any) -> Any:
@ -100,11 +101,12 @@ class CryptolEvalExpr(CryptolEvalExprRaw):
class CryptolCallRaw(argo.Command):
def __init__(self, connection : HasProtocolState, fun : str, args : List[Any]) -> None:
def __init__(self, connection : HasProtocolState, fun : str, args : List[Any], timeout: Optional[float]) -> None:
super(CryptolCallRaw, self).__init__(
'call',
{'function': fun, 'arguments': args},
connection
connection,
timeout=timeout
)
def process_result(self, res : Any) -> Any:
@ -153,7 +155,10 @@ def to_check_report(res : Any) -> CheckReport:
raise ValueError("Unknown check result " + str(res))
class CryptolCheckRaw(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any, num_tests : Union[Literal['all'],int, None]) -> None:
def __init__(self, connection : HasProtocolState,
expr : Any,
num_tests : Union[Literal['all'],int, None],
timeout: Optional[float]) -> None:
if num_tests:
args = {'expression': expr, 'number of tests':num_tests}
else:
@ -161,7 +166,8 @@ class CryptolCheckRaw(argo.Command):
super(CryptolCheckRaw, self).__init__(
'check',
args,
connection
connection,
timeout=timeout
)
def process_result(self, res : Any) -> Any:
@ -173,11 +179,12 @@ class CryptolCheck(CryptolCheckRaw):
class CryptolCheckType(argo.Command):
def __init__(self, connection : HasProtocolState, expr : Any) -> None:
def __init__(self, connection : HasProtocolState, expr : Any, timeout: Optional[float]) -> None:
super(CryptolCheckType, self).__init__(
'check type',
{'expression': expr},
connection
connection,
timeout=timeout
)
def process_result(self, res : Any) -> Any:
@ -190,7 +197,13 @@ class SmtQueryType(str, Enum):
SAT = 'sat'
class CryptolProveSatRaw(argo.Command):
def __init__(self, connection : HasProtocolState, qtype : SmtQueryType, expr : Any, solver : Solver, count : Optional[int]) -> None:
def __init__(self,
connection : HasProtocolState,
qtype : SmtQueryType,
expr : Any,
solver : Solver,
count : Optional[int],
timeout: Optional[float]) -> None:
super(CryptolProveSatRaw, self).__init__(
'prove or satisfy',
{'query type': qtype,
@ -198,7 +211,8 @@ class CryptolProveSatRaw(argo.Command):
'prover': solver.name(),
'hash consing': "true" if solver.hash_consing() else "false",
'result count': 'all' if count is None else count},
connection
connection,
timeout=timeout
)
self.qtype = qtype
@ -228,41 +242,42 @@ class CryptolProveSat(CryptolProveSatRaw):
raise ValueError("Unknown SMT result: " + str(res))
class CryptolProveRaw(CryptolProveSatRaw):
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver) -> None:
super(CryptolProveRaw, self).__init__(connection, SmtQueryType.PROVE, expr, solver, 1)
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, timeout: Optional[float]) -> None:
super(CryptolProveRaw, self).__init__(connection, SmtQueryType.PROVE, expr, solver, 1, timeout)
class CryptolProve(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver) -> None:
super(CryptolProve, self).__init__(connection, SmtQueryType.PROVE, expr, solver, 1)
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, timeout: Optional[float]) -> None:
super(CryptolProve, self).__init__(connection, SmtQueryType.PROVE, expr, solver, 1, timeout=timeout)
class CryptolSatRaw(CryptolProveSatRaw):
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, count : int) -> None:
super(CryptolSatRaw, self).__init__(connection, SmtQueryType.SAT, expr, solver, count)
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, count : int, timeout: Optional[float]) -> None:
super(CryptolSatRaw, self).__init__(connection, SmtQueryType.SAT, expr, solver, count, timeout=timeout)
class CryptolSat(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, count : int) -> None:
super(CryptolSat, self).__init__(connection, SmtQueryType.SAT, expr, solver, count)
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, count : int, timeout: Optional[float]) -> None:
super(CryptolSat, self).__init__(connection, SmtQueryType.SAT, expr, solver, count, timeout=timeout)
class CryptolSafeRaw(CryptolProveSatRaw):
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver) -> None:
super(CryptolSafeRaw, self).__init__(connection, SmtQueryType.SAFE, expr, solver, 1)
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, timeout: Optional[float]) -> None:
super(CryptolSafeRaw, self).__init__(connection, SmtQueryType.SAFE, expr, solver, 1, timeout=timeout)
class CryptolSafe(CryptolProveSat):
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver) -> None:
super(CryptolSafe, self).__init__(connection, SmtQueryType.SAFE, expr, solver, 1)
def __init__(self, connection : HasProtocolState, expr : Any, solver : Solver, timeout: Optional[float]) -> None:
super(CryptolSafe, self).__init__(connection, SmtQueryType.SAFE, expr, solver, 1, timeout=timeout)
class CryptolNames(argo.Command):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolNames, self).__init__('visible names', {}, connection)
def __init__(self, connection : HasProtocolState, timeout: Optional[float]) -> None:
super(CryptolNames, self).__init__('visible names', {}, connection, timeout=timeout)
def process_result(self, res : Any) -> Any:
return res
class CryptolFocusedModule(argo.Command):
def __init__(self, connection : HasProtocolState) -> None:
def __init__(self, connection : HasProtocolState, timeout: Optional[float]) -> None:
super(CryptolFocusedModule, self).__init__(
'focused module',
{},
connection
connection,
timeout=timeout
)
def process_result(self, res : Any) -> Any:
@ -285,3 +300,11 @@ class CryptolResetServer(argo.Notification):
{},
connection
)
class CryptolInterrupt(argo.Notification):
def __init__(self, connection : HasProtocolState) -> None:
super(CryptolInterrupt, self).__init__(
'interrupt',
{},
connection
)

View File

@ -20,7 +20,8 @@ def connect(command : Optional[str]=None,
url : Optional[str] = None,
reset_server : bool = False,
verify : Union[bool, str] = True,
log_dest : Optional[TextIO] = None) -> CryptolConnection:
log_dest : Optional[TextIO] = None,
timeout : Optional[float] = None) -> CryptolConnection:
"""
Connect to a (possibly new) Cryptol server process.
@ -45,6 +46,10 @@ def connect(command : Optional[str]=None,
will print traffic to ``stderr``, ``log_dest=open('foo.log', 'w')`` will log to ``foo.log``,
etc.
:param timeout: Optional default timeout (in seconds) for methods. Can be modified/read via the
`timeout` member field on a `CryptolConnection`. Method invocations which specify
the optional `timeout` keyword parameter will cause the default to be ignored for that method.
If no ``command`` or ``url`` parameters are provided, the following are attempted in order:
1. If the environment variable ``CRYPTOL_SERVER`` is set and referse to an executable,
@ -61,7 +66,7 @@ def connect(command : Optional[str]=None,
if command is not None:
if url is not None:
raise ValueError("A Cryptol server URL cannot be specified with a command currently.")
c = CryptolConnection(command, cryptol_path, log_dest=log_dest)
c = CryptolConnection(command, cryptol_path, log_dest=log_dest, timeout=timeout)
# User-passed url?
if c is None and url is not None:
c = CryptolConnection(ServerConnection(HttpProcess(url, verify=verify)), cryptol_path, log_dest=log_dest)
@ -71,17 +76,17 @@ def connect(command : Optional[str]=None,
if command is not None:
command = find_executable(command)
if command is not None:
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path, log_dest=log_dest)
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path, log_dest=log_dest, timeout=timeout)
# Check `CRYPTOL_SERVER_URL` env var if no connection identified yet
if c is None:
url = os.getenv('CRYPTOL_SERVER_URL')
if url is not None:
c = CryptolConnection(ServerConnection(HttpProcess(url,verify=verify)), cryptol_path, log_dest=log_dest)
c = CryptolConnection(ServerConnection(HttpProcess(url,verify=verify)), cryptol_path, log_dest=log_dest, timeout=timeout)
# Check if `cryptol-remote-api` is in the PATH if no connection identified yet
if c is None:
command = find_executable('cryptol-remote-api')
if command is not None:
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path, log_dest=log_dest)
c = CryptolConnection(command+" socket", cryptol_path=cryptol_path, log_dest=log_dest, timeout=timeout)
# Raise an error if still no connection identified yet
if c is not None:
if reset_server:
@ -99,7 +104,8 @@ def connect(command : Optional[str]=None,
def connect_stdio(command : str,
cryptol_path : Optional[str] = None,
log_dest : Optional[TextIO] = None) -> CryptolConnection:
log_dest : Optional[TextIO] = None,
timeout : Optional[float] = None) -> CryptolConnection:
"""Start a new connection to a new Cryptol server process.
:param command: The command to launch the Cryptol server.
@ -110,7 +116,7 @@ def connect_stdio(command : str,
"""
conn = CryptolStdIOProcess(command, cryptol_path=cryptol_path)
return CryptolConnection(conn, log_dest=log_dest)
return CryptolConnection(conn, log_dest=log_dest, timeout=timeout)
class CryptolConnection:
@ -128,12 +134,17 @@ class CryptolConnection:
"""
most_recent_result : Optional[argo.Interaction]
timeout : Optional[float]
"""(Optional) default timeout in seconds for methods."""
def __init__(self,
command_or_connection : Union[str, ServerConnection, ServerProcess],
cryptol_path : Optional[str] = None,
*,
log_dest : Optional[TextIO] = None) -> None:
log_dest : Optional[TextIO] = None,
timeout : Optional[float] = None) -> None:
self.most_recent_result = None
self.timeout = timeout
if isinstance(command_or_connection, ServerProcess):
self.server_connection = ServerConnection(command_or_connection)
elif isinstance(command_or_connection, str):
@ -163,141 +174,185 @@ class CryptolConnection:
return self.most_recent_result.state()
# Protocol messages
def load_file(self, filename : str) -> argo.Command:
def load_file(self, filename : str, *, timeout:Optional[float] = None) -> argo.Command:
"""Load a filename as a Cryptol module, like ``:load`` at the Cryptol
REPL.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolLoadFile(self, filename)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolLoadFile(self, filename, timeout)
return self.most_recent_result
def load_module(self, module_name : str) -> argo.Command:
"""Load a Cryptol module, like ``:module`` at the Cryptol REPL."""
self.most_recent_result = CryptolLoadModule(self, module_name)
def load_module(self, module_name : str, *, timeout:Optional[float] = None) -> argo.Command:
"""Load a Cryptol module, like ``:module`` at the Cryptol REPL.
:param timeout: Optional timeout for this request (in seconds).
"""
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolLoadModule(self, module_name, timeout)
return self.most_recent_result
def eval_raw(self, expression : Any) -> argo.Command:
def eval_raw(self, expression : Any, *, timeout:Optional[float] = None) -> argo.Command:
"""Like the member method ``eval``, but does not call
``from_cryptol_arg`` on the ``.result()``.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolEvalExprRaw(self, expression)
self.most_recent_result = CryptolEvalExprRaw(self, expression, timeout)
return self.most_recent_result
def eval(self, expression : Any) -> argo.Command:
def eval(self, expression : Any, *, timeout:Optional[float] = None) -> argo.Command:
"""Evaluate a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing
for their JSON equivalents.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolEvalExpr(self, expression)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolEvalExpr(self, expression, timeout)
return self.most_recent_result
def evaluate_expression(self, expression : Any) -> argo.Command:
def evaluate_expression(self, expression : Any, *, timeout:Optional[float] = None) -> argo.Command:
"""Synonym for member method ``eval``.
"""
return self.eval(expression)
def extend_search_path(self, *dir : str) -> argo.Command:
"""Extend the search path for loading Cryptol modules."""
self.most_recent_result = CryptolExtendSearchPath(self, list(dir))
:param timeout: Optional timeout for this request (in seconds)."""
return self.eval(expression, timeout=timeout)
def extend_search_path(self, *dir : str, timeout:Optional[float] = None) -> argo.Command:
"""Extend the search path for loading Cryptol modules.
:param timeout: Optional timeout for this request (in seconds)."""
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolExtendSearchPath(self, list(dir), timeout)
return self.most_recent_result
def call_raw(self, fun : str, *args : List[Any]) -> argo.Command:
def call_raw(self, fun : str, *args : List[Any], timeout:Optional[float] = None) -> argo.Command:
"""Like the member method ``call``, but does not call
``from_cryptol_arg`` on the ``.result()``.
"""
timeout = timeout if timeout is not None else self.timeout
encoded_args = [cryptoltypes.CryptolType().from_python(a) for a in args]
self.most_recent_result = CryptolCallRaw(self, fun, encoded_args)
self.most_recent_result = CryptolCallRaw(self, fun, encoded_args, timeout)
return self.most_recent_result
def call(self, fun : str, *args : List[Any]) -> argo.Command:
def call(self, fun : str, *args : List[Any], timeout:Optional[float] = None) -> argo.Command:
"""Call function ``fun`` with specified ``args``.
:param timeout: Optional timeout for this request (in seconds)."""
timeout = timeout if timeout is not None else self.timeout
encoded_args = [cryptoltypes.CryptolType().from_python(a) for a in args]
self.most_recent_result = CryptolCall(self, fun, encoded_args)
self.most_recent_result = CryptolCall(self, fun, encoded_args, timeout)
return self.most_recent_result
def check_raw(self, expr : Any, *, num_tests : Union[Literal['all'], int, None] = None) -> argo.Command:
def check_raw(self, expr : Any, *, num_tests : Union[Literal['all'], int, None] = None, timeout:Optional[float] = None) -> argo.Command:
"""Like the member method ``check``, but does not call
`to_check_report` on the ``.result()``.
"""
if num_tests == "all" or isinstance(num_tests, int) or num_tests is None:
self.most_recent_result = CryptolCheckRaw(self, expr, num_tests)
self.most_recent_result = CryptolCheckRaw(self, expr, num_tests, timeout)
return self.most_recent_result
else:
raise ValueError('``num_tests`` must be an integer, ``None``, or the string literall ``"all"``')
def check(self, expr : Any, *, num_tests : Union[Literal['all'], int, None] = None) -> argo.Command:
def check(self, expr : Any, *, num_tests : Union[Literal['all'], int, None] = None, timeout:Optional[float] = None) -> argo.Command:
"""Tests the validity of a Cryptol expression with random inputs. The expression must be a function with
return type ``Bit``.
If ``num_tests`` is ``"all"`` then the expression is tested exhaustively (i.e., against all possible inputs).
If ``num_tests`` is omitted, Cryptol defaults to running 100 tests.
:param timeout: Optional timeout for this request (in seconds).
"""
timeout = timeout if timeout is not None else self.timeout
if num_tests == "all" or isinstance(num_tests, int) or num_tests is None:
self.most_recent_result = CryptolCheck(self, expr, num_tests)
self.most_recent_result = CryptolCheck(self, expr, num_tests, timeout)
return self.most_recent_result
else:
raise ValueError('``num_tests`` must be an integer, ``None``, or the string literall ``"all"``')
def check_type(self, code : Any) -> argo.Command:
def check_type(self, code : Any, *, timeout:Optional[float] = None) -> argo.Command:
"""Check the type of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolCheckType(self, code)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolCheckType(self, code, timeout)
return self.most_recent_result
def sat_raw(self, expr : Any, solver : solver.Solver = solver.Z3, count : int = 1) -> argo.Command:
def sat_raw(self, expr : Any, solver : solver.Solver = solver.Z3, count : int = 1, *, timeout:Optional[float] = None) -> argo.Command:
"""Like the member method ``sat``, but does not call
`to_smt_query_result` on the ``.result()``.
"""
self.most_recent_result = CryptolSatRaw(self, expr, solver, count)
self.most_recent_result = CryptolSatRaw(self, expr, solver, count, timeout)
return self.most_recent_result
def sat(self, expr : Any, solver : solver.Solver = solver.Z3, count : int = 1) -> argo.Command:
def sat(self, expr : Any, solver : solver.Solver = solver.Z3, count : int = 1, *, timeout:Optional[float] = None) -> argo.Command:
"""Check the satisfiability of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents. Use the solver named `solver`, and return up to
`count` solutions.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolSat(self, expr, solver, count)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolSat(self, expr, solver, count, timeout)
return self.most_recent_result
def prove_raw(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
def prove_raw(self, expr : Any, solver : solver.Solver = solver.Z3, *, timeout:Optional[float] = None) -> argo.Command:
"""Like the member method ``prove``, but does not call
`to_smt_query_result` on the ``.result()``.
"""
self.most_recent_result = CryptolProveRaw(self, expr, solver)
self.most_recent_result = CryptolProveRaw(self, expr, solver, timeout)
return self.most_recent_result
def prove(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
def prove(self, expr : Any, solver : solver.Solver = solver.Z3, *, timeout:Optional[float] = None) -> argo.Command:
"""Check the validity of a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing for
their JSON equivalents. Use the solver named `solver`.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolProve(self, expr, solver)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolProve(self, expr, solver, timeout)
return self.most_recent_result
def safe_raw(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
def safe_raw(self, expr : Any, solver : solver.Solver = solver.Z3, *, timeout:Optional[float] = None) -> argo.Command:
"""Like the member method ``safe``, but does not call
`to_smt_query_result` on the ``.result()``.
"""
self.most_recent_result = CryptolSafeRaw(self, expr, solver)
self.most_recent_result = CryptolSafeRaw(self, expr, solver, timeout)
return self.most_recent_result
def safe(self, expr : Any, solver : solver.Solver = solver.Z3) -> argo.Command:
def safe(self, expr : Any, solver : solver.Solver = solver.Z3, *, timeout:Optional[float] = None) -> argo.Command:
"""Check via an external SMT solver that the given term is safe for all inputs,
which means it cannot encounter a run-time error.
:param timeout: Optional timeout for this request (in seconds).
"""
self.most_recent_result = CryptolSafe(self, expr, solver)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolSafe(self, expr, solver, timeout)
return self.most_recent_result
def names(self) -> argo.Command:
"""Discover the list of names currently in scope in the current context."""
self.most_recent_result = CryptolNames(self)
def names(self, *, timeout:Optional[float] = None) -> argo.Command:
"""Discover the list of names currently in scope in the current context.
:param timeout: Optional timeout for this request (in seconds)."""
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolNames(self, timeout)
return self.most_recent_result
def focused_module(self) -> argo.Command:
"""Return the name of the currently-focused module."""
self.most_recent_result = CryptolFocusedModule(self)
def focused_module(self, *, timeout:Optional[float] = None) -> argo.Command:
"""Return the name of the currently-focused module.
:param timeout: Optional timeout for this request (in seconds)."""
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolFocusedModule(self, timeout)
return self.most_recent_result
def reset(self) -> None:
@ -312,6 +367,10 @@ class CryptolConnection:
CryptolResetServer(self)
self.most_recent_result = None
def interrupt(self) -> None:
"""Interrupt the Cryptol server, cancelling any in-progress requests."""
CryptolInterrupt(self)
def logging(self, on : bool, *, dest : TextIO = sys.stderr) -> None:
"""Whether to log received and transmitted JSON."""
self.server_connection.logging(on=on,dest=dest)

View File

@ -1,6 +1,6 @@
[[package]]
name = "argo-client"
version = "0.0.7"
version = "0.0.9"
description = "A JSON RPC client library."
category = "main"
optional = false
@ -118,12 +118,12 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[metadata]
lock-version = "1.1"
python-versions = ">=3.7.0"
content-hash = "d02d45c69699f0c6e033eb54c94fd8858dbe1d83677f2bca29f3a5ba68c02cbe"
content-hash = "fd163dd3fcc8f5322f548ad841c621370611d2f25fafc2dffb03395d30ed7d5e"
[metadata.files]
argo-client = [
{file = "argo-client-0.0.7.tar.gz", hash = "sha256:f3d54416448fff0162b1b669ca7ae1d846b8197fd77b7809f78ceb7f1b2675fb"},
{file = "argo_client-0.0.7-py3-none-any.whl", hash = "sha256:0149c5fc5a008cdaabd3764af0bcadce3efb07fdf6e1fb6fca86fc9cda5560d8"},
{file = "argo-client-0.0.9.tar.gz", hash = "sha256:762e16d7428046ecf1d4fd50d877cba9a3c21eac260806030876f76acac81325"},
{file = "argo_client-0.0.9-py3-none-any.whl", hash = "sha256:c482e3ba3346c7677fc1c8a470b173ba0d59736d6c628b0193a13b8e3aeeb9f1"},
]
bitvector = [
{file = "BitVector-3.5.0.tar.gz", hash = "sha256:cac2fbccf11e325115827ed7be03e5fd62615227b0bbf3fa5a18a842a221839c"},

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "cryptol"
version = "2.11.5"
version = "2.11.6"
readme = "README.md"
keywords = ["cryptography", "verification"]
description = "Cryptol client for the Cryptol 2.11 RPC server"
@ -15,7 +15,7 @@ include = [
python = ">=3.7.0"
requests = "^2.25.1"
BitVector = "^3.4.9"
argo-client = "0.0.7"
argo-client = "0.0.9"
[tool.poetry.dev-dependencies]
mypy = "^0.812"

View File

@ -1,7 +1,9 @@
import unittest
from argo_client.interaction import ArgoException
from pathlib import Path
import unittest
import io
import time
import cryptol
import cryptol.cryptoltypes
from cryptol.bitvector import BV
@ -17,37 +19,128 @@ class BasicServerTests(unittest.TestCase):
@classmethod
def setUpClass(self):
self.c = cryptol.sync.connect(verify=False)
@classmethod
def tearDownClass(self):
if self.c:
self.c.reset()
self.c = cryptol.connect(verify=False)
def test_extend_search_path(self):
"""Test that extending the search path acts as expected w.r.t. loads."""
c = self.c
c.extend_search_path(str(Path('tests','cryptol','test-files', 'test-subdir')))
c.load_module('Bar')
ans1 = c.eval("theAnswer")
ans2 = c.eval("id theAnswer")
c.load_module('Bar').result()
ans1 = c.eval("theAnswer").result()
ans2 = c.eval("id theAnswer").result()
self.assertEqual(ans1, ans2)
def test_logging(self):
c = self.c
c.extend_search_path(str(Path('tests','cryptol','test-files', 'test-subdir')))
c.load_module('Bar')
c.load_module('Bar').result()
log_buffer = io.StringIO()
c.logging(on=True, dest=log_buffer)
_ = c.eval("theAnswer")
_ = c.eval("theAnswer").result()
contents = log_buffer.getvalue()
print(f'CONTENTS: {contents}', file=sys.stderr)
self.assertEqual(len(contents.strip().splitlines()), 2)
self.assertEqual(len(contents.strip().splitlines()), 2,
msg=f'log contents: {str(contents.strip().splitlines())}')
_ = c.eval("theAnswer")
_ = c.eval("theAnswer").result()
def test_check_timeout(self):
c = self.c
c.load_file(str(Path('tests','cryptol','test-files', 'examples','AES.cry'))).result()
t1 = time.time()
with self.assertRaises(ArgoException):
c.check("\\(bv : [256]) -> ~ (~ (~ (~bv))) == bv", num_tests="all", timeout=1.0).result()
t2 = time.time()
self.assertLess(t2 - t1, 2.0)
t1 = time.time()
with self.assertRaises(ArgoException):
c.check("\\(bv : [256]) -> ~ (~ (~ (~bv))) == bv", num_tests="all", timeout=5.0).result()
t2 = time.time()
self.assertLess(t2 - t1, 7)
t1 = time.time()
c.check("\\(bv : [256]) -> ~ (~ (~ (~bv))) == bv", num_tests=10, timeout=5.0).result()
t2 = time.time()
self.assertLess(t2 - t1, 5)
def test_interrupt(self):
c = self.c
c.load_file(str(Path('tests','cryptol','test-files', 'examples','AES.cry')))
c.check("\\(bv : [256]) -> ~ (~ (~ (~bv))) == bv", num_tests="all")
# ^ .result() intentionally omitted so we don't wait on it's result and we can interrupt
# it on the next line.
time.sleep(.5)
c.interrupt()
self.assertTrue(c.safe("aesEncrypt").result())
def test_prove_timeout(self):
c = self.c
c.load_file(str(Path('tests','cryptol','test-files', 'examples','AES.cry')))
pt = BV(size=128, value=0x3243f6a8885a308d313198a2e0370734)
key = BV(size=128, value=0x2b7e151628aed2a6abf7158809cf4f3c)
ct = c.call("aesEncrypt", (pt, key)).result()
expected_ct = BV(size=128, value=0x3925841d02dc09fbdc118597196a0b32)
self.assertEqual(ct, expected_ct)
decrypted_ct = c.call("aesDecrypt", (ct, key)).result()
self.assertEqual(pt, decrypted_ct)
pt = BV(size=128, value=0x00112233445566778899aabbccddeeff)
key = BV(size=128, value=0x000102030405060708090a0b0c0d0e0f)
ct = c.call("aesEncrypt", (pt, key)).result()
expected_ct = BV(size=128, value=0x69c4e0d86a7b0430d8cdb78070b4c55a)
self.assertEqual(ct, expected_ct)
decrypted_ct = c.call("aesDecrypt", (ct, key)).result()
self.assertEqual(pt, decrypted_ct)
self.assertTrue(c.safe("aesEncrypt").result())
self.assertTrue(c.safe("aesDecrypt").result())
self.assertTrue(c.check("AESCorrect").result().success)
t1 = time.time()
with self.assertRaises(ArgoException):
c.prove("AESCorrect", timeout=1.0).result()
t2 = time.time()
# check the timeout worked
self.assertGreaterEqual(t2 - t1, 1.0)
self.assertLess(t2 - t1, 5.0)
# make sure things are still working
self.assertTrue(c.safe("aesEncrypt").result())
# set the timeout at the connection level
c.timeout = 1.0
t1 = time.time()
with self.assertRaises(ArgoException):
c.prove("AESCorrect").result()
t2 = time.time()
# check the timeout worked
self.assertGreaterEqual(t2 - t1, 1.0)
self.assertLess(t2 - t1, 5.0)
# make sure things are still working
c.timeout = None
self.assertTrue(c.safe("aesEncrypt").result())
c.timeout = 1.0
t1 = time.time()
with self.assertRaises(ArgoException):
# override timeout with longer time
c.prove("AESCorrect", timeout=5.0).result()
t2 = time.time()
self.assertGreaterEqual(t2 - t1, 5.0)
self.assertLess(t2 - t1, 10.0)
# make sure things are still working
c.timeout = None
self.assertTrue(c.safe("aesEncrypt").result())
class BasicLoggingServerTests(unittest.TestCase):
@ -70,8 +163,10 @@ class BasicLoggingServerTests(unittest.TestCase):
c.extend_search_path(str(Path('tests','cryptol','test-files', 'test-subdir')))
c.load_module('Bar')
_ = c.eval("theAnswer")
content_lines = self.log_buffer.getvalue().strip().splitlines()
self.assertEqual(len(self.log_buffer.getvalue().splitlines()), 6)
self.assertEqual(len(content_lines), 6,
msg=f'log contents: {str(content_lines)}')
if __name__ == "__main__":
unittest.main()

View File

@ -99,6 +99,7 @@ class CryptolTests(unittest.TestCase):
self.assertIsInstance(c.prove('\\(x : [8]) -> x == reverse (reverse x)', solver.SBV_OFFLINE), solver.OfflineSmtQuery)
self.assertIsInstance(c.prove('\\(x : [8]) -> x == reverse (reverse x)', solver.W4_OFFLINE), solver.OfflineSmtQuery)
def test_check(self):
c = self.c
res = c.check("\\x -> x==(x:[8])")

View File

@ -0,0 +1,33 @@
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module CryptolServer.Interrupt
( interruptServer
, interruptServerDescr
) where
import qualified Argo
import qualified Argo.Doc as Doc
import qualified Data.Aeson as JSON
import CryptolServer
------------------------------------------------------------------------
-- Interrupt All Threads Command
data InterruptServerParams = InterruptServerParams
instance JSON.FromJSON InterruptServerParams where
parseJSON =
JSON.withObject "params for \"interrupt\"" $
\_ -> pure InterruptServerParams
instance Doc.DescribedMethod InterruptServerParams () where
parameterFieldDescription = []
interruptServerDescr :: Doc.Block
interruptServerDescr =
Doc.Paragraph [Doc.Text "Interrupt the server, terminating it's current task (if one exists)."]
interruptServer :: InterruptServerParams -> CryptolNotification ()
interruptServer _ = CryptolNotification . const $ Argo.interruptAllThreads

2
deps/argo vendored

@ -1 +1 @@
Subproject commit 101cad520fc353a8673f9ba9d71b70e2f9ec59d9
Subproject commit 72dc6f057d609ddaee8d964c41984faaa97e4cf1