add consul catalog provider

This commit is contained in:
Negasus 2021-05-12 11:05:12 +03:00 committed by Umputun
parent a93bd40f8a
commit 738c3baf6c
8 changed files with 1021 additions and 4 deletions

View File

@ -2,7 +2,7 @@
<img class="logo" src="https://raw.githubusercontent.com/umputun/reproxy/master/site/src/logo-bg.svg" width="355px" height="142px" alt="Reproxy | Simple Reverse Proxy"/>
</div>
Reproxy is a simple edge HTTP(s) server / reverse proxy supporting various providers (docker, static, file). One or more providers supply information about the requested server, requested URL, destination URL, and health check URL. It is distributed as a single binary or as a docker container.
Reproxy is a simple edge HTTP(s) server / reverse proxy supporting various providers (docker, static, file, consul catalog). One or more providers supply information about the requested server, requested URL, destination URL, and health check URL. It is distributed as a single binary or as a docker container.
- Automatic SSL termination with <a href="https://letsencrypt.org/" rel="nofollow noopener noreferrer" target="_blank">Let's Encrypt</a>
- Support of user-provided SSL certificates
@ -10,6 +10,7 @@ Reproxy is a simple edge HTTP(s) server / reverse proxy supporting various provi
- Static, command-line proxy rules provider
- Dynamic, file-based proxy rules provider
- Docker provider with an automatic discovery
- Consul Catalog provider with discovery by service tags
- Support of multiple (virtual) hosts
- Optional traffic compression
- User-defined limits and timeouts
@ -99,6 +100,37 @@ If no `reproxy.route` defined, the default is `http://<container_name>:<containe
This is a dynamic provider and any change in container's status will be applied automatically.
### Consul Catalog
Use: `reproxy --consul-catalog.enabled`
Consul Catalog provider periodically (every second by default) calls Consul API for obtaining services, which has any tag with `reproxy.` prefix.
You can redefine check interval with `--consul-catalog.interval` command line flag.
Also, you can redefine consul address with `--consul-catalog.address` command line option. Default address is `http://127.0.0.1:8500`.
For example:
```
reproxy --consul-catalog.enabled --consul-catalog.address=http://192.168.1.100:8500 --consul-catalog.interval=10s
```
By default, provider sets values for every service:
- enabled `false`
- server `*`
- route `^/(.*)`
- dest `http://<SERVICE_ADDRESS_FROM_CONSUL>/$1`
- ping `http://<SERVICE_ADDRESS_FROM_CONSUL>/ping`
This default can be changed with tags:
- `reproxy.server` - server (hostname) to match. Also, can be a list of comma-separated servers.
- `reproxy.route` - source route (location)
- `reproxy.dest` - destination path. Note: this is not full url, but just the path which will be appended to service's ip:port
- `reproxy.port` - destination port for the discovered service
- `reproxy.ping` - ping path for the destination service.
- `reproxy.enabled` - enable (`yes`, `true`, `1`) or disable (`any different value`) service from reproxy destinations.
## SSL support
SSL mode (by default none) can be set to `auto` (ACME/LE certificates), `static` (existing certificate) or `none`. If `auto` turned on SSL certificate will be issued automatically for all discovered server names. User can override it by setting `--ssl.fqdn` value(s)

View File

@ -49,9 +49,10 @@ type ProviderID string
// enum of all provider ids
const (
PIDocker ProviderID = "docker"
PIStatic ProviderID = "static"
PIFile ProviderID = "file"
PIDocker ProviderID = "docker"
PIStatic ProviderID = "static"
PIFile ProviderID = "file"
PIConsulCatalog ProviderID = "consul-catalog"
)
// MatchType defines the type of mapper (rule)

View File

