1
1
mirror of https://github.com/dbcli/pgcli.git synced 2024-10-04 17:27:22 +03:00
pgcli/tests/parseutils/test_parseutils.py
2017-06-14 18:12:05 +02:00

268 lines
8.4 KiB
Python

import pytest
from pgcli.packages.parseutils.tables import extract_tables
from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote
def test_empty_string():
    """An empty query string yields an empty tuple of tables."""
    assert extract_tables('') == ()
def test_simple_select_single_table():
    """A plain ``select * from abc`` yields exactly one entry for ``abc``."""
    expected = ((None, 'abc', None, False),)
    assert extract_tables('select * from abc') == expected
@pytest.mark.parametrize(
    'sql',
    [
        'select * from "abc"."def"',
        'select * from abc."def"',
    ],
)
def test_simple_select_single_table_schema_qualified_quoted_table(sql):
    """Both spellings of ``abc."def"`` must produce the same single entry.

    The expected tuple carries the quoted text ``'"def"'`` in its third slot.
    """
    expected = (('abc', 'def', '"def"', False),)
    assert extract_tables(sql) == expected
@pytest.mark.parametrize(
    'sql',
    [
        'select * from abc.def',
        'select * from "abc".def',
    ],
)
def test_simple_select_single_table_schema_qualified(sql):
    """``abc.def`` (prefix quoted or not) yields one ``('abc', 'def', ...)`` entry."""
    expected = (('abc', 'def', None, False),)
    assert extract_tables(sql) == expected
def test_simple_select_single_table_double_quoted():
    """A double-quoted name is reported without quotes and with its case kept."""
    expected = ((None, 'Abc', None, False),)
    assert extract_tables('select * from "Abc"') == expected
def test_simple_select_multiple_tables():
    """Two comma-separated tables are both reported; order is not checked."""
    found = set(extract_tables('select * from abc, def'))
    assert found == {
        (None, 'abc', None, False),
        (None, 'def', None, False),
    }
def test_simple_select_multiple_tables_double_quoted():
    """Two double-quoted tables are both reported; order is not checked."""
    found = set(extract_tables('select * from "Abc", "Def"'))
    assert found == {
        (None, 'Abc', None, False),
        (None, 'Def', None, False),
    }
def test_simple_select_single_table_deouble_quoted_aliased():
    """``"Abc" a`` yields one entry with ``'a'`` in the third slot."""
    expected = ((None, 'Abc', 'a', False),)
    assert extract_tables('select * from "Abc" a') == expected
def test_simple_select_multiple_tables_deouble_quoted_aliased():
    """Two quoted, aliased tables are both reported; order is not checked."""
    found = set(extract_tables('select * from "Abc" a, "Def" d'))
    assert found == {
        (None, 'Abc', 'a', False),
        (None, 'Def', 'd', False),
    }
def test_simple_select_multiple_tables_schema_qualified():
    """``abc.def, ghi.jkl`` yields one entry per dotted name, in any order."""
    found = set(extract_tables('select * from abc.def, ghi.jkl'))
    assert found == {
        ('abc', 'def', None, False),
        ('ghi', 'jkl', None, False),
    }
def test_simple_select_with_cols_single_table():
    """An explicit column list (``a,b``) does not change the reported table."""
    expected = ((None, 'abc', None, False),)
    assert extract_tables('select a,b from abc') == expected
def test_simple_select_with_cols_single_table_schema_qualified():
    """An explicit column list with ``abc.def`` yields ``('abc', 'def', ...)``."""
    expected = (('abc', 'def', None, False),)
    assert extract_tables('select a,b from abc.def') == expected
def test_simple_select_with_cols_multiple_tables():
    """A column list with two tables reports both tables, in any order."""
    found = set(extract_tables('select a,b from abc, def'))
    assert found == {
        (None, 'abc', None, False),
        (None, 'def', None, False),
    }
def test_simple_select_with_cols_multiple_qualified_tables():
    """A column list with two dotted names reports both, in any order.

    ``def`` appears once as the second part and once as the first part of a
    dotted name; the expected tuples keep those positions apart.
    """
    found = set(extract_tables('select a,b from abc.def, def.ghi'))
    assert found == {
        ('abc', 'def', None, False),
        ('def', 'ghi', None, False),
    }
