support multiple servers

This commit is contained in:
Umputun 2021-04-02 00:07:36 -05:00
parent 0dcb927e01
commit 190b194d88
14 changed files with 185 additions and 74 deletions

View File

@ -21,18 +21,27 @@ type Service struct {
// UrlMapper contains all info about source and destination routes
type UrlMapper struct {
Server string
SrcMatch *regexp.Regexp
Dst string
ProviderID string
ProviderID ProviderID
}
// Provider defines sources of mappers
type Provider interface {
Events(ctx context.Context) (res <-chan struct{})
List() (res []UrlMapper, err error)
ID() string
ID() ProviderID
}
type ProviderID string
const (
PIDocker ProviderID = "docker"
PIStatic ProviderID = "static"
PIFile ProviderID = "file"
)
// NewService makes service with given providers
func NewService(providers []Provider) *Service {
return &Service{providers: providers}
@ -41,6 +50,7 @@ func NewService(providers []Provider) *Service {
// Do runs blocking loop getting events from all providers
// and updating mappers on each event
func (s *Service) Do(ctx context.Context) error {
var evChs []<-chan struct{}
for _, p := range s.providers {
evChs = append(evChs, p.Events(ctx))
@ -61,16 +71,20 @@ func (s *Service) Do(ctx context.Context) error {
}
// Match url to all providers mappers
func (s *Service) Match(url string) (string, bool) {
func (s *Service) Match(srv, src string) (string, bool) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, m := range s.mappers {
dest := m.SrcMatch.ReplaceAllString(url, m.Dst)
if url != dest {
if m.Server != "*" && m.Server != "" && m.Server != srv {
continue
}
dest := m.SrcMatch.ReplaceAllString(src, m.Dst)
if src != dest {
return dest, true
}
}
return url, false
return src, false
}
func (s *Service) mergeLists() (res []UrlMapper) {
@ -79,6 +93,9 @@ func (s *Service) mergeLists() (res []UrlMapper) {
if err != nil {
continue
}
for i := range lst {
lst[i].ProviderID = p.ID()
}
res = append(res, lst...)
}
return res

View File

@ -3,6 +3,7 @@ package discovery
import (
"context"
"regexp"
"strconv"
"testing"
"time"
@ -23,6 +24,9 @@ func TestService_Do(t *testing.T) {
{SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"},
}, nil
},
IDFunc: func() ProviderID {
return PIFile
},
}
p2 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
@ -33,6 +37,9 @@ func TestService_Do(t *testing.T) {
{SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"},
}, nil
},
IDFunc: func() ProviderID {
return PIDocker
},
}
svc := NewService([]Provider{p1, p2})
@ -43,11 +50,18 @@ func TestService_Do(t *testing.T) {
require.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
assert.Equal(t, 3, len(svc.mappers))
assert.Equal(t, PIFile, svc.mappers[0].ProviderID)
assert.Equal(t, "^/api/svc1/(.*)", svc.mappers[0].SrcMatch.String())
assert.Equal(t, "http://127.0.0.1:8080/blah1/$1", svc.mappers[0].Dst)
assert.Equal(t, 1, len(p1.EventsCalls()))
assert.Equal(t, 1, len(p2.EventsCalls()))
assert.Equal(t, 1, len(p1.ListCalls()))
assert.Equal(t, 1, len(p2.ListCalls()))
assert.Equal(t, 2, len(p1.IDCalls()))
assert.Equal(t, 1, len(p2.IDCalls()))
}
func TestService_Match(t *testing.T) {
@ -60,9 +74,13 @@ func TestService_Match(t *testing.T) {
ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{
{SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"},
{SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"},
{Server: "m.example.com", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"),
Dst: "http://127.0.0.2:8080/blah2/$1/abc"},
}, nil
},
IDFunc: func() ProviderID {
return PIFile
},
}
p2 := &ProviderMock{
EventsFunc: func(ctx context.Context) <-chan struct{} {
@ -73,6 +91,9 @@ func TestService_Match(t *testing.T) {
{SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"},
}, nil
},
IDFunc: func() ProviderID {
return PIDocker
},
}
svc := NewService([]Provider{p1, p2})
@ -84,19 +105,23 @@ func TestService_Match(t *testing.T) {
assert.Equal(t, context.DeadlineExceeded, err)
assert.Equal(t, 3, len(svc.mappers))
{
res, ok := svc.Match("/api/svc3/xyz")
assert.True(t, ok)
assert.Equal(t, "http://127.0.0.3:8080/blah3/xyz", res)
tbl := []struct {
server, src string
dest string
ok bool
}{
{"example.com", "/api/svc3/xyz", "http://127.0.0.3:8080/blah3/xyz", true},
{"abc.example.com", "/api/svc1/1234", "http://127.0.0.1:8080/blah1/1234", true},
{"zzz.example.com", "/aaa/api/svc1/1234", "/aaa/api/svc1/1234", false},
{"m.example.com", "/api/svc2/1234", "http://127.0.0.2:8080/blah2/1234/abc", true},
{"m1.example.com", "/api/svc2/1234", "/api/svc2/1234", false},
}
{
res, ok := svc.Match("/api/svc1/1234")
assert.True(t, ok)
assert.Equal(t, "http://127.0.0.1:8080/blah1/1234", res)
}
{
res, ok := svc.Match("/aaa/api/svc1/1234")
assert.False(t, ok)
assert.Equal(t, "/aaa/api/svc1/1234", res)
for i, tt := range tbl {
t.Run(strconv.Itoa(i), func(t *testing.T) {
res, ok := svc.Match(tt.server, tt.src)
assert.Equal(t, tt.ok, ok)
assert.Equal(t, tt.dest, res)
})
}
}

View File

@ -42,11 +42,11 @@ var (
)
// Channel gets eventsCh with all containers events
func (s *Docker) Events(ctx context.Context) (res <-chan struct{}) {
func (d *Docker) Events(ctx context.Context) (res <-chan struct{}) {
eventsCh := make(chan struct{})
go func() {
for {
err := s.events(ctx, s.DockerClient, eventsCh)
err := d.events(ctx, d.DockerClient, eventsCh)
if err == context.Canceled || err == context.DeadlineExceeded {
close(eventsCh)
return
@ -59,8 +59,8 @@ func (s *Docker) Events(ctx context.Context) (res <-chan struct{}) {
}
// List all containers and make url mappers
func (s *Docker) List() ([]discovery.UrlMapper, error) {
containers, err := s.listContainers()
func (d *Docker) List() ([]discovery.UrlMapper, error) {
containers, err := d.listContainers()
if err != nil {
return nil, err
}
@ -69,28 +69,31 @@ func (s *Docker) List() ([]discovery.UrlMapper, error) {
for _, c := range containers {
srcURL := fmt.Sprintf("^/api/%s/(.*)", c.Name)
destURL := fmt.Sprintf("http://%s:8080/$1", c.Name)
server := "*"
if v, ok := c.Labels["dpx.route"]; ok {
srcURL = v
}
if v, ok := c.Labels["dpx.dest"]; ok {
destURL = fmt.Sprintf("http://%s:8080%s", c.Name, v)
}
if v, ok := c.Labels["dpx.server"]; ok {
server = v
}
srcRegex, err := regexp.Compile(srcURL)
if err != nil {
return nil, errors.Wrapf(err, "invalid src regex %s", srcURL)
}
res = append(res, discovery.UrlMapper{SrcMatch: srcRegex, Dst: destURL})
res = append(res, discovery.UrlMapper{Server: server, SrcMatch: srcRegex, Dst: destURL})
}
return res, nil
}
func (s *Docker) ID() string { return "docker" }
func (d *Docker) ID() discovery.ProviderID { return discovery.PIDocker }
// activate starts blocking listener for all docker events
// filters everything except "container" type, detects stop/start events and publishes signals to eventsCh
func (s *Docker) events(ctx context.Context, client DockerClient, eventsCh chan struct{}) error {
func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan struct{}) error {
dockerEventsCh := make(chan *dclient.APIEvents)
if err := client.AddEventListener(dockerEventsCh); err != nil {
return errors.Wrap(err, "can't add even listener")
@ -113,7 +116,7 @@ func (s *Docker) events(ctx context.Context, client DockerClient, eventsCh chan
log.Printf("[DEBUG] api event %+v", ev)
containerName := strings.TrimPrefix(ev.Actor.Attributes["name"], "/")
if contains(containerName, s.Excludes) {
if contains(containerName, d.Excludes) {
log.Printf("[DEBUG] container %s excluded", containerName)
continue
}
@ -123,9 +126,9 @@ func (s *Docker) events(ctx context.Context, client DockerClient, eventsCh chan
}
}
func (s *Docker) listContainers() (res []containerInfo, err error) {
func (d *Docker) listContainers() (res []containerInfo, err error) {
containers, err := s.DockerClient.ListContainers(dclient.ListContainersOptions{All: false})
containers, err := d.DockerClient.ListContainers(dclient.ListContainersOptions{All: false})
if err != nil {
return nil, errors.Wrap(err, "can't list containers")
}
@ -136,7 +139,7 @@ func (s *Docker) listContainers() (res []containerInfo, err error) {
continue
}
containerName := strings.TrimPrefix(c.Names[0], "/")
if contains(containerName, s.Excludes) {
if contains(containerName, d.Excludes) {
log.Printf("[DEBUG] container %s excluded", containerName)
continue
}

View File

@ -15,7 +15,7 @@ func TestDocker_List(t *testing.T) {
ListContainersFunc: func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error) {
return []dclient.APIContainers{
{Names: []string{"c1"}, Status: "start",
Labels: map[string]string{"dpx.route": "^/api/123/(.*)", "dpx.dest": "/blah/$1"}},
Labels: map[string]string{"dpx.route": "^/api/123/(.*)", "dpx.dest": "/blah/$1", "dpx.server": "example.com"}},
{Names: []string{"c2"}, Status: "start"},
{Names: []string{"c3"}, Status: "stop"},
}, nil
@ -29,9 +29,12 @@ func TestDocker_List(t *testing.T) {
assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String())
assert.Equal(t, "http://c1:8080/blah/$1", res[0].Dst)
assert.Equal(t, "example.com", res[0].Server)
assert.Equal(t, "^/api/c2/(.*)", res[1].SrcMatch.String())
assert.Equal(t, "http://c2:8080/$1", res[1].Dst)
assert.Equal(t, "*", res[1].Server)
}
func TestDocker_Events(t *testing.T) {

View File

@ -1,15 +1,14 @@
package provider
import (
"bufio"
"context"
"os"
"regexp"
"strings"
"time"
log "github.com/go-pkgz/lgr"
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
"github.com/umputun/docker-proxy/app/discovery"
)
@ -66,26 +65,32 @@ func (d *File) Events(ctx context.Context) <-chan struct{} {
// List all src dst pairs
func (d *File) List() (res []discovery.UrlMapper, err error) {
var fileConf []struct {
SourceServer string `yaml:"server"`
SourceRoute string `yaml:"route"`
Dest string `yaml:"dest"`
}
fh, err := os.Open(d.FileName)
if err != nil {
return nil, errors.Wrapf(err, "can't open %s", d.FileName)
}
defer fh.Close()
s := bufio.NewScanner(fh)
for s.Scan() {
line := s.Text()
elems := strings.Fields(line)
if len(elems) != 2 {
continue
}
rx, err := regexp.Compile(elems[0])
if err != nil {
return nil, errors.Wrapf(err, "can't parse regex %s", elems[0])
}
res = append(res, discovery.UrlMapper{SrcMatch: rx, Dst: elems[1]})
if err = yaml.NewDecoder(fh).Decode(&fileConf); err != nil {
return nil, errors.Wrapf(err, "can't parse %s", d.FileName)
}
return res, s.Err()
log.Printf("[DEBUG] file provider %+v", res)
for _, f := range fileConf {
rx, err := regexp.Compile(f.SourceRoute)
if err != nil {
return nil, errors.Wrapf(err, "can't parse regex %s", f.SourceRoute)
}
res = append(res, discovery.UrlMapper{Server: f.SourceServer, SrcMatch: rx, Dst: f.Dest})
}
return res, nil
}
func (d *File) ID() string { return "file" }
func (d *File) ID() discovery.ProviderID { return discovery.PIFile }

View File

@ -52,7 +52,7 @@ func TestFile_Events(t *testing.T) {
}
func TestFile_List(t *testing.T) {
f := File{FileName: "testdata/routes.txt"}
f := File{FileName: "testdata/config.yml"}
res, err := f.List()
require.NoError(t, err)

View File

@ -10,13 +10,13 @@ import (
"github.com/umputun/docker-proxy/app/discovery"
)
// Static provider, rules are from::to
// Static provider, rules are server,from,to
type Static struct {
Rules []string
}
// Events returns channel updating on file change only
func (s *Static) Events(ctx context.Context) <-chan struct{} {
// Events returns channel updating once
func (s *Static) Events(_ context.Context) <-chan struct{} {
res := make(chan struct{}, 1)
res <- struct{}{}
return res
@ -24,18 +24,34 @@ func (s *Static) Events(ctx context.Context) <-chan struct{} {
// List all src dst pairs
func (s *Static) List() (res []discovery.UrlMapper, err error) {
parse := func(inp string) (discovery.UrlMapper, error) {
elems := strings.Split(inp, ",")
switch len(elems) {
case 2:
rx, err := regexp.Compile(strings.TrimSpace(elems[0]))
if err != nil {
return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[0])
}
return discovery.UrlMapper{Server: "*", SrcMatch: rx, Dst: strings.TrimSpace(elems[1])}, nil
case 3:
rx, err := regexp.Compile(strings.TrimSpace(elems[1]))
if err != nil {
return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[1])
}
return discovery.UrlMapper{Server: strings.TrimSpace(elems[0]), SrcMatch: rx, Dst: strings.TrimSpace(elems[2])}, nil
}
return discovery.UrlMapper{}, errors.Errorf("can't parse entry %s", inp)
}
for _, r := range s.Rules {
elems := strings.Split(r, "::")
if len(elems) != 2 {
continue
}
rx, err := regexp.Compile(elems[0])
um, err := parse(r)
if err != nil {
return nil, errors.Wrapf(err, "can't parse regex %s", elems[0])
return nil, err
}
res = append(res, discovery.UrlMapper{SrcMatch: rx, Dst: elems[1]})
res = append(res, um)
}
return res, nil
}
func (s *Static) ID() string { return "static" }
func (s *Static) ID() discovery.ProviderID { return discovery.PIStatic }

View File

@ -0,0 +1,40 @@
package provider
import (
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStatic_List(t *testing.T) {
tbl := []struct {
rule string
server, src, dst string
err bool
}{
{"example.com,123,456", "example.com", "123", "456", false},
{"*,123,456", "*", "123", "456", false},
{"123,456", "*", "123", "456", false},
{"123", "", "", "", true},
{"example.com , 123, 456 ", "example.com", "123", "456", false},
}
for i, tt := range tbl {
t.Run(strconv.Itoa(i), func(t *testing.T) {
s := Static{Rules: []string{tt.rule}}
res, err := s.List()
if tt.err {
require.Error(t, err)
return
}
require.Equal(t, 1, len(res))
assert.Equal(t, tt.server, res[0].Server)
assert.Equal(t, tt.src, res[0].SrcMatch.String())
assert.Equal(t, tt.dst, res[0].Dst)
})
}
}

View File

@ -0,0 +1,3 @@
- {server: "*", route: "^/api/svc1/(.*)", dest: "http://127.0.0.1:8080/blah1/$1"}
- {server: "srv.example.com", route: "^/api/svc2/(.*)", dest: "http://127.0.0.2:8080/blah2/$1/abc"}
- {server: "*", route: "/api/svc3/xyz", dest: "http://127.0.0.3:8080/blah3/xyz"}

View File

@ -1,4 +0,0 @@
^/api/svc1/(.*) http://127.0.0.1:8080/blah1/$1
^/api/svc2/(.*) http://127.0.0.2:8080/blah2/$1/abc
/api/svc3/xyz http://127.0.0.3:8080/blah3/xyz

View File

@ -21,7 +21,7 @@ var _ Provider = &ProviderMock{}
// EventsFunc: func(ctx context.Context) <-chan struct{} {
// panic("mock out the Events method")
// },
// IDFunc: func() string {
// IDFunc: func() ProviderID {
// panic("mock out the ID method")
// },
// ListFunc: func() ([]UrlMapper, error) {
@ -38,7 +38,7 @@ type ProviderMock struct {
EventsFunc func(ctx context.Context) <-chan struct{}
// IDFunc mocks the ID method.
IDFunc func() string
IDFunc func() ProviderID
// ListFunc mocks the List method.
ListFunc func() ([]UrlMapper, error)
@ -94,7 +94,7 @@ func (mock *ProviderMock) EventsCalls() []struct {
}
// ID calls IDFunc.
func (mock *ProviderMock) ID() string {
func (mock *ProviderMock) ID() ProviderID {
if mock.IDFunc == nil {
panic("ProviderMock.IDFunc: method is nil but Provider.ID was just called")
}

View File

@ -13,10 +13,11 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/go-pkgz/lgr"
"github.com/pkg/errors"
"github.com/umputun/go-flags"
"github.com/umputun/docker-proxy/app/discovery"
"github.com/umputun/docker-proxy/app/discovery/provider"
"github.com/umputun/docker-proxy/app/proxy"
"github.com/umputun/go-flags"
)
var opts struct {
@ -39,7 +40,7 @@ var opts struct {
File struct {
Enabled bool `long:"enabled" env:"ENABLED" description:"enable file provider"`
Name string `long:"name" env:"NAME" default:"dpx.conf" description:"file name"`
Name string `long:"name" env:"NAME" default:"dpx.yml" description:"file name"`
CheckInterval time.Duration `long:"interval" env:"INTERVAL" default:"3s" description:"file check interval"`
Delay time.Duration `long:"delay" env:"DELAY" default:"500ms" description:"file event delay"`
} `group:"file" namespace:"file" env-namespace:"FILE"`

View File

@ -6,7 +6,7 @@ import (
)
// Headers middleware adds headers to request
func Headers(headers []string) func(http.Handler) http.Handler {
func Headers(headers ...string) func(http.Handler) http.Handler {
return func(h http.Handler) http.Handler {

View File

@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"
"github.com/go-pkgz/rest"
@ -26,7 +27,7 @@ type Http struct {
}
type Matcher interface {
Match(url string) (string, bool)
Match(srv, src string) (string, bool)
}
func (h *Http) Do(ctx context.Context) error {
@ -42,7 +43,7 @@ func (h *Http) Do(ctx context.Context) error {
rest.Ping,
logger.New(logger.Prefix("[DEBUG] PROXY")).Handler,
rest.SizeLimit(h.MaxBodySize),
middleware.Headers(h.ProxyHeaders),
middleware.Headers(h.ProxyHeaders...),
h.gzipHandler(),
),
ReadHeaderTimeout: 5 * time.Second,
@ -117,7 +118,8 @@ func (h *Http) proxyHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
u, ok := h.Match(r.URL.Path)
server := strings.Split(r.Host, ":")[0]
u, ok := h.Match(server, r.URL.Path)
if !ok {
assetsHandler.ServeHTTP(w, r)
return