@ -0,0 +1,159 @@
package consulcatalog
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sort"
"strings"
)
// HTTPClient represents interface for http client
type HTTPClient interface {
Do(r *http.Request) (*http.Response, error)
}
// consulClient allows to get consul services with 'reproxy' tags
// 1. Client calls https://www.consul.io/api-docs/catalog#list-services API for get services list.
// It returns services list with names and tags (without addresses)
// Next, Client filters this list for exclude services without 'reproxy' tags
// 2. Client calls https://www.consul.io/api-docs/catalog#list-nodes-for-service API for every service
// This API returns data about every service instance. Include address, port and more
// Client stores services addresses and ports to internal storage
type consulClient struct {
address string
httpClient HTTPClient
}
// NewClient creates new Consul consulClient
func NewClient(address string, httpClient HTTPClient) ConsulClient {
cl := &consulClient{
address: strings.TrimSuffix(address, "/"),
httpClient: httpClient,
}
return cl
}
// Get implements ConsulClient interface and returns consul services list,
// which have any tag with 'reproxy.' prefix
func (cl *consulClient) Get() ([]consulService, error) {
var result []consulService
serviceNames, err := cl.getServiceNames()
if err != nil {
return nil, fmt.Errorf("error get service names, %w", err)
}
for _, serviceName := range serviceNames {
services, err := cl.getServices(serviceName)
if err != nil {
return nil, fmt.Errorf("error get nodes for service name %s, %w", serviceName, err)
}
result = append(result, services...)
}
return result, nil
}
func (cl *consulClient) getServiceNames() ([]string, error) {
req, err := http.NewRequest(http.MethodGet, cl.address+"/v1/catalog/services", nil)
if err != nil {
return nil, fmt.Errorf("error create a http request, %w", err)
}
resp, err := cl.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error send request to consul, %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response status code %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error read response body, %w", err)
}
err = resp.Body.Close()
if err != nil {
log.Printf("[ERROR] error close body, %v", err)
}
result := map[string][]string{}
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("error unmarshal consul response, %w", err)
}
return cl.filterServices(result), nil
}
func (cl *consulClient) filterServices(src map[string][]string) []string {
var result []string
for serviceName, tags := range src {
for _, tag := range tags {
if strings.HasPrefix(tag, "reproxy.") {
result = append(result, serviceName)
}
}
}
sort.Strings(result)
return result
}
func (cl *consulClient) getServices(serviceName string) ([]consulService, error) {
req, err := http.NewRequest(http.MethodGet, cl.address+"/v1/catalog/service/"+serviceName, nil)
if err != nil {
return nil, fmt.Errorf("error create a http request, %w", err)
}
resp, err := cl.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error send request to consul, %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response status code %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error read response body, %w", err)
}
err = resp.Body.Close()
if err != nil {
log.Printf("[ERROR] error close body, %v", err)
}
var services []consulService
err = json.Unmarshal(body, &services)
if err != nil {
return nil, fmt.Errorf("error unmarshal consul response, %w", err)
}
for idx, s := range services {
s.Labels = make(map[string]string)
for _, t := range s.ServiceTags {
if strings.HasPrefix(t, "reproxy.") {
delimiterIdx := strings.IndexByte(t, '=')
if delimiterIdx == -1 || delimiterIdx <= len("reproxy.") {
s.Labels[t] = ""
continue
}
s.Labels[t[:delimiterIdx]] = t[delimiterIdx+1:]
}
}
services[idx] = s
}
return services, nil
}

View File

