1
1
mirror of https://github.com/dbcli/pgcli.git synced 2024-10-06 10:17:15 +03:00

Add fallback queries so that redshift autocomplete works

This commit is contained in:
Timothy Cleaver 2016-02-02 12:15:19 +10:00
parent 9d87e8fd6f
commit 463f5e4e10

View File

@ -1,5 +1,7 @@
import traceback
import logging
import pprint
import itertools
import psycopg2
import psycopg2.extras
import psycopg2.extensions as ext
@ -85,37 +87,37 @@ class PGExecute(object):
ORDER BY 1 '''
tables_query = '''
SELECT n.nspname schema_name,
SELECT n.nspname schema_name,
c.relname table_name
FROM pg_catalog.pg_class c
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n
ON n.oid = c.relnamespace
WHERE c.relkind = ANY(%s)
WHERE c.relkind = ANY(%s)
ORDER BY 1,2;'''
columns_query = '''
SELECT nsp.nspname schema_name,
SELECT nsp.nspname schema_name,
cls.relname table_name,
att.attname column_name
FROM pg_catalog.pg_attribute att
FROM pg_catalog.pg_attribute att
INNER JOIN pg_catalog.pg_class cls
ON att.attrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace nsp
ON cls.relnamespace = nsp.oid
WHERE cls.relkind = ANY(%s)
WHERE cls.relkind = ANY(%s)
AND NOT att.attisdropped
AND att.attnum > 0
ORDER BY 1, 2, 3'''
functions_query = '''
SELECT n.nspname schema_name,
SELECT n.nspname schema_name,
p.proname func_name,
pg_catalog.pg_get_function_arguments(p.oid) arg_list,
pg_catalog.pg_get_function_result(p.oid) return_type,
p.proisagg is_aggregate,
p.proiswindow is_window,
p.proretset is_set_returning
FROM pg_catalog.pg_proc p
FROM pg_catalog.pg_proc p
INNER JOIN pg_catalog.pg_namespace n
ON n.oid = p.pronamespace
ORDER BY 1, 2'''
@ -305,10 +307,18 @@ class PGExecute(object):
def search_path(self):
"""Returns the current search path as a list of schema names"""
with self.conn.cursor() as cur:
_logger.debug('Search path query. sql: %r', self.search_path_query)
cur.execute(self.search_path_query)
return [x[0] for x in cur.fetchall()]
try:
with self.conn.cursor() as cur:
_logger.debug('Search path query. sql: %r', self.search_path_query)
cur.execute(self.search_path_query)
return [x[0] for x in cur.fetchall()]
except psycopg2.ProgrammingError:
fallback = 'SELECT * FROM current_schemas(true)'
with self.conn.cursor() as cur:
_logger.debug('Search path query. sql: %r', fallback)
cur.execute(fallback)
return cur.fetchone()[0]
def schemata(self):
"""Returns a list of schema names in the database"""
@ -374,25 +384,88 @@ class PGExecute(object):
yield row
def databases(self):
with self.conn.cursor() as cur:
_logger.debug('Databases Query. sql: %r', self.databases_query)
cur.execute(self.databases_query)
return [x[0] for x in cur.fetchall()]
try:
with self.conn.cursor() as cur:
_logger.debug('Databases Query. sql: %r', self.databases_query)
cur.execute(self.databases_query)
return [x[0] for x in cur.fetchall()]
except psycopg2.ProgrammingError:
fallback = '''
SELECT d.datname as "Name"
FROM pg_catalog.pg_database d
ORDER BY 1
'''
with self.conn.cursor() as cur:
_logger.debug('Databases Query. sql: %r', fallback)
cur.execute(fallback)
return [x[0] for x in cur.fetchall()]
def functions(self):
"""Yields FunctionMetadata named tuples"""
with self.conn.cursor() as cur:
_logger.debug('Functions Query. sql: %r', self.functions_query)
cur.execute(self.functions_query)
for row in cur:
yield FunctionMetadata(*row)
try:
with self.conn.cursor() as cur:
_logger.debug('Functions Query. sql: %r', self.functions_query)
cur.execute(self.functions_query)
for row in cur:
yield FunctionMetadata(*row)
except psycopg2.ProgrammingError:
fallback = '''
SELECT n.nspname schema_name,
p.proname func_name,
p.proargnames,
oidvectortypes(p.proargtypes) proargtypes,
t.typname return_type,
p.proisagg is_aggregate,
false is_window,
p.proretset is_set_returning
FROM pg_catalog.pg_proc p
INNER JOIN pg_catalog.pg_namespace n
ON n.oid = p.pronamespace
INNER JOIN pg_catalog.pg_type t
ON p.prorettype = t.oid
ORDER BY 1, 2'''
with self.conn.cursor() as cur:
_logger.debug('Functions Query. sql: %r', fallback)
cur.execute(fallback)
for row in cur:
_logger.debug(pprint.pformat(row))
names = row[2] if row[2] is not None else []
args = itertools.izip_longest(names, row[3].split(', '), '')
_logger.debug(list(args))
typed_args = [f[0] + ', ' + f[1] for f in args]
arg_list = ', '.join(typed_args)
_logger.debug(arg_list)
yield FunctionMetadata(row[0], row[1], arg_list, row[4], row[5], row[6], row[7])
def datatypes(self):
"""Yields tuples of (schema_name, type_name)"""
try:
with self.conn.cursor() as cur:
_logger.debug('Datatypes Query. sql: %r', self.datatypes_query)
cur.execute(self.datatypes_query)
for row in cur:
yield row
except psycopg2.ProgrammingError:
fallback = '''
SELECT n.nspname AS "schema",
pg_catalog.format_type(t.oid, NULL) AS "name",
pg_catalog.obj_description(t.oid, 'pg_type') AS "description"
FROM pg_catalog.pg_type t
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
AND t.typname !~ '^_'
AND n.nspname <> 'pg_catalog'
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_type_is_visible(t.oid)
ORDER BY 1, 2;
'''
with self.conn.cursor() as cur:
_logger.debug('Datatypes Query. sql: %r', fallback)
cur.execute(fallback)
for row in cur:
yield row
with self.conn.cursor() as cur:
_logger.debug('Datatypes Query. sql: %r', self.datatypes_query)
cur.execute(self.datatypes_query)
for row in cur:
yield row