yubioath-flutter/lib/desktop/rpc.dart

195 lines
4.9 KiB
Dart
Raw Normal View History

2021-11-19 13:10:00 +03:00
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:logging/logging.dart';
2021-11-22 14:01:51 +03:00
import 'package:async/async.dart';
import '../app/models.dart';
2021-11-19 13:10:00 +03:00
import 'models.dart';
/// Library-wide logger, registered under the name `rpc`.
final Logger _log = Logger('rpc');
2021-11-19 13:10:00 +03:00
/// Side channel attached to a single RPC request.
///
/// Statuses added via [cancel] are forwarded to the subprocess by
/// [RpcSession], and [Signal] responses received while the request is in
/// flight are published on [signals].
class Signaler {
  /// Outgoing statuses (currently only `'cancel'`).
  final _send = StreamController<String>();

  /// Incoming signals received while the request is running.
  final _recv = StreamController<Signal>();

  /// Signals received for the request this signaler is attached to.
  Stream<Signal> get signals => _recv.stream;

  /// Requests cancellation of the running command.
  ///
  /// Does nothing once the signaler has been closed. Without this guard a
  /// cancel issued after the request finished would throw a [StateError],
  /// because events cannot be added to a closed controller.
  void cancel() {
    if (!_send.isClosed) {
      _send.add('cancel');
    }
  }

  /// Closes both directions; called when the owning request has finished.
  void _close() {
    _send.close();
    _recv.close();
  }
}
/// One queued RPC command, bundled with the completer for its result.
class _Request {
  /// Completed with the success body, or with the error response.
  final completer = Completer<Map<String, dynamic>>();

  /// Value sent as the `action` field of the command.
  final String action;

  /// Value sent as the `target` field of the command.
  final List<String> target;

  /// Value sent as the `body` field of the command.
  final Map body;

  /// Optional side channel for this request; not part of the JSON payload.
  final Signaler? signal;

  _Request(this.action, this.target, this.body, this.signal);

  /// Builds the `command` message written to the subprocess.
  Map<String, dynamic> toJson() {
    return <String, dynamic>{
      'kind': 'command',
      'action': action,
      'target': target,
      'body': body,
    };
  }
}
2021-11-24 13:36:59 +03:00
/// Maps the level names found in the subprocess' log records to [Level]s.
///
/// Names missing from this table are handled by the caller.
const Map<String, Level> _py2level = <String, Level>{
  'DEBUG': Level.CONFIG,
  'INFO': Level.INFO,
  'WARNING': Level.WARNING,
  'ERROR': Level.SEVERE,
  'CRITICAL': Level.SHOUT,
};
2021-11-19 13:10:00 +03:00
/// A line-based JSON RPC session with a helper subprocess.
///
/// Commands are written to the process' stdin as one JSON document per line.
/// Each stdout line is decoded into an [RpcResponse], and each stderr line is
/// decoded as a JSON log record and re-emitted through `package:logging`.
/// Requests are processed strictly one at a time, in the order they were
/// queued.
class RpcSession {
  final Process _process;
  final StreamController<_Request> _requests = StreamController();
  final StreamQueue<RpcResponse> _responses;

  /// Wraps an already started [_process] and starts serving requests.
  RpcSession(this._process)
      : _responses = StreamQueue(_process.stdout
            .transform(const Utf8Decoder())
            .transform(const LineSplitter())
            .map((event) => RpcResponse.fromJson(jsonDecode(event)))) {
    _process.stderr
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .map((event) => jsonDecode(event))
        .listen((event) {
      Logger('rpc.${event['name']}').log(
        _py2level[event['level']] ?? Level.INFO,
        event['message'],
        event['exc_text'],
        //time: DateTime.fromMillisecondsSinceEpoch(event['time'] * 1000),
      );
    }, onError: (err) {
      // Reached, for example, when a stderr line is not valid JSON.
      Logger('rpc.error').log(
        Level.WARNING,
        err.toString(),
      );
    });
    _log.info('Launched ykman subprocess...');
    _pump();
  }

  /// Starts [executable] without arguments and wraps it in an [RpcSession].
  static Future<RpcSession> launch(String executable) async {
    var process = await Process.start(executable, []);
    return RpcSession(process);
  }

  /// Queues [action] for [target] and returns its eventual result.
  ///
  /// The returned future completes with the body of the success response, or
  /// fails with the error response. If the response stream fails or ends
  /// before a result arrives, the future fails with that error instead.
  /// While the request is running, [signal] can be used to send statuses to
  /// the subprocess and to receive the signals it emits.
  Future<Map<String, dynamic>> command(String action, List<String>? target,
      {Map? params, Signaler? signal}) {
    var request = _Request(action, target ?? [], params ?? {}, signal);
    _requests.add(request);
    return request.completer.future;
  }

