Allow @ send-text over a socket to work with more than 1MB of text

This commit is contained in:
Kovid Goyal 2020-04-30 08:58:46 +05:30
parent 195ea6c140
commit 8327dabb29
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 55 additions and 30 deletions

View File

@ -360,28 +360,16 @@ class Boss:
except (Exception, SystemExit) as e:
self.show_error(_('remote_control mapping failed'), str(e))
def handle_peer_cmd(self, msg_bytes: bytes) -> Optional[bytes]:
cmd_prefix_b = b'\x1bP@kitty-cmd'
pl = len(cmd_prefix_b)
terminator = b'\x1b\\'
tl = len(terminator)
resp = b''
pos = 0
while msg_bytes[pos:pos+pl] == cmd_prefix_b:
idx = msg_bytes.find(terminator, pos + pl)
if idx < pos + pl:
break
cmd = msg_bytes[pos+pl:idx].decode('utf-8')
response = self._handle_remote_command(cmd, from_peer=True)
if response is not None:
resp += cmd_prefix_b + json.dumps(response).encode('utf-8') + b'\x1b\\'
pos = idx + tl
return resp or None
def peer_message_received(self, msg_bytes: bytes) -> Optional[bytes]:
cmd_prefix_b = b'\x1bP@kitty-cmd'
if msg_bytes.startswith(cmd_prefix_b):
return self.handle_peer_cmd(msg_bytes)
cmd_prefix = b'\x1bP@kitty-cmd'
terminator = b'\x1b\\'
if msg_bytes.startswith(cmd_prefix) and msg_bytes.endswith(terminator):
cmd = msg_bytes[len(cmd_prefix):-len(terminator)].decode('utf-8')
response = self._handle_remote_command(cmd, from_peer=True)
if response is None:
return None
return cmd_prefix + json.dumps(response).encode('utf-8') + terminator
data = json.loads(msg_bytes.decode('utf-8'))
if isinstance(data, dict) and data.get('cmd') == 'new_instance':
from .cli_stub import CLIOptions

View File

@ -1279,7 +1279,7 @@ typedef struct {
char *data;
size_t capacity, used;
int fd;
bool finished, close_socket;
bool finished, close_socket, is_peer_command;
} PeerReadData;
static PeerReadData empty_prd = {.fd = -1, 0};
@ -1329,6 +1329,41 @@ accept_peer(int listen_fd, bool shutting_down) {
return true;
}
#define KITTY_CMD_PREFIX "\x1bP@kitty-cmd{"
static inline void
queue_peer_message(ChildMonitor *self, char *buf, size_t sz, int fd) {
children_mutex(lock);
ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true);
Message *m = self->messages + self->messages_count++;
m->data = buf; m->sz = sz; m->fd = fd;
children_mutex(unlock);
wakeup_main_loop();
}
static inline void
dispatch_peer_command(ChildMonitor *self, PeerReadData *rd, int fd) {
size_t end = 0;
for (size_t i = 0; i < rd->used - 1; i++) {
if (rd->data[i] == 0x1b && rd->data[i+1] == '\\') {
end = i + 2;
break;
}
}
if (!end) return;
char *buf = malloc(end + 8);
if (buf) {
memcpy(buf, rd->data, end);
queue_peer_message(self, buf, end, fd);
}
rd->is_peer_command = false;
if (rd->used > end) {
rd->used -= end;
memmove(rd->data, rd->data + end, rd->used);
if (rd->used >= sizeof(KITTY_CMD_PREFIX) - 1 && memcmp(rd->data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) rd->is_peer_command = true;
} else rd->used = 0;
}
static inline bool
read_from_peer(ChildMonitor *self, int s) {
bool read_finished = false;
@ -1337,26 +1372,28 @@ read_from_peer(ChildMonitor *self, int s) {
#define failed(msg) { read_finished = true; log_error("%s", msg); rd->finished = true; rd->close_socket = true; break; }
if (rd->fd == s) {
if (rd->used >= rd->capacity) {
if (rd->capacity >= 1024 * 1024) failed("Ignoring too large message from peer");
if (rd->capacity >= 64 * 1024) failed("Ignoring too large message from peer");
rd->capacity = MAX(8192u, rd->capacity * 2);
rd->data = realloc(rd->data, rd->capacity);
if (!rd->data) failed("Out of memory");
}
ssize_t n = recv(s, rd->data + rd->used, rd->capacity - rd->used, 0);
if (n == 0) {
while (rd->is_peer_command) dispatch_peer_command(self, rd, s);
read_finished = true; rd->finished = true;
children_mutex(lock);
ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true);
Message *m = self->messages + self->messages_count++;
m->data = rd->data; rd->data = NULL; m->sz = rd->used; m->fd = s;
children_mutex(unlock);
wakeup_main_loop();
if (rd->used) queue_peer_message(self, rd->data, rd->used, s);
else free(rd->data);
rd->data = NULL;
} else if (n < 0) {
if (errno != EINTR) {
perror("Error reading from talk peer");
failed("");
}
} else rd->used += n;
} else {
if (!rd->used && memcmp(rd->data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) rd->is_peer_command = true;
rd->used += n;
while (rd->is_peer_command) dispatch_peer_command(self, rd, s);
}
break;
}
}