mirror of
https://github.com/umputun/reproxy.git
synced 2024-11-30 18:35:11 +03:00
Detect changes in running containers
* Naive attempt to detect changes based on container metadata * And my best effort to test it
This commit is contained in:
parent
a91fb6816f
commit
76be67d39c
@ -177,12 +177,48 @@ func (d *Docker) events(ctx context.Context, eventsCh chan<- discovery.ProviderI
|
|||||||
ticker := time.NewTicker(dockerPollingInterval)
|
ticker := time.NewTicker(dockerPollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Keep track of running containers
|
||||||
|
saved := make(map[string]containerInfo)
|
||||||
|
|
||||||
|
update := func() {
|
||||||
|
containers, err := d.listContainers()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] failed to fetch running containers: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
refresh := false
|
||||||
|
seen := make(map[string]bool)
|
||||||
|
|
||||||
|
for _, c := range containers {
|
||||||
|
old, exists := saved[c.ID]
|
||||||
|
|
||||||
|
if !exists || c.IP != old.IP || c.State != old.State || !c.TS.Equal(old.TS) {
|
||||||
|
refresh = true
|
||||||
|
}
|
||||||
|
|
||||||
|
seen[c.ID] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(saved) != len(seen) || refresh {
|
||||||
|
log.Printf("[INFO] changes in running containers detected: refreshing routes")
|
||||||
|
for k := range saved {
|
||||||
|
delete(saved, k)
|
||||||
|
}
|
||||||
|
for _, c := range containers {
|
||||||
|
saved[c.ID] = c
|
||||||
|
}
|
||||||
|
eventsCh <- discovery.PIDocker
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
update() // Refresh immediately
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
eventsCh <- discovery.PIDocker
|
update()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package provider
|
package provider
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
@ -11,6 +13,7 @@ import (
|
|||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/umputun/reproxy/app/discovery"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDocker_List(t *testing.T) {
|
func TestDocker_List(t *testing.T) {
|
||||||
@ -66,6 +69,62 @@ func TestDocker_List(t *testing.T) {
|
|||||||
assert.Equal(t, "*", res[2].Server)
|
assert.Equal(t, "*", res[2].Server)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDocker_refresh(t *testing.T) {
|
||||||
|
containers := make(chan []containerInfo)
|
||||||
|
|
||||||
|
d := Docker{DockerClient: &DockerClientMock{
|
||||||
|
ListContainersFunc: func() ([]containerInfo, error) {
|
||||||
|
return <-containers, nil
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
|
||||||
|
events := make(chan discovery.ProviderID)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
stub := func(id string) containerInfo {
|
||||||
|
return containerInfo{ID: id, Name: id, State: "running", IP: "127.0.0." + id, Ports: []int{12345}}
|
||||||
|
}
|
||||||
|
|
||||||
|
recv := func() {
|
||||||
|
select {
|
||||||
|
case <-events:
|
||||||
|
return
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("No refresh notification was received after 1s")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
dockerPollingInterval = time.Microsecond
|
||||||
|
if err := d.events(ctx, events); err != context.Canceled {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start some
|
||||||
|
containers <- []containerInfo{stub("1"), stub("2")}
|
||||||
|
recv()
|
||||||
|
|
||||||
|
// Nothing changed
|
||||||
|
containers <- []containerInfo{stub("1"), stub("2")}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
assert.Empty(t, events, "unexpected refresh notification")
|
||||||
|
|
||||||
|
// Stopped
|
||||||
|
containers <- []containerInfo{stub("1")}
|
||||||
|
recv()
|
||||||
|
|
||||||
|
// One changed
|
||||||
|
containers <- []containerInfo{
|
||||||
|
{ID: "1", Name: "1", State: "running", IP: "127.42.42.42", Ports: []int{12345}},
|
||||||
|
}
|
||||||
|
recv()
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
assert.Empty(t, events, "unexpect refresh notification from events channel")
|
||||||
|
}
|
||||||
|
|
||||||
func TestDockerClient(t *testing.T) {
|
func TestDockerClient(t *testing.T) {
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
require.Equal(t, "/v1.41/containers/json", r.URL.Path)
|
require.Equal(t, "/v1.41/containers/json", r.URL.Path)
|
||||||
|
Loading…
Reference in New Issue
Block a user