2018-10-07 00:51:44 +03:00
package dnsfilter
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/go-test/deep"
)
var (
fileWriteLock sync . Mutex
)
2018-10-09 05:02:16 +03:00
const enableGzip = false
2018-10-10 19:44:07 +03:00
func flushToFile ( buffer [ ] * logEntry ) error {
2018-10-07 00:51:44 +03:00
if len ( buffer ) == 0 {
return nil
}
start := time . Now ( )
var b bytes . Buffer
e := json . NewEncoder ( & b )
for _ , entry := range buffer {
err := e . Encode ( entry )
if err != nil {
log . Printf ( "Failed to marshal entry: %s" , err )
return err
}
}
elapsed := time . Since ( start )
log . Printf ( "%d elements serialized via json in %v: %d kB, %v/entry, %v/entry" , len ( buffer ) , elapsed , b . Len ( ) / 1024 , float64 ( b . Len ( ) ) / float64 ( len ( buffer ) ) , elapsed / time . Duration ( len ( buffer ) ) )
err := checkBuffer ( buffer , b )
if err != nil {
log . Printf ( "failed to check buffer: %s" , err )
return err
}
var zb bytes . Buffer
2018-10-09 05:02:16 +03:00
filename := queryLogFileName
2018-10-07 00:51:44 +03:00
2018-10-09 05:02:16 +03:00
// gzip enabled?
if enableGzip {
filename += ".gz"
2018-10-07 00:51:44 +03:00
2018-10-09 05:02:16 +03:00
zw := gzip . NewWriter ( & zb )
zw . Name = queryLogFileName
zw . ModTime = time . Now ( )
2018-10-07 00:51:44 +03:00
2018-10-09 05:02:16 +03:00
_ , err = zw . Write ( b . Bytes ( ) )
if err != nil {
log . Printf ( "Couldn't compress to gzip: %s" , err )
zw . Close ( )
return err
}
if err = zw . Close ( ) ; err != nil {
log . Printf ( "Couldn't close gzip writer: %s" , err )
return err
}
} else {
zb = b
2018-10-07 00:51:44 +03:00
}
fileWriteLock . Lock ( )
defer fileWriteLock . Unlock ( )
2018-10-09 05:02:16 +03:00
f , err := os . OpenFile ( filename , os . O_WRONLY | os . O_CREATE | os . O_APPEND , 0644 )
2018-10-07 00:51:44 +03:00
if err != nil {
2018-10-09 05:02:16 +03:00
log . Printf ( "failed to create file \"%s\": %s" , filename , err )
2018-10-07 00:51:44 +03:00
return err
}
defer f . Close ( )
n , err := f . Write ( zb . Bytes ( ) )
if err != nil {
log . Printf ( "Couldn't write to file: %s" , err )
return err
}
2018-10-09 05:02:16 +03:00
log . Printf ( "ok \"%s\": %v bytes written" , filename , n )
2018-10-07 00:51:44 +03:00
return nil
}
2018-10-10 19:44:07 +03:00
func checkBuffer ( buffer [ ] * logEntry , b bytes . Buffer ) error {
2018-10-07 00:51:44 +03:00
l := len ( buffer )
d := json . NewDecoder ( & b )
i := 0
for d . More ( ) {
2018-10-11 18:06:05 +03:00
entry := & logEntry { }
err := d . Decode ( entry )
2018-10-07 00:51:44 +03:00
if err != nil {
log . Printf ( "Failed to decode: %s" , err )
return err
}
if diff := deep . Equal ( entry , buffer [ i ] ) ; diff != nil {
log . Printf ( "decoded buffer differs: %s" , diff )
return fmt . Errorf ( "decoded buffer differs: %s" , diff )
}
i ++
}
if i != l {
err := fmt . Errorf ( "check fail: %d vs %d entries" , l , i )
log . Print ( err )
return err
}
log . Printf ( "check ok: %d entries" , i )
return nil
}
func rotateQueryLog ( ) error {
2018-10-09 05:02:16 +03:00
from := queryLogFileName
to := queryLogFileName + ".1"
if enableGzip {
from = queryLogFileName + ".gz"
to = queryLogFileName + ".gz.1"
}
2018-10-07 00:51:44 +03:00
if _ , err := os . Stat ( from ) ; os . IsNotExist ( err ) {
// do nothing, file doesn't exist
return nil
}
err := os . Rename ( from , to )
if err != nil {
log . Printf ( "Failed to rename querylog: %s" , err )
return err
}
log . Printf ( "Rotated from %s to %s successfully" , from , to )
return nil
}
2018-10-08 20:02:09 +03:00
func periodicQueryLogRotate ( ) {
for range time . Tick ( queryLogRotationPeriod ) {
2018-10-07 00:51:44 +03:00
err := rotateQueryLog ( )
if err != nil {
log . Printf ( "Failed to rotate querylog: %s" , err )
// do nothing, continue rotating
}
}
}
2018-10-07 02:17:22 +03:00
2018-10-07 23:24:04 +03:00
func genericLoader ( onEntry func ( entry * logEntry ) error , needMore func ( ) bool , timeWindow time . Duration ) error {
2018-10-07 02:17:22 +03:00
now := time . Now ( )
// read from querylog files, try newest file first
2018-10-09 05:02:16 +03:00
files := [ ] string { }
if enableGzip {
files = [ ] string {
queryLogFileName + ".gz" ,
queryLogFileName + ".gz.1" ,
}
} else {
files = [ ] string {
queryLogFileName ,
queryLogFileName + ".1" ,
}
2018-10-07 02:17:22 +03:00
}
// read from all files
for _ , file := range files {
2018-10-07 23:24:04 +03:00
if ! needMore ( ) {
2018-10-07 02:17:22 +03:00
break
}
if _ , err := os . Stat ( file ) ; os . IsNotExist ( err ) {
// do nothing, file doesn't exist
continue
}
f , err := os . Open ( file )
if err != nil {
log . Printf ( "Failed to open file \"%s\": %s" , file , err )
// try next file
continue
}
defer f . Close ( )
2018-10-09 05:02:16 +03:00
var d * json . Decoder
if enableGzip {
trace ( "Creating gzip reader" )
zr , err := gzip . NewReader ( f )
if err != nil {
log . Printf ( "Failed to create gzip reader: %s" , err )
continue
}
defer zr . Close ( )
2018-10-07 02:17:22 +03:00
2018-10-09 05:02:16 +03:00
trace ( "Creating json decoder" )
d = json . NewDecoder ( zr )
} else {
d = json . NewDecoder ( f )
}
2018-10-07 02:17:22 +03:00
i := 0
// entries on file are in oldest->newest order
// we want maxLen newest
for d . More ( ) {
2018-10-07 23:24:04 +03:00
if ! needMore ( ) {
break
}
2018-10-07 02:17:22 +03:00
var entry logEntry
err := d . Decode ( & entry )
if err != nil {
log . Printf ( "Failed to decode: %s" , err )
// next entry can be fine, try more
continue
}
if now . Sub ( entry . Time ) > timeWindow {
2018-10-11 18:33:07 +03:00
// trace("skipping entry") // debug logging
2018-10-07 02:17:22 +03:00
continue
}
i ++
2018-10-07 23:24:04 +03:00
err = onEntry ( & entry )
if err != nil {
return err
2018-10-07 02:17:22 +03:00
}
}
2018-10-09 04:45:05 +03:00
elapsed := time . Since ( now )
2018-10-12 20:36:57 +03:00
var perunit time . Duration
if i > 0 {
perunit = elapsed / time . Duration ( i )
}
log . Printf ( "file \"%s\": read %d entries in %v, %v/entry" , file , i , elapsed , perunit )
2018-10-07 02:17:22 +03:00
}
2018-10-07 23:24:04 +03:00
return nil
}
2018-10-10 19:44:07 +03:00
func appendFromLogFile ( values [ ] * logEntry , maxLen int , timeWindow time . Duration ) [ ] * logEntry {
a := [ ] * logEntry { }
2018-10-07 23:24:04 +03:00
onEntry := func ( entry * logEntry ) error {
2018-10-10 19:44:07 +03:00
a = append ( a , entry )
2018-10-07 23:24:04 +03:00
if len ( a ) > maxLen {
toskip := len ( a ) - maxLen
a = a [ toskip : ]
}
return nil
}
needMore := func ( ) bool {
return true
}
err := genericLoader ( onEntry , needMore , timeWindow )
if err != nil {
log . Printf ( "Failed to load entries from querylog: %s" , err )
return values
}
2018-10-07 02:17:22 +03:00
// now that we've read all eligible entries, reverse the slice to make it go from newest->oldest
for left , right := 0 , len ( a ) - 1 ; left < right ; left , right = left + 1 , right - 1 {
a [ left ] , a [ right ] = a [ right ] , a [ left ]
}
// append it to values
values = append ( values , a ... )
// then cut off of it is bigger than maxLen
if len ( values ) > maxLen {
values = values [ : maxLen ]
}
return values
}