1
1
mirror of https://github.com/dbcli/pgcli.git synced 2024-10-06 02:07:53 +03:00

extract_tables returns tuples, not lists

This commit is contained in:
Darik Gamble 2015-11-16 07:28:22 -05:00
parent 7ee8a90358
commit 5b5e861258
2 changed files with 43 additions and 43 deletions

View File

@ -161,7 +161,7 @@ def extract_tables(sql):
"""
parsed = sqlparse.parse(sql)
if not parsed:
return []
return ()
# INSERT statements must stop looking for tables at the sign of first
# Punctuation. eg: INSERT INTO abc (col1, col2) VALUES (1, 2)
@ -177,7 +177,7 @@ def extract_tables(sql):
# to have is_function=True
identifiers = extract_table_identifiers(stream,
allow_functions=not insert_stmt)
return list(identifiers)
return tuple(identifiers)
def find_prev_keyword(sql):

View File

@ -4,142 +4,142 @@ from pgcli.packages.parseutils import find_prev_keyword, is_open_quote
def test_empty_string():
tables = extract_tables('')
assert tables == []
assert tables == ()
def test_simple_select_single_table():
tables = extract_tables('select * from abc')
assert tables == [(None, 'abc', None, False)]
assert tables == ((None, 'abc', None, False),)
def test_simple_select_single_table_schema_qualified():
tables = extract_tables('select * from abc.def')
assert tables == [('abc', 'def', None, False)]
assert tables == (('abc', 'def', None, False),)
def test_simple_select_single_table_double_quoted():
tables = extract_tables('select * from "Abc"')
assert tables == [(None, 'Abc', None, False)]
assert tables == ((None, 'Abc', None, False),)
def test_simple_select_multiple_tables():
tables = extract_tables('select * from abc, def')
assert sorted(tables) == [(None, 'abc', None, False),
(None, 'def', None, False)]
assert set(tables) == set([(None, 'abc', None, False),
(None, 'def', None, False)])
def test_simple_select_multiple_tables_double_quoted():
tables = extract_tables('select * from "Abc", "Def"')
assert tables == [(None, 'Abc', None, False),
(None, 'Def', None, False)]
assert set(tables) == set([(None, 'Abc', None, False),
(None, 'Def', None, False)])
def test_simple_select_single_table_deouble_quoted_aliased():
tables = extract_tables('select * from "Abc" a')
assert tables == [(None, 'Abc', 'a', False)]
assert tables == ((None, 'Abc', 'a', False),)
def test_simple_select_multiple_tables_deouble_quoted_aliased():
tables = extract_tables('select * from "Abc" a, "Def" d')
assert tables == [(None, 'Abc', 'a', False),
(None, 'Def', 'd', False)]
assert set(tables) == set([(None, 'Abc', 'a', False),
(None, 'Def', 'd', False)])
def test_simple_select_multiple_tables_schema_qualified():
tables = extract_tables('select * from abc.def, ghi.jkl')
assert sorted(tables) == [('abc', 'def', None, False),
('ghi', 'jkl', None, False)]
assert set(tables) == set([('abc', 'def', None, False),
('ghi', 'jkl', None, False)])
def test_simple_select_with_cols_single_table():
tables = extract_tables('select a,b from abc')
assert tables == [(None, 'abc', None, False)]
assert tables == ((None, 'abc', None, False),)
def test_simple_select_with_cols_single_table_schema_qualified():
tables = extract_tables('select a,b from abc.def')
assert tables == [('abc', 'def', None, False)]
assert tables == (('abc', 'def', None, False),)
def test_simple_select_with_cols_multiple_tables():
tables = extract_tables('select a,b from abc, def')
assert sorted(tables) == [(None, 'abc', None, False),
(None, 'def', None, False)]
assert set(tables) == set([(None, 'abc', None, False),
(None, 'def', None, False)])
def test_simple_select_with_cols_multiple_tables():
tables = extract_tables('select a,b from abc.def, def.ghi')
assert sorted(tables) == [('abc', 'def', None, False),
('def', 'ghi', None, False)]
assert set(tables) == set([('abc', 'def', None, False),
('def', 'ghi', None, False)])
def test_select_with_hanging_comma_single_table():
tables = extract_tables('select a, from abc')
assert tables == [(None, 'abc', None, False)]
assert tables == ((None, 'abc', None, False),)
def test_select_with_hanging_comma_multiple_tables():
tables = extract_tables('select a, from abc, def')
assert sorted(tables) == [(None, 'abc', None, False),
(None, 'def', None, False)]
assert set(tables) == set([(None, 'abc', None, False),
(None, 'def', None, False)])
def test_select_with_hanging_period_multiple_tables():
tables = extract_tables('SELECT t1. FROM tabl1 t1, tabl2 t2')
assert sorted(tables) == [(None, 'tabl1', 't1', False),
(None, 'tabl2', 't2', False)]
assert set(tables) == set([(None, 'tabl1', 't1', False),
(None, 'tabl2', 't2', False)])
def test_simple_insert_single_table():
tables = extract_tables('insert into abc (id, name) values (1, "def")')
# sqlparse mistakenly assigns an alias to the table
# AND mistakenly identifies the field list as
# assert tables == [(None, 'abc', None, False)]
# assert tables == ((None, 'abc', None, False),)
assert tables == [(None, 'abc', 'abc', False)]
assert tables == ((None, 'abc', 'abc', False),)
@pytest.mark.xfail
def test_simple_insert_single_table_schema_qualified():
tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
assert tables == [('abc', 'def', None, False)]
assert tables == (('abc', 'def', None, False),)
def test_simple_update_table():
tables = extract_tables('update abc set id = 1')
assert tables == [(None, 'abc', None, False)]
assert tables == ((None, 'abc', None, False),)
def test_simple_update_table():
tables = extract_tables('update abc.def set id = 1')
assert tables == [('abc', 'def', None, False)]
assert tables == (('abc', 'def', None, False),)
@pytest.mark.parametrize('join_type', ['', 'INNER', 'LEFT', 'RIGHT OUTER'])
def test_join_table(join_type):
sql = 'SELECT * FROM abc a {0} JOIN def d ON a.id = d.num'.format(join_type)
tables = extract_tables(sql)
assert sorted(tables) == [(None, 'abc', 'a', False),
(None, 'def', 'd', False)]
assert set(tables) == set([(None, 'abc', 'a', False),
(None, 'def', 'd', False)])
def test_join_table_schema_qualified():
tables = extract_tables('SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num')
assert tables == [('abc', 'def', 'x', False),
('ghi', 'jkl', 'y', False)]
assert set(tables) == set([('abc', 'def', 'x', False),
('ghi', 'jkl', 'y', False)])
def test_join_as_table():
tables = extract_tables('SELECT * FROM my_table AS m WHERE m.a > 5')
assert tables == [(None, 'my_table', 'm', False)]
assert tables == ((None, 'my_table', 'm', False),)
@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
def test_simple_function_as_table(arg_list):
tables = extract_tables('SELECT * FROM foo({0})'.format(arg_list))
assert tables == [(None, 'foo', None, True)]
assert tables == ((None, 'foo', None, True),)
@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
def test_simple_schema_qualified_function_as_table(arg_list):
tables = extract_tables('SELECT * FROM foo.bar({0})'.format(arg_list))
assert tables == [('foo', 'bar', None, True)]
assert tables == (('foo', 'bar', None, True),)
@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
def test_simple_aliased_function_as_table(arg_list):
tables = extract_tables('SELECT * FROM foo({0}) bar'.format(arg_list))
assert tables == [(None, 'foo', 'bar', True)]
assert tables == ((None, 'foo', 'bar', True),)
def test_simple_table_and_function():
tables = extract_tables('SELECT * FROM foo JOIN bar()')
assert tables == [(None, 'foo', None, False),
(None, 'bar', None, True)]
assert set(tables) == set([(None, 'foo', None, False),
(None, 'bar', None, True)])
def test_complex_table_and_function():
tables = extract_tables('''SELECT * FROM foo.bar baz
JOIN bar.qux(x, y, z) quux''')
assert tables == [('foo', 'bar', 'baz', False),
('bar', 'qux', 'quux', True)]
assert set(tables) == set([('foo', 'bar', 'baz', False),
('bar', 'qux', 'quux', True)])