def test_select_with_hanging_comma_single_table():
    """A dangling comma in the column list (``select a, from``) is tolerated."""
    expected = ((None, 'abc', None, False),)
    assert extract_tables('select a, from abc') == expected
def test_select_with_hanging_comma_multiple_tables():
    """A dangling column-list comma still lets both tables be reported."""
    found = set(extract_tables('select a, from abc, def'))
    assert found == {
        (None, 'abc', None, False),
        (None, 'def', None, False),
    }
def test_select_with_hanging_period_multiple_tables():
    """A dangling ``t1.`` in the select list still reports both aliased tables."""
    found = set(extract_tables('SELECT t1. FROM tabl1 t1, tabl2 t2'))
    assert found == {
        (None, 'tabl1', 't1', False),
        (None, 'tabl2', 't2', False),
    }
def test_simple_insert_single_table():
    """Pin the current result for a simple ``insert into abc (...)``."""
    # Carried-over note: sqlparse mistakenly assigns an alias to the table
    # for this statement (and also mis-identifies the field list).
    # Presumably that is why 'abc' shows up in the third slot as well;
    # the assertion records that current behaviour.
    expected = ((None, 'abc', 'abc', False),)
    statement = 'insert into abc (id, name) values (1, "def")'
    assert extract_tables(statement) == expected
@pytest.mark.xfail
def test_simple_insert_single_table_schema_qualified():
    """Desired result for ``insert into abc.def (...)``; marked xfail."""
    statement = 'insert into abc.def (id, name) values (1, "def")'
    expected = (('abc', 'def', None, False),)
    assert extract_tables(statement) == expected
def test_simple_update_table_no_schema():
    """``update abc set ...`` reports the single table ``abc``."""
    expected = ((None, 'abc', None, False),)
    assert extract_tables('update abc set id = 1') == expected
def test_simple_update_table_with_schema():
    """``update abc.def set ...`` reports ``('abc', 'def', ...)``."""
    expected = (('abc', 'def', None, False),)
    assert extract_tables('update abc.def set id = 1') == expected
@pytest.mark.parametrize(
    'join_type',
    ['', 'INNER', 'LEFT', 'RIGHT OUTER'],
)
def test_join_table(join_type):
    """Both sides of a JOIN are reported for each parametrized join prefix."""
    template = 'SELECT * FROM abc a {0} JOIN def d ON a.id = d.num'
    found = set(extract_tables(template.format(join_type)))
    assert found == {
        (None, 'abc', 'a', False),
        (None, 'def', 'd', False),
    }
def test_join_table_schema_qualified():
    """A JOIN between two dotted, aliased names reports both, in any order."""
    query = 'SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num'
    found = set(extract_tables(query))
    assert found == {
        ('abc', 'def', 'x', False),
        ('ghi', 'jkl', 'y', False),
    }
def test_incomplete_join_clause():
    """A query cut off after ``on a.id = `` still reports both joined tables.

    The comparison is against a tuple, so the order (abc, then bcd) matters.
    """
    query = '''select a.x, b.y
from abc a join bcd b
on a.id = '''
    expected = (
        (None, 'abc', 'a', False),
        (None, 'bcd', 'b', False),
    )
    assert extract_tables(query) == expected
def test_join_as_table():
    """``my_table AS m`` yields one entry with ``'m'`` in the third slot."""
    query = 'SELECT * FROM my_table AS m WHERE m.a > 5'
    expected = ((None, 'my_table', 'm', False),)
    assert extract_tables(query) == expected
def test_multiple_joins():
    """Three chained inner joins report t1, t2, t3 in that order.

    The query ends mid-expression (``t3.``), and the result is compared
    against an ordered tuple.
    """
    query = '''select * from t1
inner join t2 ON
t1.id = t2.t1_id
inner join t3 ON
t2.id = t3.'''
    expected = tuple((None, name, None, False) for name in ('t1', 't2', 't3'))
    assert extract_tables(query) == expected
def test_subselect_tables():
    """An unclosed sub-select reports only the inner table ``abc``."""
    query = 'SELECT * FROM (SELECT FROM abc'
    expected = ((None, 'abc', None, False),)
    assert extract_tables(query) == expected
@pytest.mark.parametrize(
    'text',
    [
        'SELECT * FROM foo.',
        'SELECT 123 AS foo',
    ],
)
def test_extract_no_tables(text):
    """Each parametrized query is expected to produce an empty tuple."""
    assert extract_tables(text) == ()