@ -0,0 +1,258 @@
package consulcatalog
import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestNewClient(t *testing.T) {
cl := NewClient("demo//", &http.Client{})
assert.IsType(t, &consulClient{}, cl)
}
func TestClient_getServiceNames(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/v1/catalog/services", req.RequestURI)
rw.Write([]byte(`{"s1":[],"s2":["baz","wow"],"s3":["bar","reproxy.enabled","foo"],"s4":["reproxy.enabled"]}`))
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
names, err := cl.getServiceNames()
require.NoError(t, err)
require.Equal(t, 2, len(names))
assert.Equal(t, names[0], "s3")
assert.Equal(t, names[1], "s4")
}
func TestClient_getServiceNames_error_send_request(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
}))
server.Close()
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.getServiceNames()
require.Error(t, err)
assert.True(t, strings.HasPrefix(err.Error(), "error send request to consul, Get "))
}
func TestClient_getServiceNames_bad_status_code(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(400)
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.getServiceNames()
require.Error(t, err)
assert.Equal(t, "unexpected response status code 400", err.Error())
}
func TestClient_getServiceNames_wrong_answer(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Write([]byte(`bad json`))
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.getServiceNames()
require.Error(t, err)
assert.Equal(t, "error unmarshal consul response, invalid character 'b' looking for beginning of value", err.Error())
}
func TestClient_getServices(t *testing.T) {
body := `[
{"ServiceID":"s1","ServiceName":"n1","ServiceTags":[],"ServiceAddress":"a1","ServicePort":1000},
{"ServiceID":"s2","ServiceName":"n2","ServiceTags":["reproxy.enabled","foo"],"ServiceAddress":"a2","ServicePort":2000},
{"ServiceID":"s3","ServiceName":"n3","ServiceTags":["reproxy.foo","reproxy.a=1","reproxy.b=bar","reproxy.baz=","reproxy.=bad"],"ServiceAddress":"a3","ServicePort":3000}
]`
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/v1/catalog/service/service1", req.RequestURI)
rw.Write([]byte(body))
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
services, err := cl.getServices("service1")
require.NoError(t, err)
require.Equal(t, 3, len(services))
assert.Equal(t, "s1", services[0].ServiceID)
assert.Equal(t, "n1", services[0].ServiceName)
assert.Equal(t, "a1", services[0].ServiceAddress)
assert.Equal(t, 1000, services[0].ServicePort)
assert.Equal(t, 0, len(services[0].Labels))
var v string
var ok bool
assert.Equal(t, "s2", services[1].ServiceID)
assert.Equal(t, "n2", services[1].ServiceName)
assert.Equal(t, "a2", services[1].ServiceAddress)
assert.Equal(t, 2000, services[1].ServicePort)
assert.Equal(t, 1, len(services[1].Labels))
v, ok = services[1].Labels["reproxy.enabled"]
assert.True(t, ok)
assert.Equal(t, "", v)
assert.Equal(t, "s3", services[2].ServiceID)
assert.Equal(t, "n3", services[2].ServiceName)
assert.Equal(t, "a3", services[2].ServiceAddress)
assert.Equal(t, 3000, services[2].ServicePort)
assert.Equal(t, 5, len(services[2].Labels))
v, ok = services[2].Labels["reproxy.foo"]
assert.True(t, ok)
assert.Equal(t, "", v)
v, ok = services[2].Labels["reproxy.a"]
assert.True(t, ok)
assert.Equal(t, "1", v)
v, ok = services[2].Labels["reproxy.b"]
assert.True(t, ok)
assert.Equal(t, "bar", v)
v, ok = services[2].Labels["reproxy.baz"]
assert.True(t, ok)
assert.Equal(t, "", v)
v, ok = services[2].Labels["reproxy.=bad"]
assert.True(t, ok)
assert.Equal(t, "", v)
}
func TestClient_getServices_request_error(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
}))
server.Close()
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.getServices("service1")
require.Error(t, err)
assert.True(t, strings.HasPrefix(err.Error(), "error send request to consul, Get "))
}
func TestClient_getServices_bad_status_code(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(400)
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.getServices("service1")
require.Error(t, err)
assert.Equal(t, "unexpected response status code 400", err.Error())
}
func TestClient_getServices_wrong_answer(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Write([]byte("bad json"))
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.getServices("service1")
require.Error(t, err)
assert.Equal(t, "error unmarshal consul response, invalid character 'b' looking for beginning of value", err.Error())
}
func TestClient_Get(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.RequestURI == "/v1/catalog/services" {
rw.Write([]byte(`{"s1":["reproxy.enabled"]}`))
return
}
if req.RequestURI == "/v1/catalog/service/s1" {
rw.Write([]byte(`[{"ServiceID":"s1","ServiceName":"n1","ServiceTags":["reproxy.enabled"],"ServiceAddress":"a1","ServicePort":1000}]`))
return
}
panic("unexpected " + req.RequestURI)
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
res, err := cl.Get()
require.NoError(t, err)
require.Equal(t, 1, len(res))
assert.Equal(t, "s1", res[0].ServiceID)
assert.Equal(t, "n1", res[0].ServiceName)
assert.Equal(t, "a1", res[0].ServiceAddress)
assert.Equal(t, 1000, res[0].ServicePort)
}
func TestClient_Get_error_get_names(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.RequestURI == "/v1/catalog/services" {
rw.WriteHeader(400)
return
}
if req.RequestURI == "/v1/catalog/service/s1" {
rw.Write([]byte(`[{"ServiceID":"s1","ServiceName":"n1","ServiceTags":["reproxy.enabled"],"ServiceAddress":"a1","ServicePort":1000}]`))
return
}
panic("unexpected " + req.RequestURI)
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.Get()
require.Error(t, err)
assert.Equal(t, "error get service names, unexpected response status code 400", err.Error())
}
func TestClient_Get_error_get_services(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.RequestURI == "/v1/catalog/services" {
return
}
if req.RequestURI == "/v1/catalog/service/s1" {
rw.WriteHeader(400)
return
}
panic("unexpected " + req.RequestURI)
}))
cl := &consulClient{
address: server.URL,
httpClient: server.Client(),
}
_, err := cl.Get()
require.Error(t, err)
assert.Equal(t, "error get service names, error unmarshal consul response, unexpected end of JSON input", err.Error())
}

