diff --git a/app/discovery/provider/docker.go b/app/discovery/provider/docker.go index e55486b..9400db0 100644 --- a/app/discovery/provider/docker.go +++ b/app/discovery/provider/docker.go @@ -177,12 +177,48 @@ func (d *Docker) events(ctx context.Context, eventsCh chan<- discovery.ProviderI ticker := time.NewTicker(dockerPollingInterval) defer ticker.Stop() + // Keep track of running containers + saved := make(map[string]containerInfo) + + update := func() { + containers, err := d.listContainers() + if err != nil { + log.Printf("[ERROR] failed to fetch running containers: %s", err) + return + } + + refresh := false + seen := make(map[string]bool) + + for _, c := range containers { + old, exists := saved[c.ID] + + if !exists || c.IP != old.IP || c.State != old.State || !c.TS.Equal(old.TS) { + refresh = true + } + + seen[c.ID] = true + } + + if len(saved) != len(seen) || refresh { + log.Printf("[INFO] changes in running containers detected: refreshing routes") + for k := range saved { + delete(saved, k) + } + for _, c := range containers { + saved[c.ID] = c + } + eventsCh <- discovery.PIDocker + } + } + + update() // Refresh immediately for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - eventsCh <- discovery.PIDocker + update() } } } diff --git a/app/discovery/provider/docker_test.go b/app/discovery/provider/docker_test.go index f7dbf38..13217ab 100644 --- a/app/discovery/provider/docker_test.go +++ b/app/discovery/provider/docker_test.go @@ -1,8 +1,10 @@ package provider import ( + "context" "fmt" "io/ioutil" + "log" "net/http" "net/http/httptest" "strings" @@ -11,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/umputun/reproxy/app/discovery" ) func TestDocker_List(t *testing.T) { @@ -66,6 +69,62 @@ func TestDocker_List(t *testing.T) { assert.Equal(t, "*", res[2].Server) } +func TestDocker_refresh(t *testing.T) { + containers := make(chan []containerInfo) + + d := Docker{DockerClient: &DockerClientMock{ + ListContainersFunc: func() ([]containerInfo, error) { + return <-containers, nil + }, + }} + + events := make(chan discovery.ProviderID) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + stub := func(id string) containerInfo { + return containerInfo{ID: id, Name: id, State: "running", IP: "127.0.0." + id, Ports: []int{12345}} + } + + recv := func() { + select { + case <-events: + return + case <-time.After(time.Second): + t.Fatal("No refresh notification was received after 1s") + } + } + + go func() { + dockerPollingInterval = time.Microsecond + if err := d.events(ctx, events); err != context.Canceled { + log.Fatal(err) + } + }() + + // Start some + containers <- []containerInfo{stub("1"), stub("2")} + recv() + + // Nothing changed + containers <- []containerInfo{stub("1"), stub("2")} + time.Sleep(time.Millisecond) + assert.Empty(t, events, "unexpected refresh notification") + + // Stopped + containers <- []containerInfo{stub("1")} + recv() + + // One changed + containers <- []containerInfo{ + {ID: "1", Name: "1", State: "running", IP: "127.42.42.42", Ports: []int{12345}}, + } + recv() + + time.Sleep(time.Millisecond) + assert.Empty(t, events, "unexpect refresh notification from events channel") +} + func TestDockerClient(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1.41/containers/json", r.URL.Path)