  /// Sends a `logging` command selecting the subprocess level for [level].
  ///
  /// The returned future completes when the command has been answered, so
  /// callers may await it or handle its failure.
  Future<void> setLogLevel(Level level) {
    String pyLevel;
    if (level.value <= Level.FINE.value) {
      pyLevel = 'traffic';
    } else if (level.value <= Level.CONFIG.value) {
      pyLevel = 'debug';
    } else if (level.value <= Level.INFO.value) {
      pyLevel = 'info';
    } else if (level.value <= Level.WARNING.value) {
      pyLevel = 'warning';
    } else if (level.value <= Level.SEVERE.value) {
      pyLevel = 'error';
    } else {
      pyLevel = 'critical';
    }
    return command('logging', [], params: {'level': pyLevel});
  }

  /// Writes [data] to the subprocess as a single JSON line.
  void _send(Map data) {
    // Encode once and reuse the text for both the log entry and the write.
    final encoded = jsonEncode(data);
    _log.fine('SEND', encoded);
    _process.stdin.writeln(encoded);
    _process.stdin.flush();
  }

  /// Serves queued requests one at a time until the request queue is closed.
  void _pump() async {
    await for (final request in _requests.stream) {
      try {
        // Attach the signal listener before writing the command so that a
        // failure here (for example a Signaler that was already used by an
        // earlier request) fails this request without anything having been
        // sent. Buffered statuses are delivered asynchronously, so the command
        // itself is still written first.
        request.signal?._send.stream.listen((status) {
          _send({'kind': 'signal', 'status': status});
        });
        _send(request.toJson());

        var completed = false;
        while (!completed) {
          final response = await _responses.next;
          _log.fine('RECV', jsonEncode(response));
          response.map(
            signal: (signal) {
              request.signal?._recv.sink.add(signal);
            },
            success: (success) {
              request.completer.complete(success.body);
              completed = true;
            },
            error: (error) {
              request.completer.completeError(error);
              completed = true;
            },
          );
        }
      } catch (e, stackTrace) {
        // Without this, an error from the response stream (the subprocess
        // exiting, an undecodable line, ...) would terminate the loop and leave
        // this request and every later one pending forever.
        _log.warning('Request "${request.action}" failed', e, stackTrace);
        if (!request.completer.isCompleted) {
          request.completer.completeError(e, stackTrace);
        }
      } finally {
        request.signal?._close();
      }
    }
  }
}
/// Callback given the [RpcError] of a failed command.
///
/// [RpcNodeSession.command] awaits the handler registered for the error's
/// status and then retries the command.
typedef ErrorHandler = Future<void> Function(RpcError e);
/// A view of an [RpcSession] rooted at a fixed node.
///
/// Every command target is prefixed with the segments of [devicePath]
/// followed by [subPath]. Handlers can be registered per error status; they
/// get a chance to recover before the failed command is retried.
class RpcNodeSession {
  final RpcSession _rpc;
  final DevicePath devicePath;
  final List<String> subPath;
  final Map<String, ErrorHandler> _errorHandlers = {};

  RpcNodeSession(this._rpc, this.devicePath, this.subPath);

  /// Registers [handler] for errors whose status equals [status], replacing
  /// any handler previously registered for it.
  void setErrorHandler(String status, ErrorHandler handler) {
    _errorHandlers[status] = handler;
  }

  /// Removes the handler registered for [status], if any.
  void unsetErrorHandler(String status) {
    _errorHandlers.remove(status);
  }

  /// Misspelled alias of [unsetErrorHandler], kept so existing callers keep
  /// compiling. Prefer [unsetErrorHandler] in new code.
  void unserErrorHandler(String status) => unsetErrorHandler(status);

  /// Runs [action] on [target], resolved relative to this node.
  ///
  /// When the command fails with an [RpcError] whose status has a registered
  /// handler, the handler is awaited and the command is issued again with the
  /// same arguments. Errors without a handler are rethrown unchanged.
  Future<Map<String, dynamic>> command(
    String action, {
    List<String> target = const [],
    Map<dynamic, dynamic>? params,
    Signaler? signal,
  }) async {
    try {
      return await _rpc.command(
        action,
        devicePath.segments + subPath + target,
        params: params,
        signal: signal,
      );
    } on RpcError catch (e) {
      final handler = _errorHandlers[e.status];
      if (handler != null) {
        _log.info('Attempting recovery on "${e.status}"');
        await handler(e);
        // NOTE(review): RpcSession closes a request's Signaler once that
        // request has finished, so a non-null [signal] has already been closed
        // when it is passed to the retry; confirm how callers expect
        // cancellation to work for retried commands.
        // NOTE(review): there is no retry limit; a handler that never resolves
        // the error condition makes this recurse indefinitely.
        return command(action, target: target, params: params, signal: signal);
      }
      rethrow;
    }
  }
}