1
1
mirror of https://github.com/kanaka/mal.git synced 2024-11-11 00:52:44 +03:00
mal/plpgsql/io.sql

225 lines
6.5 KiB
MySQL
Raw Normal View History

-- dblink is necessary to be able to sub-transactions (autonomous
-- transactions) to the stream table. This is necessary to be able to
-- modify the stream table from the perspective of outside callers
-- because actual code can be long-lived and it's direct updates will
-- not be seen until the process completes.
CREATE SCHEMA io
CREATE TABLE stream (
stream_id integer,
open boolean,
data varchar,
rl_prompt varchar -- prompt for readline input
);
-- stdin
INSERT INTO io.stream (stream_id, open, data, rl_prompt)
VALUES (0, false, '', '');
-- stdout
INSERT INTO io.stream (stream_id, open, data, rl_prompt)
VALUES (1, false, '', '');
-- ---------------------------------------------------------
CREATE FUNCTION io.open(sid integer) RETURNS void AS $$
DECLARE
query varchar;
BEGIN
--RAISE NOTICE 'io.open start';
query := format('UPDATE io.stream
SET data = '''', rl_prompt = '''', open = true
WHERE stream_id = %L', sid);
PERFORM dblink('dbname=mal', query);
--RAISE NOTICE 'io.open done';
END; $$ LANGUAGE 'plpgsql' STRICT;
CREATE FUNCTION io.close(sid integer) RETURNS void AS $$
DECLARE
query varchar;
BEGIN
--RAISE NOTICE 'io.close start';
query := format('UPDATE io.stream
SET rl_prompt = '''', open = false
WHERE stream_id = %L', sid);
PERFORM dblink('dbname=mal', query);
--RAISE NOTICE 'io.close done';
END; $$ LANGUAGE 'plpgsql' STRICT;
-- called from read via dblink
CREATE FUNCTION io.__read(sid integer) RETURNS varchar AS $$
DECLARE
input varchar;
isopen boolean;
BEGIN
LOCK io.stream;
SELECT data, open INTO input, isopen FROM io.stream
WHERE stream_id = sid;
IF input <> '' THEN
UPDATE io.stream SET data = '' WHERE stream_id = sid;
RETURN input;
END IF;
IF isopen = false THEN
RETURN NULL;
END IF;
RETURN input;
END; $$ LANGUAGE 'plpgsql' STRICT;
-- read:
-- read from stream stream_id in stream table. Waits until there is
-- either data to return or the stream closes (NULL data). Returns
-- NULL when stream is closed.
CREATE FUNCTION io.read(sid integer DEFAULT 0) RETURNS varchar AS $$
DECLARE
query varchar;
input varchar;
sleep real = 0.05;
BEGIN
-- poll / wait for input
query := format('SELECT io.__read(%L);', sid);
WHILE true
LOOP
-- atomic get and set to empty
SELECT cur_data INTO input FROM dblink('dbname=mal', query)
AS t1(cur_data varchar);
IF input <> '' OR input IS NULL THEN
RETURN input;
END IF;
PERFORM pg_sleep(sleep);
IF sleep < 0.5 THEN
sleep := sleep * 1.1; -- backoff
END IF;
END LOOP;
2016-03-26 07:11:40 +03:00
END; $$ LANGUAGE 'plpgsql' STRICT;
-- read_or_error:
-- similar to read, but throws exception when stream is closed
CREATE FUNCTION io.read_or_error(sid integer DEFAULT 0) RETURNS varchar AS $$
DECLARE
input varchar;
BEGIN
input := io.read(sid);
IF input IS NULL THEN
raise EXCEPTION 'Stream ''%'' is closed', sid;
ELSE
RETURN input;
END IF;
END; $$ LANGUAGE 'plpgsql' STRICT;
-- readline:
-- set prompt and wait for readline style input on the stream
CREATE FUNCTION io.readline(prompt varchar, sid integer DEFAULT 0)
RETURNS varchar AS $$
DECLARE
query varchar;
BEGIN
-- set prompt / request readline style input
IF sid = 0 THEN
PERFORM io.wait_flushed(1);
ELSIF sid = 1 THEN
PERFORM io.wait_flushed(0);
END IF;
query := format('LOCK io.stream; UPDATE io.stream SET rl_prompt = %L',
prompt);
PERFORM dblink('dbname=mal', query);
RETURN io.read(sid);
2016-03-26 07:11:40 +03:00
END; $$ LANGUAGE 'plpgsql' STRICT;
CREATE FUNCTION io.write(data varchar, sid integer DEFAULT 1)
RETURNS void AS $$
DECLARE
query varchar;
BEGIN
query := format('LOCK io.stream;
UPDATE io.stream SET data = data || %L WHERE stream_id = %L',
data, sid);
--RAISE NOTICE 'write query: %', query;
PERFORM dblink('dbname=mal', query);
2016-03-26 07:11:40 +03:00
END; $$ LANGUAGE 'plpgsql' STRICT;
CREATE FUNCTION io.writeline(data varchar, sid integer DEFAULT 1)
RETURNS void AS $$
BEGIN
PERFORM io.write(data || E'\n', sid);
2016-03-26 07:11:40 +03:00
END; $$ LANGUAGE 'plpgsql' STRICT;
-- ---------------------------------------------------------
-- called from wait_rl_prompt via dblink
CREATE FUNCTION io.__wait_rl_prompt(sid integer) RETURNS varchar AS $$
DECLARE
isopen boolean;
prompt varchar;
datas integer;
BEGIN
LOCK io.stream;
SELECT open, rl_prompt INTO isopen, prompt FROM io.stream
WHERE stream_id = sid;
SELECT count(stream_id) INTO datas FROM io.stream WHERE data <> '';
IF isopen = false THEN
return NULL;
--raise EXCEPTION 'Stream ''%'' is closed', sid;
END IF;
IF datas = 0 AND prompt <> '' THEN
UPDATE io.stream SET rl_prompt = '' WHERE stream_id = sid;
-- There is pending data on some stream
RETURN prompt;
END IF;
RETURN ''; -- '' -> no input
END; $$ LANGUAGE 'plpgsql' STRICT;
-- wait_rl_prompt:
-- wait for rl_prompt to be set on the given stream and return the
-- rl_prompt value. Errors if stream is already closed.
CREATE FUNCTION io.wait_rl_prompt(sid integer DEFAULT 0) RETURNS varchar AS $$
DECLARE
query varchar;
prompt varchar;
sleep real = 0.05;
BEGIN
query := format('SELECT io.__wait_rl_prompt(%L);', sid);
WHILE true
LOOP
SELECT rl_prompt INTO prompt FROM dblink('dbname=mal', query)
AS t1(rl_prompt varchar);
IF prompt IS NULL THEN
raise EXCEPTION 'Stream ''%'' is closed', sid;
END IF;
IF prompt <> '' THEN
sleep := 0.05; -- reset sleep timer
RETURN prompt;
END IF;
PERFORM pg_sleep(sleep);
IF sleep < 0.5 THEN
sleep := sleep * 1.1; -- backoff
END IF;
END LOOP;
2016-03-26 07:11:40 +03:00
END; $$ LANGUAGE 'plpgsql' STRICT;
CREATE FUNCTION io.wait_flushed(sid integer DEFAULT 1) RETURNS void AS $$
DECLARE
query varchar;
pending integer;
sleep real = 0.05;
BEGIN
query := format('SELECT count(stream_id) FROM io.stream
WHERE stream_id = %L AND data <> ''''', sid);
WHILE true
LOOP
SELECT p INTO pending FROM dblink('dbname=mal', query)
AS t1(p integer);
IF pending = 0 THEN RETURN; END IF;
PERFORM pg_sleep(sleep);
IF sleep < 0.5 THEN
sleep := sleep * 1.1; -- backoff
END IF;
END LOOP;
END; $$ LANGUAGE 'plpgsql' STRICT;