@pytest.mark.parametrize(
    'arg_list',
    ['', 'arg1', 'arg1, arg2, arg3'],
)
def test_simple_function_as_table(arg_list):
    """``FROM foo(...)`` yields one entry whose last slot is ``True``.

    The argument list (empty, one or several arguments) must not matter.
    """
    query = 'SELECT * FROM foo({0})'.format(arg_list)
    expected = ((None, 'foo', None, True),)
    assert extract_tables(query) == expected
@pytest.mark.parametrize(
    'arg_list',
    ['', 'arg1', 'arg1, arg2, arg3'],
)
def test_simple_schema_qualified_function_as_table(arg_list):
    """``FROM foo.bar(...)`` yields ``('foo', 'bar', None, True)``."""
    query = 'SELECT * FROM foo.bar({0})'.format(arg_list)
    expected = (('foo', 'bar', None, True),)
    assert extract_tables(query) == expected
@pytest.mark.parametrize(
    'arg_list',
    ['', 'arg1', 'arg1, arg2, arg3'],
)
def test_simple_aliased_function_as_table(arg_list):
    """``FROM foo(...) bar`` yields ``(None, 'foo', 'bar', True)``."""
    query = 'SELECT * FROM foo({0}) bar'.format(arg_list)
    expected = ((None, 'foo', 'bar', True),)
    assert extract_tables(query) == expected
def test_simple_table_and_function():
    """``foo JOIN bar()`` reports both entries; only ``bar`` ends in ``True``."""
    found = set(extract_tables('SELECT * FROM foo JOIN bar()'))
    assert found == {
        (None, 'foo', None, False),
        (None, 'bar', None, True),
    }
def test_complex_table_and_function():
    """A dotted aliased table joined to a dotted aliased function call.

    Both entries are expected (in any order); only the ``bar.qux(...)``
    entry has ``True`` in its last slot.
    """
    query = '''SELECT * FROM foo.bar baz
JOIN bar.qux(x, y, z) quux'''
    found = set(extract_tables(query))
    assert found == {
        ('foo', 'bar', 'baz', False),
        ('bar', 'qux', 'quux', True),
    }
def test_find_prev_keyword_using():
    """Inside an open ``using (`` list, the returned keyword is ``(``.

    The returned text must be the query truncated right after that paren.
    """
    query = 'select * from tbl1 inner join tbl2 using (col1, '
    keyword, truncated = find_prev_keyword(query)
    assert keyword.value == '('
    assert truncated == 'select * from tbl1 inner join tbl2 using ('
@pytest.mark.parametrize(
    'sql',
    [
        'select * from foo where bar',
        'select * from foo where bar = 1 and baz or ',
        'select * from foo where bar = 1 and baz between qux and ',
    ],
)
def test_find_prev_keyword_where(sql):
    """Each query must resolve to ``where``, not to ``and``/``or``/``between``.

    The returned text must be the query truncated right after ``where``.
    """
    keyword, truncated = find_prev_keyword(sql)
    assert keyword.value == 'where'
    assert truncated == 'select * from foo where'
@pytest.mark.parametrize(
    'sql',
    [
        'create table foo (bar int, baz ',
        'select * from foo() as bar (baz ',
    ],
)
def test_find_prev_keyword_open_parens(sql):
    """With an unclosed parenthesis, the returned keyword is ``(``."""
    keyword = find_prev_keyword(sql)[0]
    assert keyword.value == '('
@pytest.mark.parametrize(
    'sql',
    [
        '',
        '$$ foo $$',
        "$$ 'foo' $$",
        '$$ "foo" $$',
        '$$ $a$ $$',
        '$a$ $$ $a$',
        'foo bar $$ baz $$',
    ],
)
def test_is_open_quote__closed(sql):
    """Each input (empty, or with balanced quoting) must give a falsy result."""
    still_open = is_open_quote(sql)
    assert not still_open
@pytest.mark.parametrize(
    'sql',
    [
        '$$',
        ';;;$$',
        'foo $$ bar $$; foo $$',
        '$$ foo $a$',
        "foo 'bar baz",
        "$a$ foo ",
        '$$ "foo" ',
        '$$ $a$ ',
        'foo bar $$ baz',
    ],
)
def test_is_open_quote__open(sql):
    """Each input leaves a quote unterminated and must give a truthy result."""
    still_open = is_open_quote(sql)
    assert still_open