1
1
mirror of https://github.com/dbcli/pgcli.git synced 2024-09-11 13:56:36 +03:00

Merge pull request #667 from dbcli/koljonen/tests_parametrize_completer

Parametrize completer in tests + deduplicate test code
This commit is contained in:
Amjith Ramanujam 2017-04-21 07:40:13 -07:00 committed by GitHub
commit 1e1d980ae2
4 changed files with 844 additions and 996 deletions

2
pylintrc Normal file
View File

@ -0,0 +1,2 @@
[MESSAGES CONTROL]
disable=missing-docstring,invalid-name

View File

@ -1,26 +1,59 @@
from functools import partial
from itertools import product
from pgcli.packages.parseutils.meta import FunctionMetadata, ForeignKey
from prompt_toolkit.completion import Completion
from functools import partial
from prompt_toolkit.document import Document
from mock import Mock
import pytest
parametrize = pytest.mark.parametrize
qual = ['if_more_than_one_table', 'always']
no_qual = ['if_more_than_one_table', 'never']
def escape(name):
if not name.islower() or name in ('select', 'insert'):
return '"' + name + '"'
return name
escape = lambda name: ('"' + name + '"' if not name.islower() or name in (
'select', 'insert') else name)
def completion(display_meta, text, pos=0):
return Completion(text, start_position=pos,
display_meta=display_meta)
return Completion(text, start_position=pos, display_meta=display_meta)
def get_result(completer, text, position=None):
position = len(text) if position is None else position
return completer.get_completions(
Document(text=text, cursor_position=position), Mock()
)
def result_set(completer, text, position=None):
return set(get_result(completer, text, position))
# The code below is quivalent to
# def schema(text, pos=0):
# return completion('schema', text, pos)
# and so on
schema, table, view, function, column, keyword, datatype, alias, name_join,\
fk_join, join = [partial(completion, display_meta)
for display_meta in('schema', 'table', 'view', 'function', 'column',
'keyword', 'datatype', 'table alias', 'name join', 'fk join', 'join')]
schema = partial(completion, 'schema')
table = partial(completion, 'table')
view = partial(completion, 'view')
function = partial(completion, 'function')
column = partial(completion, 'column')
keyword = partial(completion, 'keyword')
datatype = partial(completion, 'datatype')
alias = partial(completion, 'table alias')
name_join = partial(completion, 'name join')
fk_join = partial(completion, 'fk join')
join = partial(completion, 'join')
def wildcard_expansion(cols, pos=-1):
return Completion(cols, start_position=pos, display_meta='columns',
display = '*')
return Completion(
cols, start_position=pos, display_meta='columns', display='*')
class MetaData(object):
def __init__(self, metadata):
@ -35,38 +68,107 @@ class MetaData(object):
def keywords(self, pos=0):
return [keyword(kw, pos) for kw in self.completer.keywords]
def columns(self, parent, schema='public', typ='tables', pos=0):
def columns(self, tbl, parent='public', typ='tables', pos=0):
if typ == 'functions':
fun = [x for x in self.metadata[typ][schema] if x[0] == parent][0]
fun = [x for x in self.metadata[typ][parent] if x[0] == tbl][0]
cols = fun[1]
else:
cols = self.metadata[typ][schema][parent]
cols = self.metadata[typ][parent][tbl]
return [column(escape(col), pos) for col in cols]
def datatypes(self, schema='public', pos=0):
return [datatype(escape(x), pos)
for x in self.metadata.get('datatypes', {}).get(schema, [])]
def datatypes(self, parent='public', pos=0):
return [
datatype(escape(x), pos)
for x in self.metadata.get('datatypes', {}).get(parent, [])]
def tables(self, schema='public', pos=0):
return [table(escape(x), pos)
for x in self.metadata.get('tables', {}).get(schema, [])]
def tables(self, parent='public', pos=0):
return [
table(escape(x), pos)
for x in self.metadata.get('tables', {}).get(parent, [])]
def views(self, schema='public', pos=0):
return [view(escape(x), pos)
for x in self.metadata.get('views', {}).get(schema, [])]
def views(self, parent='public', pos=0):
return [
view(escape(x), pos)
for x in self.metadata.get('views', {}).get(parent, [])]
def functions(self, schema='public', pos=0):
return [function(escape(x[0] + '()'), pos)
for x in self.metadata.get('functions', {}).get(schema, [])]
def functions(self, parent='public', pos=0):
return [
function(escape(x[0] + '()'), pos)
for x in self.metadata.get('functions', {}).get(parent, [])]
def schemas(self, pos=0):
schemas = set(sch for schs in self.metadata.values() for sch in schs)
return [schema(escape(s), pos=pos) for s in schemas]
def functions_and_keywords(self, parent='public', pos=0):
return (
self.functions(parent, pos) + self.builtin_functions(pos) +
self.keywords(pos)
)
# Note that the filtering parameters here only apply to the columns
def columns_functions_and_keywords(
self, tbl, parent='public', typ='tables', pos=0
):
return (
self.functions_and_keywords(pos=pos) +
self.columns(tbl, parent, typ, pos)
)
def from_clause_items(self, parent='public', pos=0):
return (
self.functions(parent, pos) + self.views(parent, pos) +
self.tables(parent, pos)
)
def schemas_and_from_clause_items(self, parent='public', pos=0):
return self.from_clause_items(parent, pos) + self.schemas(pos)
def types(self, parent='public', pos=0):
return self.datatypes(parent, pos) + self.tables(parent, pos)
@property
def completer(self):
return self.get_completer()
def get_completers(self, casing):
"""
Returns a function taking three bools `casing`, `filtr`, `aliasing` and
the list `qualify`, all defaulting to None.
Returns a list of completers.
These parameters specify the allowed values for the corresponding
completer parameters, `None` meaning any, i.e. (None, None, None, None)
results in all 24 possible completers, whereas e.g.
(True, False, True, ['never']) results in the one completer with
casing, without `search_path` filtering of objects, with table
aliasing, and without column qualification.
"""
def _cfg(_casing, filtr, aliasing, qualify):
cfg = {'settings': {}}
if _casing:
cfg['casing'] = casing
cfg['settings']['search_path_filter'] = filtr
cfg['settings']['generate_aliases'] = aliasing
cfg['settings']['qualify_columns'] = qualify
return cfg
def _cfgs(casing, filtr, aliasing, qualify):
casings = [True, False] if casing is None else [casing]
filtrs = [True, False] if filtr is None else [filtr]
aliases = [True, False] if aliasing is None else [aliasing]
qualifys = qualify or ['always', 'if_more_than_one_table', 'never']
return [
_cfg(*p) for p in product(casings, filtrs, aliases, qualifys)
]
def completers(casing=None, filtr=None, aliasing=None, qualify=None):
get_comp = self.get_completer
return [
get_comp(**c) for c in _cfgs(casing, filtr, aliasing, qualify)
]
return completers
def get_completer(self, settings=None, casing=None):
metadata = self.metadata
from pgcli.pgcompleter import PGCompleter
@ -74,29 +176,32 @@ class MetaData(object):
schemata, tables, tbl_cols, views, view_cols = [], [], [], [], []
for schema, tbls in metadata['tables'].items():
schemata.append(schema)
for sch, tbls in metadata['tables'].items():
schemata.append(sch)
for table, cols in tbls.items():
tables.append((schema, table))
for tbl, cols in tbls.items():
tables.append((sch, tbl))
# Let all columns be text columns
tbl_cols.extend([(schema, table, col, 'text') for col in cols])
tbl_cols.extend([(sch, tbl, col, 'text') for col in cols])
for schema, tbls in metadata.get('views', {}).items():
for view, cols in tbls.items():
views.append((schema, view))
for sch, tbls in metadata.get('views', {}).items():
for tbl, cols in tbls.items():
views.append((sch, tbl))
# Let all columns be text columns
view_cols.extend([(schema, view, col, 'text') for col in cols])
view_cols.extend([(sch, tbl, col, 'text') for col in cols])
functions = [FunctionMetadata(schema, *func_meta)
for schema, funcs in metadata['functions'].items()
for func_meta in funcs]
functions = [
FunctionMetadata(sch, *func_meta)
for sch, funcs in metadata['functions'].items()
for func_meta in funcs]
datatypes = [(schema, datatype)
for schema, datatypes in metadata['datatypes'].items()
for datatype in datatypes]
datatypes = [
(sch, typ)
for sch, datatypes in metadata['datatypes'].items()
for typ in datatypes]
foreignkeys = [ForeignKey(*fk) for fks in metadata['foreignkeys'].values()
foreignkeys = [
ForeignKey(*fk) for fks in metadata['foreignkeys'].values()
for fk in fks]
comp.extend_schemata(schemata)

View File

@ -1,9 +1,8 @@
from __future__ import unicode_literals
import pytest
import itertools
from metadata import (MetaData, alias, name_join, fk_join, join,
schema, table, function, wildcard_expansion, column)
from prompt_toolkit.document import Document
schema, table, function, wildcard_expansion, column,
get_result, result_set, qual, no_qual, parametrize)
metadata = {
'tables': {
@ -63,330 +62,225 @@ metadata = {
testdata = MetaData(metadata)
cased_schemas = [schema(x) for x in ('public', 'blog', 'CUSTOM', '"Custom"')]
@pytest.fixture
def completer():
return testdata.get_completer(settings={'search_path_filter': True})
casing = ('SELECT', 'Orders', 'User_Emails', 'CUSTOM', 'Func1', 'Entries',
'Tags', 'EntryTags', 'EntAccLog',
'EntryID', 'EntryTitle', 'EntryText')
completers = testdata.get_completers(casing)
@pytest.fixture
def completer_with_casing():
return testdata.get_completer(
settings={'search_path_filter': True},
casing=casing
)
@pytest.fixture
def completer_with_aliases():
return testdata.get_completer(
settings={'generate_aliases': True, 'search_path_filter': True}
)
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('table', ['users', '"users"'])
def test_suggested_column_names_from_shadowed_visible_table(completer, table) :
result = result_set(completer, 'SELECT FROM ' + table, len('SELECT '))
assert result == set(testdata.columns_functions_and_keywords('users'))
@pytest.fixture
def completer_aliases_casing():
return testdata.get_completer(
settings={'generate_aliases': True, 'search_path_filter': True},
casing=casing
)
@pytest.fixture
def completer_all_schemas():
return testdata.get_completer()
@pytest.fixture
def completer_all_schemas_casing():
return testdata.get_completer(casing=casing)
@pytest.fixture
def completer_all_schemas_aliases():
return testdata.get_completer(settings={'generate_aliases': True})
@pytest.fixture
def complete_event():
from mock import Mock
return Mock()
@pytest.mark.parametrize('table', [
'users',
'"users"',
])
def test_suggested_column_names_from_shadowed_visible_table(completer, complete_event, table):
"""
Suggest column and function names when selecting from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT FROM ' + table
position = len('SELECT ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('users') +
testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('text', [
'SELECT from custom.users',
'WITH users as (SELECT 1 AS foo) SELECT from custom.users',
])
def test_suggested_column_names_from_qualified_shadowed_table(completer, complete_event, text):
position = text.find(' ') + 1
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('users', 'custom') +
testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
])
def test_suggested_column_names_from_qualified_shadowed_table(completer, text):
result = result_set(completer, text, position = text.find(' ') + 1)
assert result == set(testdata.columns_functions_and_keywords(
'users', 'custom'
))
@pytest.mark.parametrize('text', [
'WITH users as (SELECT 1 AS foo) SELECT from users',
])
def test_suggested_column_names_from_cte(completer, complete_event, text):
position = text.find(' ') + 1
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([column('foo')] + testdata.functions() +
list(testdata.builtin_functions() + testdata.keywords())
)
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('text', ['WITH users as (SELECT 1 AS foo) SELECT from users',])
def test_suggested_column_names_from_cte(completer, text):
result = result_set(completer, text, text.find(' ') + 1)
assert result == set([column('foo')] + testdata.functions_and_keywords())
@parametrize('completer', completers(casing=False))
@parametrize('text', [
'SELECT * FROM users JOIN custom.shipments ON ',
'''SELECT *
FROM public.users
JOIN custom.shipments ON '''
])
def test_suggested_join_conditions(completer, complete_event, text):
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([
def test_suggested_join_conditions(completer, text):
result = result_set(completer, text)
assert result == set([
alias('users'),
alias('shipments'),
name_join('shipments.id = users.id'),
fk_join('shipments.user_id = users.id')])
@pytest.mark.parametrize(('query', 'tbl'), itertools.product((
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
@parametrize(('query', 'tbl'), itertools.product((
'SELECT * FROM public.{0} RIGHT OUTER JOIN ',
'''SELECT *
FROM {0}
JOIN '''
), ('users', '"users"', 'Users')))
def test_suggested_joins(completer, complete_event, query, tbl):
text = query.format(tbl)
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.schemas() + testdata.tables() + [
join('custom.shipments ON shipments.user_id = {0}.id'.format(tbl)),
] + testdata.functions())
def test_suggested_column_names_from_schema_qualifed_table(completer, complete_event):
"""
Suggest column and function names when selecting from a qualified-table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT from custom.products'
position = len('SELECT ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position), complete_event))
assert set(result) == set(testdata.columns('products', 'custom') + testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
def test_suggested_column_names_in_function(completer, complete_event):
"""
Suggest column and function names when selecting multiple
columns from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT MAX( from custom.products'
position = len('SELECT MAX(')
result = completer.get_completions(
Document(text=text, cursor_position=position),
complete_event)
assert set(result) == set(testdata.columns('products', 'custom'))
def test_suggested_joins(completer, query, tbl):
result = result_set(completer, query.format(tbl))
assert result == set(
testdata.schemas_and_from_clause_items() +
[join('custom.shipments ON shipments.user_id = {0}.id'.format(tbl))]
)
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_column_names_from_schema_qualifed_table(completer):
result = result_set(
completer, 'SELECT from custom.products', len('SELECT ')
)
assert result == set(testdata.columns_functions_and_keywords(
'products', 'custom'
))
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_column_names_in_function(completer):
result = result_set(
completer, 'SELECT MAX( from custom.products', len('SELECT MAX(')
)
assert result == set(testdata.columns('products', 'custom'))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM Custom.',
'SELECT * FROM custom.',
'SELECT * FROM "custom".',
])
@pytest.mark.parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot(completer, complete_event,
text, use_leading_double_quote):
@parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot(
completer, text, use_leading_double_quote
):
if use_leading_double_quote:
text += '"'
start_pos = -1
start_position = -1
else:
start_pos = 0
start_position = 0
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set(testdata.tables('custom', start_pos)
+ testdata.functions('custom', start_pos))
result = result_set(completer, text)
assert result == set(testdata.from_clause_items('custom', start_position))
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM "Custom".',
])
@pytest.mark.parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot2(completer, complete_event,
text, use_leading_double_quote):
@parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot2(
completer, text, use_leading_double_quote
):
if use_leading_double_quote:
text += '"'
start_pos = -1
start_position = -1
else:
start_pos = 0
start_position = 0
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set(testdata.functions('Custom', start_pos) +
testdata.tables('Custom', start_pos))
result = result_set(completer, text)
assert result == set(testdata.from_clause_items('Custom', start_position))
def test_suggested_column_names_with_qualified_alias(completer, complete_event):
"""
Suggest column names on table alias and dot
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT p. from custom.products p'
position = len('SELECT p.')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('products', 'custom'))
def test_suggested_multiple_column_names(completer, complete_event):
"""
Suggest column and function names when selecting multiple
columns from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT id, from custom.products'
position = len('SELECT id, ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('products', 'custom') +
testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
@parametrize('completer', completers(filtr=True, casing=False))
def test_suggested_column_names_with_qualified_alias(completer):
result = result_set(
completer, 'SELECT p. from custom.products p', len('SELECT p.')
)
assert result == set(testdata.columns('products', 'custom'))
def test_suggested_multiple_column_names_with_alias(completer, complete_event):
"""
Suggest column names on table alias and dot
when selecting multiple columns from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT p.id, p. from custom.products p'
position = len('SELECT u.id, u.')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('products', 'custom'))
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_multiple_column_names(completer):
result = result_set(
completer, 'SELECT id, from custom.products', len('SELECT id, ')
)
assert result == set(testdata.columns_functions_and_keywords(
'products', 'custom'
))
@parametrize('completer', completers(filtr=True, casing=False))
def test_suggested_multiple_column_names_with_alias(completer):
result = result_set(
completer,
'SELECT p.id, p. from custom.products p',
len('SELECT u.id, u.')
)
assert result == set(testdata.columns('products', 'custom'))
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON ',
'SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON JOIN public.orders z ON z.id > y.id'
])
def test_suggestions_after_on(completer, complete_event, text):
def test_suggestions_after_on(completer, text):
position = len('SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([
result = result_set(completer, text, position)
assert result == set([
alias('x'),
alias('y'),
name_join('y.price = x.price'),
name_join('y.product_name = x.product_name'),
name_join('y.id = x.id')])
def test_suggested_aliases_after_on_right_side(completer, complete_event):
@parametrize('completer', completers())
def test_suggested_aliases_after_on_right_side(completer):
text = 'SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON x.id = '
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([
alias('x'),
alias('y')])
result = result_set(completer, text)
assert result == set([alias('x'), alias('y')])
def test_table_names_after_from(completer, complete_event):
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
def test_table_names_after_from(completer):
text = 'SELECT * FROM '
position = len('SELECT * FROM ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.schemas() + testdata.tables()
+ testdata.functions())
result = result_set(completer, text)
assert result == set(testdata.schemas_and_from_clause_items())
def test_schema_qualified_function_name(completer, complete_event):
@parametrize('completer', completers(filtr=True, casing=False))
def test_schema_qualified_function_name(completer):
text = 'SELECT custom.func'
postion = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=postion), complete_event))
result = result_set(completer, text)
assert result == set([
function('func3()', -len('func')),
function('set_returning_func()', -len('func'))])
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT 1::custom.',
'CREATE TABLE foo (bar custom.',
'CREATE FUNCTION foo (bar INT, baz custom.',
'ALTER TABLE foo ALTER COLUMN bar TYPE custom.',
])
def test_schema_qualified_type_name(text, completer, complete_event):
pos = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=pos), complete_event)
assert set(result) == set(testdata.datatypes('custom')
+ testdata.tables('custom'))
def test_schema_qualified_type_name(completer, text):
result = result_set(completer, text)
assert result == set(testdata.types('custom'))
def test_suggest_columns_from_aliased_set_returning_function(completer, complete_event):
sql = 'select f. from custom.set_returning_func() f'
pos = len('select f.')
result = completer.get_completions(Document(text=sql, cursor_position=pos),
complete_event)
assert set(result) == set(
@parametrize('completer', completers(filtr=True, casing=False))
def test_suggest_columns_from_aliased_set_returning_function(completer):
result = result_set(
completer,
'select f. from custom.set_returning_func() f',
len('select f.')
)
assert result == set(
testdata.columns('set_returning_func', 'custom', 'functions'))
@pytest.mark.parametrize('text', [
@parametrize('completer',completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('text', [
'SELECT * FROM custom.set_returning_func()',
'SELECT * FROM Custom.set_returning_func()',
'SELECT * FROM Custom.Set_Returning_Func()'
])
def test_wildcard_column_expansion_with_function(completer, complete_event, text):
pos = len('SELECT *')
def test_wildcard_column_expansion_with_function(completer, text):
position = len('SELECT *')
completions = completer.get_completions(
Document(text=text, cursor_position=pos), complete_event)
completions = get_result(completer, text, position)
col_list = 'x'
expected = [wildcard_expansion(col_list)]
@ -394,19 +288,21 @@ def test_wildcard_column_expansion_with_function(completer, complete_event, text
assert expected == completions
def test_wildcard_column_expansion_with_alias_qualifier(completer, complete_event):
sql = 'SELECT p.* FROM custom.products p'
pos = len('SELECT p.*')
@parametrize('completer', completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_alias_qualifier(completer):
text = 'SELECT p.* FROM custom.products p'
position = len('SELECT p.*')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
completions = get_result(completer, text, position)
col_list = 'id, p.product_name, p.price'
expected = [wildcard_expansion(col_list)]
assert expected == completions
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'''
SELECT count(1) FROM users;
CREATE FUNCTION foo(custom.products _products) returns custom.shipments
@ -435,32 +331,33 @@ def test_wildcard_column_expansion_with_alias_qualifier(completer, complete_even
'INSERT INTO orders (*)',
'INSERT INTO Orders (*)'
])
def test_wildcard_column_expansion_with_insert(completer, complete_event, text):
pos = text.index('*') + 1
completions = completer.get_completions(
Document(text=text, cursor_position=pos), complete_event)
def test_wildcard_column_expansion_with_insert(completer, text):
position = text.index('*') + 1
completions = get_result(completer, text, position)
expected = [wildcard_expansion('id, ordered_date, status')]
assert expected == completions
def test_wildcard_column_expansion_with_table_qualifier(completer, complete_event):
sql = 'SELECT "select".* FROM public."select"'
pos = len('SELECT "select".*')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
@parametrize('completer', completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_table_qualifier(completer):
text = 'SELECT "select".* FROM public."select"'
position = len('SELECT "select".*')
completions = get_result(completer, text, position)
col_list = 'id, "select"."insert", "select"."ABC"'
expected = [wildcard_expansion(col_list)]
assert expected == completions
def test_wildcard_column_expansion_with_two_tables(completer, complete_event):
sql = 'SELECT * FROM public."select" JOIN custom.users ON true'
pos = len('SELECT *')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
@parametrize('completer',completers(filtr=True, casing=False, qualify=qual))
def test_wildcard_column_expansion_with_two_tables(completer):
text = 'SELECT * FROM public."select" JOIN custom.users ON true'
position = len('SELECT *')
completions = get_result(completer, text, position)
cols = ('"select".id, "select"."insert", "select"."ABC", '
'users.id, users.phone_number')
@ -468,19 +365,21 @@ def test_wildcard_column_expansion_with_two_tables(completer, complete_event):
assert completions == expected
def test_wildcard_column_expansion_with_two_tables_and_parent(completer, complete_event):
sql = 'SELECT "select".* FROM public."select" JOIN custom.users u ON true'
pos = len('SELECT "select".*')
@parametrize('completer', completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_two_tables_and_parent(completer):
text = 'SELECT "select".* FROM public."select" JOIN custom.users u ON true'
position = len('SELECT "select".*')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
completions = get_result(completer, text, position)
col_list = 'id, "select"."insert", "select"."ABC"'
expected = [wildcard_expansion(col_list)]
assert expected == completions
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT U. FROM custom.Users U',
'SELECT U. FROM custom.USERS U',
'SELECT U. FROM custom.users U',
@ -488,200 +387,190 @@ def test_wildcard_column_expansion_with_two_tables_and_parent(completer, complet
'SELECT U. FROM "custom".USERS U',
'SELECT U. FROM "custom".users U'
])
def test_suggest_columns_from_unquoted_table(completer, complete_event, text):
pos = len('SELECT U.')
result = completer.get_completions(Document(text=text, cursor_position=pos),
complete_event)
assert set(result) == set(testdata.columns('users', 'custom'))
def test_suggest_columns_from_unquoted_table(completer, text):
position = len('SELECT U.')
result = result_set(completer, text, position)
assert result == set(testdata.columns('users', 'custom'))
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT U. FROM custom."Users" U',
'SELECT U. FROM "custom"."Users" U'
])
def test_suggest_columns_from_quoted_table(completer, complete_event, text):
pos = len('SELECT U.')
result = completer.get_completions(Document(text=text, cursor_position=pos),
complete_event)
assert set(result) == set(testdata.columns('Users', 'custom'))
def test_suggest_columns_from_quoted_table(completer, text):
position = len('SELECT U.')
result = result_set(completer, text, position)
assert result == set(testdata.columns('Users', 'custom'))
texts = ['SELECT * FROM ', 'SELECT * FROM public.Orders O CROSS JOIN ']
@pytest.mark.parametrize('text', texts)
def test_schema_or_visible_table_completion(completer, complete_event, text):
result = completer.get_completions(Document(text=text), complete_event)
assert set(result) == set(testdata.schemas()
+ testdata.views() + testdata.tables() + testdata.functions())
result = completer.get_completions(Document(text=text), complete_event)
@pytest.mark.parametrize('text', texts)
def test_table_aliases(completer_with_aliases, complete_event, text):
result = completer_with_aliases.get_completions(
Document(text=text), complete_event)
assert set(result) == set(testdata.schemas() + [
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
@parametrize('text', texts)
def test_schema_or_visible_table_completion(completer, text):
result = result_set(completer, text)
assert result == set(testdata.schemas_and_from_clause_items())
@parametrize('completer', completers(aliasing=True, casing=False, filtr=True))
@parametrize('text', texts)
def test_table_aliases(completer, text):
result = result_set(completer, text)
assert result == set(testdata.schemas() + [
table('users u'),
table('orders o' if text == 'SELECT * FROM ' else 'orders o2'),
table('"select" s'),
function('func1() f'),
function('func2() f')])
@pytest.mark.parametrize('text', texts)
def test_aliases_with_casing(completer_aliases_casing, complete_event, text):
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
assert set(result) == set(cased_schemas + [
@parametrize('completer', completers(aliasing=True, casing=True, filtr=True))
@parametrize('text', texts)
def test_aliases_with_casing(completer, text):
result = result_set(completer, text)
assert result == set(cased_schemas + [
table('users u'),
table('Orders O' if text == 'SELECT * FROM ' else 'Orders O2'),
table('"select" s'),
function('Func1() F'),
function('func2() f')])
@pytest.mark.parametrize('text', texts)
def test_table_casing(completer_with_casing, complete_event, text):
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
assert set(result) == set(cased_schemas + [
@parametrize('completer', completers(aliasing=False, casing=True, filtr=True))
@parametrize('text', texts)
def test_table_casing(completer, text):
result = result_set(completer, text)
assert result == set(cased_schemas + [
table('users'),
table('Orders'),
table('"select"'),
function('Func1()'),
function('func2()')])
def test_alias_search_without_aliases2(completer_with_casing, complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_alias_search_without_aliases2(completer):
text = 'SELECT * FROM blog.et'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('EntryTags', -2)
def test_alias_search_without_aliases1(completer_with_casing, complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_alias_search_without_aliases1(completer):
text = 'SELECT * FROM blog.e'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('Entries', -1)
def test_alias_search_with_aliases2(completer_aliases_casing, complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_alias_search_with_aliases2(completer):
text = 'SELECT * FROM blog.et'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('EntryTags ET', -2)
def test_alias_search_with_aliases1(completer_aliases_casing, complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_alias_search_with_aliases1(completer):
text = 'SELECT * FROM blog.e'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('Entries E', -1)
def test_join_alias_search_with_aliases1(completer_aliases_casing,
complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases1(completer):
text = 'SELECT * FROM blog.Entries E JOIN blog.e'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[:2] == [table('Entries E2', -1), join(
'EntAccLog EAL ON EAL.EntryID = E.EntryID', -1)]
def test_join_alias_search_without_aliases1(completer_with_casing,
complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases1(completer):
text = 'SELECT * FROM blog.Entries JOIN blog.e'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[:2] == [table('Entries', -1), join(
'EntAccLog ON EntAccLog.EntryID = Entries.EntryID', -1)]
def test_join_alias_search_with_aliases2(completer_aliases_casing,
complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases2(completer):
text = 'SELECT * FROM blog.Entries E JOIN blog.et'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == join('EntryTags ET ON ET.EntryID = E.EntryID', -2)
def test_join_alias_search_without_aliases2(completer_with_casing,
complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases2(completer):
text = 'SELECT * FROM blog.Entries JOIN blog.et'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == join(
'EntryTags ON EntryTags.EntryID = Entries.EntryID', -2)
def test_function_alias_search_without_aliases(completer_with_casing,
complete_event):
@parametrize('completer', completers())
def test_function_alias_search_without_aliases(completer):
text = 'SELECT blog.ees'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == function('extract_entry_symbols()', -3)
def test_function_alias_search_with_aliases(completer_aliases_casing,
complete_event):
@parametrize('completer', completers())
def test_function_alias_search_with_aliases(completer):
text = 'SELECT blog.ee'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == function('enter_entry()', -2)
def test_column_alias_search(completer_aliases_casing, complete_event):
text = 'SELECT et FROM blog.Entries E'
result = completer_aliases_casing.get_completions(
Document(text, cursor_position=len('SELECT et')), complete_event)
@parametrize('completer',completers(filtr=True, casing=True, qualify=no_qual))
def test_column_alias_search(completer):
result = get_result(
completer, 'SELECT et FROM blog.Entries E', len('SELECT et')
)
cols = ('EntryText', 'EntryTitle', 'EntryID')
assert result[:3] == [column(c, -2) for c in cols]
def test_column_alias_search_qualified(completer_aliases_casing,
complete_event):
text = 'SELECT E.ei FROM blog.Entries E'
result = completer_aliases_casing.get_completions(
Document(text, cursor_position=len('SELECT E.ei')), complete_event)
@parametrize('completer', completers(casing=True))
def test_column_alias_search_qualified(completer):
result = get_result(
completer, 'SELECT E.ei FROM blog.Entries E', len('SELECT E.ei')
)
cols = ('EntryID', 'EntryTitle')
assert result[:3] == [column(c, -2) for c in cols]
def test_schema_object_order(completer_all_schemas, complete_event):
text = 'SELECT * FROM u'
position = len('SELECT * FROM u')
result = completer_all_schemas.get_completions(
Document(text=text, cursor_position=position),
complete_event
)
@parametrize('completer', completers(casing=False, filtr=False, aliasing=False))
def test_schema_object_order(completer):
result = get_result(completer, 'SELECT * FROM u')
assert result[:3] == [
table(t, pos=-1) for t in ('users', 'custom."Users"', 'custom.users')
]
def test_all_schema_objects(completer_all_schemas, complete_event):
text = 'SELECT * FROM '
position = len('SELECT * FROM ')
result = set(
completer_all_schemas.get_completions(
Document(text=text, cursor_position=position),
complete_event
)
)
@parametrize('completer', completers(casing=False, filtr=False, aliasing=False))
def test_all_schema_objects(completer):
text = ('SELECT * FROM ')
result = result_set(completer, text)
assert result >= set(
[table(x) for x in ('orders', '"select"', 'custom.shipments')]
+ [function(x+'()') for x in ('func2', 'custom.func3')]
)
def test_all_schema_objects_with_casing(
completer_all_schemas_casing, complete_event
):
@parametrize('completer', completers(filtr=False, aliasing=False, casing=True))
def test_all_schema_objects_with_casing(completer):
text = 'SELECT * FROM '
position = len('SELECT * FROM ')
result = set(
completer_all_schemas_casing.get_completions(
Document(text=text, cursor_position=position),
complete_event
)
)
result = result_set(completer, text)
assert result >= set(
[table(x) for x in ('Orders', '"select"', 'CUSTOM.shipments')]
+ [function(x+'()') for x in ('func2', 'CUSTOM.func3')]
)
def test_all_schema_objects_with_aliases(
completer_all_schemas_aliases, complete_event
):
text = 'SELECT * FROM '
position = len('SELECT * FROM ')
result = set(
completer_all_schemas_aliases.get_completions(
Document(text=text, cursor_position=position),
complete_event
)
)
@parametrize('completer', completers(casing=False, filtr=False, aliasing=True))
def test_all_schema_objects_with_aliases(completer):
text = ('SELECT * FROM ')
result = result_set(completer, text)
assert result >= set(
[table(x) for x in ('orders o', '"select" s', 'custom.shipments s')]
+ [function(x) for x in ('func2() f', 'custom.func3() f')]

File diff suppressed because it is too large Load Diff