View File

@ -0,0 +1,62 @@
// Code generated by moq; DO NOT EDIT.
// github.com/matryer/moq
package consulcatalog
import (
"sync"
)
// ConsulClientMock is a mock implementation of ConsulClient.
//
// func TestSomethingThatUsesConsulClient(t *testing.T) {
//
// // make and configure a mocked ConsulClient
// mockedConsulClient := &ConsulClientMock{
// GetFunc: func() ([]consulService, error) {
// panic("mock out the Get method")
// },
// }
//
// // use mockedConsulClient in code that requires ConsulClient
// // and then make assertions.
//
// }
type ConsulClientMock struct {
// GetFunc mocks the Get method.
GetFunc func() ([]consulService, error)
// calls tracks calls to the methods.
calls struct {
// Get holds details about calls to the Get method.
Get []struct {
}
}
lockGet sync.RWMutex
}
// Get calls GetFunc.
func (mock *ConsulClientMock) Get() ([]consulService, error) {
if mock.GetFunc == nil {
panic("ConsulClientMock.GetFunc: method is nil but ConsulClient.Get was just called")
}
callInfo := struct {
}{}
mock.lockGet.Lock()
mock.calls.Get = append(mock.calls.Get, callInfo)
mock.lockGet.Unlock()
return mock.GetFunc()
}
// GetCalls gets all the calls that were made to Get.
// Check the length with:
// len(mockedConsulClient.GetCalls())
func (mock *ConsulClientMock) GetCalls() []struct {
} {
var calls []struct {
}
mock.lockGet.RLock()
calls = mock.calls.Get
mock.lockGet.RUnlock()
return calls
}

View File

