pgweb/pkg/client/client.go

341 lines
6.9 KiB
Go
Raw Normal View History

2015-04-30 19:47:07 +03:00
package client
2014-10-11 02:14:17 +04:00
import (
"fmt"
neturl "net/url"
2014-10-11 02:14:17 +04:00
"reflect"
2016-01-13 06:33:44 +03:00
"strings"
2014-11-11 08:10:05 +03:00
2015-04-30 20:09:29 +03:00
_ "github.com/lib/pq"
2014-11-11 08:10:05 +03:00
"github.com/jmoiron/sqlx"
2015-04-30 19:47:07 +03:00
"github.com/sosedoff/pgweb/pkg/command"
"github.com/sosedoff/pgweb/pkg/connection"
"github.com/sosedoff/pgweb/pkg/history"
"github.com/sosedoff/pgweb/pkg/shared"
2015-04-30 19:47:07 +03:00
"github.com/sosedoff/pgweb/pkg/statements"
2014-10-11 02:14:17 +04:00
)
type Client struct {
db *sqlx.DB
2016-01-13 10:29:14 +03:00
tunnel *Tunnel
History []history.Record `json:"history"`
ConnectionString string `json:"connection_string"`
2014-10-11 02:14:17 +04:00
}
// RowsOptions holds the options used when browsing table rows.
// TableRows places Where, SortColumn and SortOrder into the SQL text as given;
// Limit and Offset are only applied when greater than zero.
type RowsOptions struct {
Where string // Custom filter
Offset int // Number of rows to skip
Limit int // Number of rows to fetch
SortColumn string // Column to sort by
SortOrder string // Sort direction (ASC, DESC)
}
2016-01-13 06:33:44 +03:00
// getSchemaAndTable splits a table reference of the form "schema.table" into
// its schema and table parts. A reference without a dot is placed in the
// "public" schema. Only the first dot acts as the separator: anything after it,
// including further dots, is kept as the table name instead of being dropped.
func getSchemaAndTable(str string) (string, string) {
	chunks := strings.SplitN(str, ".", 2)
	if len(chunks) == 1 {
		return "public", chunks[0]
	}
	return chunks[0], chunks[1]
}
2015-04-30 19:47:07 +03:00
func New() (*Client, error) {
str, err := connection.BuildString(command.Opts)
2015-04-30 19:47:07 +03:00
if command.Opts.Debug && str != "" {
fmt.Println("Creating a new client for:", str)
}
if err != nil {
return nil, err
}
db, err := sqlx.Open("postgres", str)
2014-10-11 02:14:17 +04:00
if err != nil {
return nil, err
}
client := Client{
db: db,
2015-04-30 19:47:07 +03:00
ConnectionString: str,
History: history.New(),
}
return &client, nil
2014-10-11 02:14:17 +04:00
}
func NewFromUrl(url string, sshInfo *shared.SSHInfo) (*Client, error) {
var tunnel *Tunnel
if sshInfo != nil {
if command.Opts.Debug {
fmt.Println("Opening SSH tunnel for:", sshInfo)
}
tunnel, err := NewTunnel(sshInfo, url)
if err != nil {
tunnel.Close()
return nil, err
}
err = tunnel.Configure()
if err != nil {
tunnel.Close()
return nil, err
}
go tunnel.Start()
uri, err := neturl.Parse(url)
if err != nil {
tunnel.Close()
return nil, err
}
// Override remote postgres port with local proxy port
url = strings.Replace(url, uri.Host, fmt.Sprintf("127.0.0.1:%v", tunnel.Port), 1)
}
2015-04-30 19:47:07 +03:00
if command.Opts.Debug {
fmt.Println("Creating a new client for:", url)
}
db, err := sqlx.Open("postgres", url)
if err != nil {
return nil, err
}
client := Client{
db: db,
tunnel: tunnel,
2015-04-30 19:47:07 +03:00
ConnectionString: url,
History: history.New(),
}
return &client, nil
}
2014-10-12 07:38:32 +04:00
// Test pings the database handle and reports the resulting error, if any.
func (client *Client) Test() error {
	pingErr := client.db.Ping()
	return pingErr
}
2014-10-16 06:59:43 +04:00
func (client *Client) Info() (*Result, error) {
2015-04-30 19:47:07 +03:00
return client.query(statements.PG_INFO)
2014-10-16 06:59:43 +04:00
}
2014-10-16 01:05:23 +04:00
func (client *Client) Databases() ([]string, error) {
2015-04-30 19:47:07 +03:00
return client.fetchRows(statements.PG_DATABASES)
2014-10-16 01:05:23 +04:00
}
func (client *Client) Schemas() ([]string, error) {
2015-04-30 19:47:07 +03:00
return client.fetchRows(statements.PG_SCHEMAS)
}
2016-01-13 06:33:44 +03:00
func (client *Client) Objects() (*Result, error) {
return client.query(statements.PG_OBJECTS)
2014-10-11 02:14:17 +04:00
}
2014-10-16 06:54:40 +04:00
func (client *Client) Table(table string) (*Result, error) {
2016-01-13 06:33:44 +03:00
schema, table := getSchemaAndTable(table)
return client.query(statements.PG_TABLE_SCHEMA, schema, table)
2014-10-16 06:54:40 +04:00
}
// MaterializedView runs statements.PG_MATERIALIZED_VIEW_SCHEMA with name as its
// only argument. The name is passed through as given, without schema splitting.
func (client *Client) MaterializedView(name string) (*Result, error) {
	view, err := client.query(statements.PG_MATERIALIZED_VIEW_SCHEMA, name)
	return view, err
}
func (client *Client) TableRows(table string, opts RowsOptions) (*Result, error) {
2016-01-13 06:33:44 +03:00
schema, table := getSchemaAndTable(table)
sql := fmt.Sprintf(`SELECT * FROM "%s"."%s"`, schema, table)
if opts.Where != "" {
sql += fmt.Sprintf(" WHERE %s", opts.Where)
}
if opts.SortColumn != "" {
if opts.SortOrder == "" {
opts.SortOrder = "ASC"
}
sql += fmt.Sprintf(" ORDER BY %s %s", opts.SortColumn, opts.SortOrder)
}
if opts.Limit > 0 {
sql += fmt.Sprintf(" LIMIT %d", opts.Limit)
}
if opts.Offset > 0 {
sql += fmt.Sprintf(" OFFSET %d", opts.Offset)
}
return client.query(sql)
}
func (client *Client) TableRowsCount(table string, opts RowsOptions) (*Result, error) {
2016-01-13 06:33:44 +03:00
schema, table := getSchemaAndTable(table)
sql := fmt.Sprintf(`SELECT COUNT(1) FROM "%s"."%s"`, schema, table)
if opts.Where != "" {
sql += fmt.Sprintf(" WHERE %s", opts.Where)
}
return client.query(sql)
}
func (client *Client) TableInfo(table string) (*Result, error) {
2015-04-30 19:47:07 +03:00
return client.query(statements.PG_TABLE_INFO, table)
}
2014-10-11 22:20:16 +04:00
func (client *Client) TableIndexes(table string) (*Result, error) {
2016-01-13 06:33:44 +03:00
schema, table := getSchemaAndTable(table)
res, err := client.query(statements.PG_TABLE_INDEXES, schema, table)
2014-10-11 22:20:16 +04:00
if err != nil {
return nil, err
}
return res, err
}
2015-12-05 03:14:03 +03:00
func (client *Client) TableConstraints(table string) (*Result, error) {
2016-01-13 06:33:44 +03:00
schema, table := getSchemaAndTable(table)
res, err := client.query(statements.PG_TABLE_CONSTRAINTS, schema, table)
2015-12-05 03:14:03 +03:00
if err != nil {
return nil, err
}
return res, err
}
// Returns all active queriers on the server
func (client *Client) Activity() (*Result, error) {
2015-04-30 19:47:07 +03:00
return client.query(statements.PG_ACTIVITY)
}
2014-10-11 02:14:17 +04:00
func (client *Client) Query(query string) (*Result, error) {
res, err := client.query(query)
// Save history records only if query did not fail
if err == nil && !client.hasHistoryRecord(query) {
2015-04-30 19:47:07 +03:00
client.History = append(client.History, history.NewRecord(query))
}
return res, err
}
func (client *Client) query(query string, args ...interface{}) (*Result, error) {
action := strings.ToLower(strings.Split(query, " ")[0])
if action == "update" || action == "delete" {
res, err := client.db.Exec(query, args...)
if err != nil {
return nil, err
}
affected, err := res.RowsAffected()
if err != nil {
return nil, err
}
result := Result{
Columns: []string{"Rows Affected"},
Rows: []Row{
Row{affected},
},
}
return &result, nil
}
rows, err := client.db.Queryx(query, args...)
2014-10-11 02:14:17 +04:00
if err != nil {
return nil, err
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
return nil, err
}
// Make sure to never return null colums
if cols == nil {
cols = []string{}
}
result := Result{
Columns: cols,
Rows: []Row{},
}
2014-10-11 02:14:17 +04:00
for rows.Next() {
obj, err := rows.SliceScan()
for i, item := range obj {
if item == nil {
obj[i] = nil
} else {
t := reflect.TypeOf(item).Kind().String()
if t == "slice" {
obj[i] = string(item.([]byte))
}
}
}
if err == nil {
result.Rows = append(result.Rows, obj)
}
}
result.PrepareBigints()
2014-10-11 02:14:17 +04:00
return &result, nil
}
2015-04-30 19:47:07 +03:00
// Close database connection
func (client *Client) Close() error {
if client.tunnel != nil {
client.tunnel.Close()
}
2015-04-30 20:09:29 +03:00
if client.db != nil {
return client.db.Close()
}
2015-04-30 20:09:29 +03:00
return nil
2015-04-30 19:47:07 +03:00
}
// fetchRows runs the statement and returns the first column of every row,
// asserted to be a string.
func (client *Client) fetchRows(q string) ([]string, error) {
	res, err := client.query(q)
	if err != nil {
		return nil, err
	}

	// Start from a non-nil slice so json.Marshal emits "[]" rather than "null".
	values := []string{}
	for i := range res.Rows {
		values = append(values, res.Rows[i][0].(string))
	}

	return values, nil
}
// hasHistoryRecord reports whether the client's history already contains a
// record whose Query equals the given query text exactly.
func (client *Client) hasHistoryRecord(query string) bool {
	for i := range client.History {
		if client.History[i].Query == query {
			return true
		}
	}

	return false
}