Run from in memory cache updates (#1564)

* ConfigFile change to allowlist and blocklist

* revised names and warnings

* consistent file naming in kebab case, and generic use of blocklist and allowlist in cmoments for clarity

* update ci files

* impose maximum delay and document

* live update of servers

* update for source prefixes

* fixup test

* stop registerServers being called twice at startup

* prevent double registration at startup

* tidy function signature for loadSource

Co-authored-by: Ian Bashford <ianbashford@gmail.com>
This commit is contained in:
Ian Bashford 2021-01-01 13:04:12 +00:00 committed by GitHub
parent f19b14e74c
commit 87fb44a588
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 137 additions and 74 deletions

View File

@ -632,6 +632,24 @@ func ConfigLoad(proxy *Proxy, flags *ConfigFlags) error {
config.SourceDoH = true
}
var requiredProps stamps.ServerInformalProperties
if config.SourceRequireDNSSEC {
requiredProps |= stamps.ServerInformalPropertyDNSSEC
}
if config.SourceRequireNoLog {
requiredProps |= stamps.ServerInformalPropertyNoLog
}
if config.SourceRequireNoFilter {
requiredProps |= stamps.ServerInformalPropertyNoFilter
}
proxy.requiredProps = requiredProps
proxy.ServerNames = config.ServerNames
proxy.DisabledServerNames = config.DisabledServerNames
proxy.SourceIPv4 = config.SourceIPv4
proxy.SourceIPv6 = config.SourceIPv6
proxy.SourceDNSCrypt = config.SourceDNSCrypt
proxy.SourceDoH = config.SourceDoH
netprobeTimeout := config.NetprobeTimeout
flag.Visit(func(flag *flag.Flag) {
if flag.Name == "netprobe-timeout" && flags.NetprobeTimeoutOverride != nil {
@ -750,22 +768,13 @@ func (config *Config) printRegisteredServers(proxy *Proxy, jsonOutput bool) erro
}
func (config *Config) loadSources(proxy *Proxy) error {
var requiredProps stamps.ServerInformalProperties
if config.SourceRequireDNSSEC {
requiredProps |= stamps.ServerInformalPropertyDNSSEC
}
if config.SourceRequireNoLog {
requiredProps |= stamps.ServerInformalPropertyNoLog
}
if config.SourceRequireNoFilter {
requiredProps |= stamps.ServerInformalPropertyNoFilter
}
for cfgSourceName, cfgSource_ := range config.SourcesConfig {
cfgSource := cfgSource_
if err := config.loadSource(proxy, requiredProps, cfgSourceName, &cfgSource); err != nil {
if err := config.loadSource(proxy, cfgSourceName, &cfgSource); err != nil {
return err
}
}
proxy.updateRegisteredServers()
for name, config := range config.StaticsConfig {
if stamp, err := stamps.NewServerStampFromString(config.Stamp); err == nil {
if stamp.Proto == stamps.StampProtoTypeDNSCryptRelay || stamp.Proto == stamps.StampProtoTypeODoHRelay {
@ -801,7 +810,7 @@ func (config *Config) loadSources(proxy *Proxy) error {
return nil
}
func (config *Config) loadSource(proxy *Proxy, requiredProps stamps.ServerInformalProperties, cfgSourceName string, cfgSource *SourceConfig) error {
func (config *Config) loadSource(proxy *Proxy, cfgSourceName string, cfgSource *SourceConfig) error {
if len(cfgSource.URLs) == 0 {
if len(cfgSource.URL) == 0 {
dlog.Debugf("Missing URLs for source [%s]", cfgSourceName)
@ -820,8 +829,10 @@ func (config *Config) loadSource(proxy *Proxy, requiredProps stamps.ServerInform
}
if cfgSource.RefreshDelay <= 0 {
cfgSource.RefreshDelay = 72
} else if cfgSource.RefreshDelay > 168 {
cfgSource.RefreshDelay = 168
}
source, err := NewSource(cfgSourceName, proxy.xTransport, cfgSource.URLs, cfgSource.MinisignKeyStr, cfgSource.CacheFile, cfgSource.FormatStr, time.Duration(cfgSource.RefreshDelay)*time.Hour)
source, err := NewSource(cfgSourceName, proxy.xTransport, cfgSource.URLs, cfgSource.MinisignKeyStr, cfgSource.CacheFile, cfgSource.FormatStr, time.Duration(cfgSource.RefreshDelay)*time.Hour, cfgSource.Prefix)
if err != nil {
if len(source.in) <= 0 {
dlog.Criticalf("Unable to retrieve source [%s]: [%s]", cfgSourceName, err)
@ -830,51 +841,6 @@ func (config *Config) loadSource(proxy *Proxy, requiredProps stamps.ServerInform
dlog.Infof("Downloading [%s] failed: %v, using cache file to startup", source.name, err)
}
proxy.sources = append(proxy.sources, source)
registeredServers, err := source.Parse(cfgSource.Prefix)
if err != nil {
if len(registeredServers) == 0 {
dlog.Criticalf("Unable to use source [%s]: [%s]", cfgSourceName, err)
return err
}
dlog.Warnf("Error in source [%s]: [%s] -- Continuing with reduced server count [%d]", cfgSourceName, err, len(registeredServers))
}
for _, registeredServer := range registeredServers {
if registeredServer.stamp.Proto != stamps.StampProtoTypeDNSCryptRelay && registeredServer.stamp.Proto != stamps.StampProtoTypeODoHRelay {
if len(config.ServerNames) > 0 {
if !includesName(config.ServerNames, registeredServer.name) {
continue
}
} else if registeredServer.stamp.Props&requiredProps != requiredProps {
continue
}
}
if includesName(config.DisabledServerNames, registeredServer.name) {
continue
}
if config.SourceIPv4 || config.SourceIPv6 {
isIPv4, isIPv6 := true, false
if registeredServer.stamp.Proto == stamps.StampProtoTypeDoH {
isIPv4, isIPv6 = true, true
}
if strings.HasPrefix(registeredServer.stamp.ServerAddrStr, "[") {
isIPv4, isIPv6 = false, true
}
if !(config.SourceIPv4 == isIPv4 || config.SourceIPv6 == isIPv6) {
continue
}
}
if registeredServer.stamp.Proto == stamps.StampProtoTypeDNSCryptRelay || registeredServer.stamp.Proto == stamps.StampProtoTypeODoHRelay {
dlog.Debugf("Adding [%s] to the set of available relays", registeredServer.name)
proxy.registeredRelays = append(proxy.registeredRelays, registeredServer)
} else {
if !((config.SourceDNSCrypt && registeredServer.stamp.Proto == stamps.StampProtoTypeDNSCrypt) ||
(config.SourceDoH && registeredServer.stamp.Proto == stamps.StampProtoTypeDoH)) {
continue
}
dlog.Debugf("Adding [%s] to the set of wanted resolvers", registeredServer.name)
proxy.registeredServers = append(proxy.registeredServers, registeredServer)
}
}
return nil
}

View File

@ -629,6 +629,9 @@ cache_neg_max_ttl = 600
## If the `urls` property is missing, cache files and valid signatures
## must already be present. This doesn't prevent these cache files from
## expiring after `refresh_delay` hours.
## Cache freshness is checked every 24 hours, so values for 'refresh_delay'
## of less than 24 hours will have no effect.
## A maximum delay of 168 hours (1 week) is imposed to ensure cache freshness.
[sources]
@ -638,6 +641,7 @@ cache_neg_max_ttl = 600
urls = ['https://raw.githubusercontent.com/DNSCrypt/dnscrypt-resolvers/master/v3/public-resolvers.md', 'https://download.dnscrypt.info/resolvers-list/v3/public-resolvers.md', 'https://ipv6.download.dnscrypt.info/resolvers-list/v3/public-resolvers.md', 'https://download.dnscrypt.net/resolvers-list/v3/public-resolvers.md']
cache_file = 'public-resolvers.md'
minisign_key = 'RWQf6LRCGA9i53mlYecO4IzT51TGPpvWucNSCh1CBM0QTaLn73Y7GFO3'
refresh_delay = 72
prefix = ''
## Anonymized DNS relays

View File

@ -7,6 +7,7 @@ import (
"net"
"os"
"runtime"
"strings"
"sync/atomic"
"time"
@ -92,6 +93,13 @@ type Proxy struct {
pluginBlockUndelegated bool
child bool
daemonize bool
requiredProps stamps.ServerInformalProperties
ServerNames []string
DisabledServerNames []string
SourceIPv4 bool
SourceIPv6 bool
SourceDNSCrypt bool
SourceDoH bool
}
func (proxy *Proxy) registerUDPListener(conn *net.UDPConn) {
@ -222,9 +230,6 @@ func (proxy *Proxy) StartProxy() {
dlog.Fatal(err)
}
curve25519.ScalarBaseMult(&proxy.proxyPublicKey, &proxy.proxySecretKey)
for _, registeredServer := range proxy.registeredServers {
proxy.serversInfo.registerServer(registeredServer.name, registeredServer.stamp)
}
proxy.startAcceptingClients()
liveServers, err := proxy.serversInfo.refresh(proxy)
if liveServers > 0 {
@ -247,6 +252,7 @@ func (proxy *Proxy) StartProxy() {
go func() {
for {
clocksmith.Sleep(PrefetchSources(proxy.xTransport, proxy.sources))
proxy.updateRegisteredServers()
runtime.GC()
}
}()
@ -268,6 +274,88 @@ func (proxy *Proxy) StartProxy() {
}
}
func (proxy *Proxy) updateRegisteredServers() error {
for _, source := range proxy.sources {
//dlog.Debugf(string(source.in))
registeredServers, err := source.Parse()
if err != nil {
if len(registeredServers) == 0 {
dlog.Criticalf("Unable to use source [%s]: [%s]", source.name, err)
return err
}
dlog.Warnf("Error in source [%s]: [%s] -- Continuing with reduced server count [%d]", source.name, err, len(registeredServers))
}
for _, registeredServer := range registeredServers {
if registeredServer.stamp.Proto != stamps.StampProtoTypeDNSCryptRelay && registeredServer.stamp.Proto != stamps.StampProtoTypeODoHRelay {
if len(proxy.ServerNames) > 0 {
if !includesName(proxy.ServerNames, registeredServer.name) {
continue
}
} else if registeredServer.stamp.Props&proxy.requiredProps != proxy.requiredProps {
continue
}
}
if includesName(proxy.DisabledServerNames, registeredServer.name) {
continue
}
if proxy.SourceIPv4 || proxy.SourceIPv6 {
isIPv4, isIPv6 := true, false
if registeredServer.stamp.Proto == stamps.StampProtoTypeDoH {
isIPv4, isIPv6 = true, true
}
if strings.HasPrefix(registeredServer.stamp.ServerAddrStr, "[") {
isIPv4, isIPv6 = false, true
}
if !(proxy.SourceIPv4 == isIPv4 || proxy.SourceIPv6 == isIPv6) {
continue
}
}
if registeredServer.stamp.Proto == stamps.StampProtoTypeDNSCryptRelay || registeredServer.stamp.Proto == stamps.StampProtoTypeODoHRelay {
var found bool
for i, currentRegisteredRelay := range proxy.registeredRelays {
if currentRegisteredRelay.name == registeredServer.name {
found = true
if currentRegisteredRelay.stamp.String() != registeredServer.stamp.String() {
dlog.Infof("Updating stamp for [%s] was: %s now: %s", registeredServer.name, currentRegisteredRelay.stamp.String(), registeredServer.stamp.String())
proxy.registeredRelays[i].stamp = registeredServer.stamp
dlog.Debugf("Total count of registered relays %v", len(proxy.registeredRelays))
}
}
}
if !found {
dlog.Debugf("Adding [%s] to the set of available relays", registeredServer.name)
proxy.registeredRelays = append(proxy.registeredRelays, registeredServer)
}
} else {
if !((proxy.SourceDNSCrypt && registeredServer.stamp.Proto == stamps.StampProtoTypeDNSCrypt) ||
(proxy.SourceDoH && registeredServer.stamp.Proto == stamps.StampProtoTypeDoH)) {
continue
}
var found bool
for i, currentRegisteredServer := range proxy.registeredServers {
if currentRegisteredServer.name == registeredServer.name {
found = true
if currentRegisteredServer.stamp.String() != registeredServer.stamp.String() {
dlog.Infof("Updating stamp for [%s] was: %s now: %s", registeredServer.name, currentRegisteredServer.stamp.String(), registeredServer.stamp.String())
proxy.registeredServers[i].stamp = registeredServer.stamp
}
}
}
if !found {
dlog.Debugf("Adding [%s] to the set of wanted resolvers", registeredServer.name)
proxy.registeredServers = append(proxy.registeredServers, registeredServer)
dlog.Debugf("Total count of registered servers %v", len(proxy.registeredServers))
}
}
}
}
for _, registeredServer := range proxy.registeredServers {
proxy.serversInfo.registerServer(registeredServer.name, registeredServer.stamp)
}
return nil
}
func (proxy *Proxy) udpListener(clientPc *net.UDPConn) {
defer clientPc.Close()
for {

View File

@ -164,11 +164,14 @@ func (serversInfo *ServersInfo) refreshServer(proxy *Proxy, name string, stamp s
break
}
}
if isNew {
serversInfo.inner = append(serversInfo.inner, &newServer)
serversInfo.registeredServers = append(serversInfo.registeredServers, RegisteredServer{name: name, stamp: stamp})
}
serversInfo.Unlock()
if isNew {
serversInfo.Lock()
serversInfo.inner = append(serversInfo.inner, &newServer)
serversInfo.Unlock()
proxy.serversInfo.registerServer(name, stamp)
}
return nil
}

View File

@ -39,6 +39,7 @@ type Source struct {
cacheFile string
cacheTTL, prefetchDelay time.Duration
refresh time.Time
prefix string
}
func (source *Source) checkSignature(bin, sig []byte) (err error) {
@ -180,11 +181,11 @@ func (source *Source) fetchWithCache(xTransport *XTransport, now time.Time) (del
}
// NewSource loads a new source using the given cacheFile and urls, ensuring it has a valid signature
func NewSource(name string, xTransport *XTransport, urls []string, minisignKeyStr string, cacheFile string, formatStr string, refreshDelay time.Duration) (source *Source, err error) {
func NewSource(name string, xTransport *XTransport, urls []string, minisignKeyStr string, cacheFile string, formatStr string, refreshDelay time.Duration, prefix string) (source *Source, err error) {
if refreshDelay < DefaultPrefetchDelay {
refreshDelay = DefaultPrefetchDelay
}
source = &Source{name: name, urls: []*url.URL{}, cacheFile: cacheFile, cacheTTL: refreshDelay, prefetchDelay: DefaultPrefetchDelay}
source = &Source{name: name, urls: []*url.URL{}, cacheFile: cacheFile, cacheTTL: refreshDelay, prefetchDelay: DefaultPrefetchDelay, prefix: prefix}
if formatStr == "v2" {
source.format = SourceFormatV2
} else {
@ -223,15 +224,15 @@ func PrefetchSources(xTransport *XTransport, sources []*Source) time.Duration {
return interval
}
func (source *Source) Parse(prefix string) ([]RegisteredServer, error) {
func (source *Source) Parse() ([]RegisteredServer, error) {
if source.format == SourceFormatV2 {
return source.parseV2(prefix)
return source.parseV2()
}
dlog.Fatal("Unexpected source format")
return []RegisteredServer{}, nil
}
func (source *Source) parseV2(prefix string) ([]RegisteredServer, error) {
func (source *Source) parseV2() ([]RegisteredServer, error) {
var registeredServers []RegisteredServer
var stampErrs []string
appendStampErr := func(format string, a ...interface{}) {
@ -256,7 +257,7 @@ func (source *Source) parseV2(prefix string) ([]RegisteredServer, error) {
return registeredServers, fmt.Errorf("Invalid format for source at [%v]", source.urls)
}
subparts = subparts[1:]
name = prefix + name
name = source.prefix + name
var stampStr, description string
stampStrs := make([]string, 0)
for _, subpart := range subparts {

View File

@ -65,6 +65,7 @@ type SourceTestExpect struct {
urls []string
Source *Source
delay time.Duration
prefix string
}
func readFixture(t *testing.T, name string) []byte {
@ -388,12 +389,12 @@ func TestNewSource(t *testing.T) {
refreshDelay time.Duration
e *SourceTestExpect
}{
{"", "", 0, &SourceTestExpect{err: " ", Source: &Source{name: "short refresh delay", urls: []*url.URL{}, cacheTTL: DefaultPrefetchDelay, prefetchDelay: DefaultPrefetchDelay}}},
{"", "", 0, &SourceTestExpect{err: " ", Source: &Source{name: "short refresh delay", urls: []*url.URL{}, cacheTTL: DefaultPrefetchDelay, prefetchDelay: DefaultPrefetchDelay, prefix: ""}}},
{"v1", d.keyStr, DefaultPrefetchDelay * 2, &SourceTestExpect{err: "Unsupported source format", Source: &Source{name: "old format", urls: []*url.URL{}, cacheTTL: DefaultPrefetchDelay * 2, prefetchDelay: DefaultPrefetchDelay}}},
{"v2", "", DefaultPrefetchDelay * 3, &SourceTestExpect{err: "Invalid encoded public key", Source: &Source{name: "invalid public key", urls: []*url.URL{}, cacheTTL: DefaultPrefetchDelay * 3, prefetchDelay: DefaultPrefetchDelay}}},
} {
t.Run(tt.e.Source.name, func(t *testing.T) {
got, err := NewSource(tt.e.Source.name, d.xTransport, tt.e.urls, tt.key, tt.e.cachePath, tt.v, tt.refreshDelay)
got, err := NewSource(tt.e.Source.name, d.xTransport, tt.e.urls, tt.key, tt.e.cachePath, tt.v, tt.refreshDelay, tt.e.prefix)
checkResult(t, tt.e, got, err)
})
}
@ -403,7 +404,7 @@ func TestNewSource(t *testing.T) {
for i := range d.sources {
id, e := setupSourceTestCase(t, d, i, &cacheTest, downloadTest)
t.Run("cache "+cacheTestName+", download "+downloadTestName+"/"+id, func(t *testing.T) {
got, err := NewSource(id, d.xTransport, e.urls, d.keyStr, e.cachePath, "v2", DefaultPrefetchDelay*3)
got, err := NewSource(id, d.xTransport, e.urls, d.keyStr, e.cachePath, "v2", DefaultPrefetchDelay*3, "")
checkResult(t, e, got, err)
})
}