@ -0,0 +1,191 @@
package consulcatalog
import (
"context"
"fmt"
"github.com/umputun/reproxy/app/discovery"
"log"
"regexp"
"sort"
"strings"
"time"
)
//go:generate moq -out consul_client_mock.go -skip-ensure -fmt goimports . ConsulClient
// ConsulClient defines interface getting consul services
type ConsulClient interface {
Get() ([]consulService, error)
}
type consulService struct {
ServiceID string `json:"ServiceID"`
ServiceName string `json:"ServiceName"`
ServiceTags []string `json:"ServiceTags"`
ServiceAddress string `json:"ServiceAddress"`
ServicePort int `json:"ServicePort"`
Labels map[string]string `json:"-"`
}
// ConsulCatalog provider periodically gets consul services with tags, started with 'reproxy.'
// It stores service list IDs in the internal storage. If service list was changed, it send signal to the core
// The provider maps services with rules, described in the docker provider documentation
//
// reproxy.route sets source route, and reproxy.dest sets the destination.
// Optional reproxy.server enforces match by server name (hostname) and reproxy.ping sets the health check url
type ConsulCatalog struct {
client ConsulClient
refreshInterval time.Duration
// current services list with ServiceID as map key
list map[string]struct{}
}
// New creates new ConsulCatalog instance
func New(client ConsulClient, checkInterval time.Duration) *ConsulCatalog {
cc := &ConsulCatalog{
client: client,
refreshInterval: checkInterval,
list: make(map[string]struct{}),
}
return cc
}
// Events gets eventsCh, which emit services list update events
func (cc *ConsulCatalog) Events(ctx context.Context) (res <-chan discovery.ProviderID) {
eventsCh := make(chan discovery.ProviderID)
go func() {
if err := cc.events(ctx, eventsCh); err != context.Canceled {
log.Printf("[ERROR] unexpected consulcatalog events error: %s", err)
}
}()
return eventsCh
}
func (cc *ConsulCatalog) events(ctx context.Context, eventsCh chan<- discovery.ProviderID) error {
var err error
ticker := time.NewTicker(cc.refreshInterval)
defer ticker.Stop()
for {
err = cc.checkUpdates(eventsCh)
if err != nil {
log.Printf("[ERROR] error update consul catalog data, %v", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}
func (cc *ConsulCatalog) checkUpdates(eventsCh chan<- discovery.ProviderID) error {
services, err := cc.client.Get()
if err != nil {
return fmt.Errorf("unable to get services list, %w", err)
}
if !cc.serviceListWasChanged(services) {
return nil
}
cc.updateServices(services)
eventsCh <- discovery.PIConsulCatalog
return nil
}
func (cc *ConsulCatalog) serviceListWasChanged(services []consulService) bool {
if len(services) != len(cc.list) {
return true
}
for _, s := range services {
if _, ok := cc.list[s.ServiceID]; !ok {
return true
}
}
return false
}
func (cc *ConsulCatalog) updateServices(services []consulService) {
for key := range cc.list {
delete(cc.list, key)
}
for _, s := range services {
cc.list[s.ServiceID] = struct{}{}
}
}
// List all containers and make url mappers
// If AutoAPI enabled all each container and set all params, if not - allow only container with reproxy.* tags
func (cc *ConsulCatalog) List() ([]discovery.URLMapper, error) {
log.Print("[DEBUG] call consul catalog list")
res := make([]discovery.URLMapper, 0, len(cc.list))
services, err := cc.client.Get()
if err != nil {
return nil, fmt.Errorf("error get services list, %w", err)
}
for _, c := range services {
enabled := false
srcURL := "^/(.*)"
destURL := fmt.Sprintf("http://%s:%d/$1", c.ServiceAddress, c.ServicePort)
pingURL := fmt.Sprintf("http://%s:%d/ping", c.ServiceAddress, c.ServicePort)
server := "*"
if v, ok := c.Labels["reproxy.enabled"]; ok && (v == "true" || v == "yes" || v == "1") {
enabled = true
}
if v, ok := c.Labels["reproxy.route"]; ok {
enabled = true
srcURL = v
}
if v, ok := c.Labels["reproxy.dest"]; ok {
enabled = true
destURL = fmt.Sprintf("http://%s:%d%s", c.ServiceAddress, c.ServicePort, v)
}
if v, ok := c.Labels["reproxy.server"]; ok {
enabled = true
server = v
}
if v, ok := c.Labels["reproxy.ping"]; ok {
enabled = true
pingURL = fmt.Sprintf("http://%s:%d%s", c.ServiceAddress, c.ServicePort, v)
}
if !enabled {
log.Printf("[DEBUG] service %s disabled", c.ServiceID)
continue
}
srcRegex, err := regexp.Compile(srcURL)
if err != nil {
return nil, fmt.Errorf("invalid src regex %s: %w", srcURL, err)
}
// server label may have multiple, comma separated servers
for _, srv := range strings.Split(server, ",") {
res = append(res, discovery.URLMapper{Server: strings.TrimSpace(srv), SrcMatch: *srcRegex, Dst: destURL,
PingURL: pingURL, ProviderID: discovery.PIConsulCatalog})
}
}
// sort by len(SrcMatch) to have shorter matches after longer
// this way we can handle possible conflicts with more detailed match triggered before less detailed
sort.Slice(res, func(i, j int) bool {
return len(res[i].SrcMatch.String()) > len(res[j].SrcMatch.String())
})
return res, nil
}

View File

@ -0,0 +1,302 @@
package consulcatalog
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/umputun/reproxy/app/discovery"
"sort"
"testing"
"time"
)
func TestNew(t *testing.T) {
cc := New(&ConsulClientMock{GetFunc: func() ([]consulService, error) {
return nil, nil
}}, time.Second)
assert.IsType(t, &ConsulCatalog{}, cc)
assert.Equal(t, time.Second, cc.refreshInterval)
}
func TestConsulCatalog_List_error(t *testing.T) {
clientMock := &ConsulClientMock{GetFunc: func() ([]consulService, error) {
return nil, fmt.Errorf("err1")
}}
cc := &ConsulCatalog{client: clientMock}
_, err := cc.List()
require.Error(t, err)
assert.Equal(t, "error get services list, err1", err.Error())
}
func TestConsulCatalog_List(t *testing.T) {
clientMock := &ConsulClientMock{GetFunc: func() ([]consulService, error) {
return []consulService{
{
ServiceID: "id0",
ServiceName: "name0",
ServiceAddress: "addr0",
ServicePort: 1000,
Labels: map[string]string{"foo.bar": "baz"},
},
{
ServiceID: "id1",
ServiceName: "name1",
ServiceAddress: "addr1",
ServicePort: 1000,
Labels: map[string]string{"reproxy.enabled": "false"},
},
{
ServiceID: "id2",
ServiceName: "name2",
ServiceAddress: "addr2",
ServicePort: 2000,
Labels: map[string]string{"reproxy.enabled": "true"},
},
{
ServiceID: "id3",
ServiceName: "name3",
ServiceAddress: "addr3",
ServicePort: 3000,
Labels: map[string]string{"reproxy.route": "^/api/123/(.*)", "reproxy.dest": "/blah/$1",
"reproxy.server": "example.com,domain.com", "reproxy.ping": "/ping", "reproxy.enabled": "yes"},
},
{
ServiceID: "id4",
ServiceName: "name44",
ServiceAddress: "addr44",
ServicePort: 4000,
Labels: map[string]string{"reproxy.enabled": "1"},
},
}, nil
}}
cc := &ConsulCatalog{
client: clientMock,
}
res, err := cc.List()
require.NoError(t, err)
require.Equal(t, 4, len(res))
// sort slice for exclude random item positions after sorting by SrtMatch in List function
sort.Slice(res, func(i, j int) bool {
return len(res[i].Dst+res[i].Server) > len(res[j].Dst+res[j].Server)
})
assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String())
assert.Equal(t, "http://addr3:3000/blah/$1", res[0].Dst)
assert.Equal(t, "example.com", res[0].Server)
assert.Equal(t, "http://addr3:3000/ping", res[0].PingURL)
assert.Equal(t, "^/api/123/(.*)", res[1].SrcMatch.String())
assert.Equal(t, "http://addr3:3000/blah/$1", res[1].Dst)
assert.Equal(t, "domain.com", res[1].Server)
assert.Equal(t, "http://addr3:3000/ping", res[1].PingURL)
assert.Equal(t, "^/(.*)", res[2].SrcMatch.String())
assert.Equal(t, "http://addr44:4000/$1", res[2].Dst)
assert.Equal(t, "http://addr44:4000/ping", res[2].PingURL)
assert.Equal(t, "*", res[2].Server)
assert.Equal(t, "^/(.*)", res[3].SrcMatch.String())
assert.Equal(t, "http://addr2:2000/$1", res[3].Dst)
assert.Equal(t, "http://addr2:2000/ping", res[3].PingURL)
assert.Equal(t, "*", res[3].Server)
}
func TestConsulCatalog_serviceListWasChanged(t *testing.T) {
type fields struct {
list map[string]struct{}
}
type args struct {
services []consulService
}
tests := []struct {
name string
fields fields
args args
want bool
}{
{
name: "empty, not changed",
fields: fields{
list: map[string]struct{}{},
},
args: args{
services: []consulService{},
},
want: false,
},
{
name: "not changed",
fields: fields{
list: map[string]struct{}{"1": {}, "2": {}, "3": {}},
},
args: args{
services: []consulService{{ServiceID: "1"}, {ServiceID: "3"}, {ServiceID: "2"}},
},
want: false,
},
{
name: "changed",
fields: fields{
list: map[string]struct{}{"1": {}, "2": {}, "3": {}},
},
args: args{
services: []consulService{{ServiceID: "1"}, {ServiceID: "100"}, {ServiceID: "2"}},
},
want: true,
},
{
name: "new service, changed",
fields: fields{
list: map[string]struct{}{"1": {}, "2": {}, "3": {}},
},
args: args{
services: []consulService{{ServiceID: "1"}, {ServiceID: "3"}, {ServiceID: "2"}, {ServiceID: "4"}},
},
want: true,
},
{
name: "remove service, changed",
fields: fields{
list: map[string]struct{}{"1": {}, "2": {}, "3": {}},
},
args: args{
services: []consulService{{ServiceID: "1"}, {ServiceID: "3"}},
},
want: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
cc := &ConsulCatalog{
list: tt.fields.list,
}
if got := cc.serviceListWasChanged(tt.args.services); got != tt.want {
t.Errorf("serviceListWasChanged() = %v, want %v", got, tt.want)
}
})
}
}
func TestConsulCatalog_updateServices(t *testing.T) {
cc := &ConsulCatalog{
list: map[string]struct{}{
"3": {},
"100": {},
},
}
cc.updateServices([]consulService{{ServiceID: "1"}, {ServiceID: "2"}, {ServiceID: "3"}})
require.Equal(t, 3, len(cc.list))
_, ok := cc.list["1"]
assert.True(t, ok)
_, ok = cc.list["2"]
assert.True(t, ok)
_, ok = cc.list["3"]
assert.True(t, ok)
}
func TestConsulCatalog_checkUpdates_http_error(t *testing.T) {
clientMock := &ConsulClientMock{
GetFunc: func() ([]consulService, error) {
return nil, fmt.Errorf("err1")
},
}
cc := &ConsulCatalog{
client: clientMock,
}
err := cc.checkUpdates(nil)
require.Error(t, err)
assert.Equal(t, "unable to get services list, err1", err.Error())
}
func TestConsulCatalog_checkUpdates_not_changed(t *testing.T) {
clientMock := &ConsulClientMock{
GetFunc: func() ([]consulService, error) {
return nil, nil
},
}
cc := &ConsulCatalog{
client: clientMock,
}
err := cc.checkUpdates(nil)
require.NoError(t, err)
assert.Equal(t, 0, len(cc.list))
}
func TestConsulCatalog_checkUpdates_changed(t *testing.T) {
clientMock := &ConsulClientMock{
GetFunc: func() ([]consulService, error) {
return []consulService{{ServiceID: "1"}}, nil
},
}
cc := &ConsulCatalog{
list: map[string]struct{}{
"2": {},
},
client: clientMock,
}
ch := make(chan discovery.ProviderID, 1)
err := cc.checkUpdates(ch)
require.NoError(t, err)
assert.Equal(t, 1, len(cc.list))
_, ok := cc.list["1"]
assert.True(t, ok)
s, ok := <-ch
assert.True(t, ok)
assert.Equal(t, discovery.PIConsulCatalog, s)
}
func TestConsulCatalog_Events(t *testing.T) {
clientMock := &ConsulClientMock{
GetFunc: func() ([]consulService, error) {
return []consulService{
{
ServiceID: "1",
Labels: map[string]string{"reproxy.enabled": "1"},
},
}, nil
},
}
cc := &ConsulCatalog{
list: map[string]struct{}{
"2": {},
},
client: clientMock,
refreshInterval: time.Millisecond,
}
ch := cc.Events(context.Background())
var s discovery.ProviderID
select {
case s = <-ch:
case <-time.After(time.Millisecond * 5):
t.Fatal("not received event")
return
}
assert.Equal(t, discovery.PIConsulCatalog, s)
list, err := cc.List()
require.NoError(t, err)
assert.Equal(t, 1, len(list))
}

View File

@ -19,6 +19,7 @@ import (
"github.com/umputun/reproxy/app/discovery"
"github.com/umputun/reproxy/app/discovery/provider"
"github.com/umputun/reproxy/app/discovery/provider/consulcatalog"
"github.com/umputun/reproxy/app/mgmt"
"github.com/umputun/reproxy/app/proxy"
)
@ -62,6 +63,12 @@ var opts struct {
APIPrefix string `long:"prefix" env:"PREFIX" description:"prefix for docker source routes"`
} `group:"docker" namespace:"docker" env-namespace:"DOCKER"`
ConsulCatalog struct {
Enabled bool `long:"enabled" env:"ENABLED" description:"enable consul catalog provider"`
Address string `long:"address" env:"ADDRESS" default:"http://127.0.0.1:8500" description:"consul address"`
CheckInterval time.Duration `long:"interval" env:"INTERVAL" default:"1s" description:"consul catalog check interval"`
} `group:"consul-catalog" namespace:"consul-catalog" env-namespace:"CONSUL_CATALOG"`
File struct {
Enabled bool `long:"enabled" env:"ENABLED" description:"enable file provider"`
Name string `long:"name" env:"NAME" default:"reproxy.yml" description:"file name"`
@ -269,6 +276,11 @@ func makeProviders() ([]discovery.Provider, error) {
AutoAPI: opts.Docker.AutoAPI, APIPrefix: opts.Docker.APIPrefix, RefreshInterval: refreshInterval})
}
if opts.ConsulCatalog.Enabled {
client := consulcatalog.NewClient(opts.ConsulCatalog.Address, http.DefaultClient)
res = append(res, consulcatalog.New(client, opts.ConsulCatalog.CheckInterval))
}
if len(res) == 0 && opts.Assets.Location == "" {
return nil, errors.New("no providers enabled")
}