From 712023112d96a4c0a951442bfb1dcbbcd7179786 Mon Sep 17 00:00:00 2001 From: Andrey Meshkov Date: Thu, 20 Feb 2020 19:38:11 +0300 Subject: [PATCH] *(dnsforward): finished new qlog_file implementation --- querylog/qlog_file.go | 107 +++++++++++++++++--------- querylog/qlog_file_test.go | 150 ++++++++++++++++++++++++++++++++++++- 2 files changed, 216 insertions(+), 41 deletions(-) diff --git a/querylog/qlog_file.go b/querylog/qlog_file.go index 83f0d30d..63557098 100644 --- a/querylog/qlog_file.go +++ b/querylog/qlog_file.go @@ -6,12 +6,14 @@ import ( "sync" "time" + "github.com/AdguardTeam/golibs/log" + "github.com/pkg/errors" ) var ErrSeekNotFound = errors.New("Seek not found the record") -const bufferSize = 64 * 1024 // 64 KB is the buffer size +const bufferSize = 256 * 1024 // 256 KB is the buffer size type QLogFile struct { file *os.File // the query log file @@ -48,12 +50,16 @@ func NewQLogFile(path string) (*QLogFile, error) { // it shifts seek position to 3/4 of the file. Otherwise, to 1/4 of the file. // 5. It performs the search again, every time the search scope is narrowed twice. // -// It returns the position of the line with the timestamp we were looking for. +// It returns the position of the the line with the timestamp we were looking for +// so that when we call "ReadNext" this line was returned. // If we could not find it, it returns 0 and ErrSeekNotFound func (q *QLogFile) Seek(timestamp uint64) (int64, error) { q.lock.Lock() defer q.lock.Unlock() + // Empty the buffer + q.buffer = nil + // First of all, check the file size fileInfo, err := q.file.Stat() if err != nil { @@ -61,38 +67,64 @@ func (q *QLogFile) Seek(timestamp uint64) (int64, error) { } // Define the search scope - start := int64(0) - end := fileInfo.Size() - probe := (end - start) / 2 + start := int64(0) // start of the search interval (position in the file) + end := fileInfo.Size() // end of the search interval (position in the file) + probe := (end - start) / 2 // probe -- approximate index of the line we'll try to check + var line string + var lineIdx int64 // index of the probe line in the file + var lastProbeLineIdx int64 // index of the last probe line - // Get the line - line, _, err := q.readProbeLine(probe) - if err != nil { - return 0, err + // Count seek depth in order to detect mistakes + // If depth is too large, we should stop the search + depth := 0 + + for { + // Get the line at the specified position + line, lineIdx, err = q.readProbeLine(probe) + if err != nil { + return 0, err + } + + // Get the timestamp from the query log record + ts := q.readTimestamp(line) + + if ts == 0 { + return 0, ErrSeekNotFound + } + + if ts == timestamp { + // Hurray, returning the result + break + } + + if lastProbeLineIdx == lineIdx { + // If we're testing the same line twice then most likely + // the scope is too narrow and we won't find anything anymore + return 0, ErrSeekNotFound + } + + // Narrow the scope and repeat the search + if ts > timestamp { + // If the timestamp we're looking for is OLDER than what we found + // Then the line is somewhere on the LEFT side from the current probe position + end = probe + probe = start + (end-start)/2 + } else { + // If the timestamp we're looking for is NEWER than what we found + // Then the line is somewhere on the RIGHT side from the current probe position + start = probe + probe = start + (end-start)/2 + } + + depth++ + if depth >= 100 { + log.Error("Seek depth is too high, aborting. File %s, ts %v", q.file.Name(), timestamp) + return 0, ErrSeekNotFound + } } - // Get the timestamp from the query log record - ts := q.readTimestamp(line) - - if ts == timestamp { - // Hurray, returning the result - return probe, nil - } - - // Narrow the scope and repeat the search - if ts > timestamp { - end := probe - probe = (end - start) / 2 - } else { - start := probe - probe = (end - start) / 2 - } - - // TODO: temp - q.position = probe - - // TODO: Check start/stop/probe values and loop this - return 0, ErrSeekNotFound + q.position = lineIdx + int64(len(line)) + return q.position, nil } // SeekStart changes the current position to the end of the file @@ -102,6 +134,9 @@ func (q *QLogFile) SeekStart() (int64, error) { q.lock.Lock() defer q.lock.Unlock() + // Empty the buffer + q.buffer = nil + // First of all, check the file size fileInfo, err := q.file.Stat() if err != nil { @@ -111,7 +146,6 @@ func (q *QLogFile) SeekStart() (int64, error) { // Place the position to the very end of file q.position = fileInfo.Size() - 1 if q.position < 0 { - // TODO: test empty file q.position = 0 } return q.position, nil @@ -160,12 +194,13 @@ func (q *QLogFile) Close() error { // 4. read the line from the buffer func (q *QLogFile) readNextLine(position int64) (string, int64, error) { relativePos := position - q.bufferStart - if q.buffer == nil || relativePos < maxEntrySize { + if q.buffer == nil || (relativePos < maxEntrySize && q.bufferStart != 0) { // Time to re-init the buffer err := q.initBuffer(position) if err != nil { return "", 0, err } + relativePos = position - q.bufferStart } // Look for the end of the prev line @@ -201,7 +236,6 @@ func (q *QLogFile) initBuffer(position int64) error { q.buffer = make([]byte, bufferSize) } q.bufferLen, err = q.file.Read(q.buffer) - // TODO: validate bufferLen if err != nil { return err } @@ -218,7 +252,6 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, error) { seekPosition := int64(0) relativePos := position // position relative to the buffer we're going to read if (position - maxEntrySize) > 0 { - // TODO: cover this case in tests seekPosition = position - maxEntrySize relativePos = maxEntrySize } @@ -267,12 +300,12 @@ func (q *QLogFile) readTimestamp(str string) uint64 { } if len(val) == 0 { - // TODO: log + log.Error("Couldn't find timestamp in %s: %s", q.file.Name(), str) return 0 } tm, err := time.Parse(time.RFC3339, val) if err != nil { - // TODO: log + log.Error("Couldn't parse timestamp in %s: %s", q.file.Name(), val) return 0 } return uint64(tm.UnixNano()) diff --git a/querylog/qlog_file_test.go b/querylog/qlog_file_test.go index 6ba5780f..e9bc70e8 100644 --- a/querylog/qlog_file_test.go +++ b/querylog/qlog_file_test.go @@ -14,15 +14,157 @@ import ( ) func TestQLogFileEmpty(t *testing.T) { - // TODO: test empty file + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFile := prepareTestFile(testDir, 0) + + // create the new QLogFile instance + q, err := NewQLogFile(testFile) + assert.Nil(t, err) + assert.NotNil(t, q) + + // seek to the start + pos, err := q.SeekStart() + assert.Nil(t, err) + assert.Equal(t, int64(0), pos) + + // try reading anyway + line, err := q.ReadNext() + assert.Equal(t, io.EOF, err) + assert.Equal(t, "", line) } func TestQLogFileLarge(t *testing.T) { - // TODO: test reading large file + // should be large enough + count := 50000 + + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFile := prepareTestFile(testDir, count) + + // create the new QLogFile instance + q, err := NewQLogFile(testFile) + assert.Nil(t, err) + assert.NotNil(t, q) + + // seek to the start + pos, err := q.SeekStart() + assert.Nil(t, err) + assert.NotEqual(t, int64(0), pos) + + read := 0 + var line string + for err == nil { + line, err = q.ReadNext() + if err == nil { + assert.True(t, len(line) > 0) + read += 1 + } + } + + assert.Equal(t, count, read) + assert.Equal(t, io.EOF, err) } -func TestQLogFileSeek(t *testing.T) { - // TODO: test seek method on a small file +func TestQLogFileSeekLargeFile(t *testing.T) { + // more or less big file + count := 10000 + + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFile := prepareTestFile(testDir, count) + + // create the new QLogFile instance + q, err := NewQLogFile(testFile) + assert.Nil(t, err) + assert.NotNil(t, q) + + // CASE 1: NOT TOO OLD LINE + testSeekLine(t, q, 300) + + // CASE 2: OLD LINE + testSeekLine(t, q, count-300) + + // CASE 3: FIRST LINE + testSeekLine(t, q, 0) + + // CASE 4: LAST LINE + testSeekLine(t, q, count) + + // CASE 5: Seek non-existent (too low) + _, err = q.Seek(123) + assert.NotNil(t, err) + + // CASE 6: Seek non-existent (too high) + ts, _ := time.Parse(time.RFC3339, "2100-01-02T15:04:05Z07:00") + _, err = q.Seek(uint64(ts.UnixNano())) + assert.NotNil(t, err) +} + +func TestQLogFileSeekSmallFile(t *testing.T) { + // more or less big file + count := 10 + + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFile := prepareTestFile(testDir, count) + + // create the new QLogFile instance + q, err := NewQLogFile(testFile) + assert.Nil(t, err) + assert.NotNil(t, q) + + // CASE 1: NOT TOO OLD LINE + testSeekLine(t, q, 2) + + // CASE 2: OLD LINE + testSeekLine(t, q, count-2) + + // CASE 3: FIRST LINE + testSeekLine(t, q, 0) + + // CASE 4: LAST LINE + testSeekLine(t, q, count) + + // CASE 5: Seek non-existent (too low) + _, err = q.Seek(123) + assert.NotNil(t, err) + + // CASE 6: Seek non-existent (too high) + ts, _ := time.Parse(time.RFC3339, "2100-01-02T15:04:05Z07:00") + _, err = q.Seek(uint64(ts.UnixNano())) + assert.NotNil(t, err) +} + +func testSeekLine(t *testing.T, q *QLogFile, lineNumber int) { + line, err := getQLogLine(q, lineNumber) + assert.Nil(t, err) + ts := q.readTimestamp(line) + assert.NotEqual(t, uint64(0), ts) + + // try seeking to that line now + pos, err := q.Seek(ts) + assert.Nil(t, err) + assert.NotEqual(t, int64(0), pos) + + testLine, err := q.ReadNext() + assert.Nil(t, err) + assert.Equal(t, line, testLine) +} + +func getQLogLine(q *QLogFile, lineNumber int) (string, error) { + _, err := q.SeekStart() + if err != nil { + return "", err + } + + for i := 1; i < lineNumber; i++ { + _, err := q.ReadNext() + if err != nil { + return "", err + } + } + return q.ReadNext() } // Check adding and loading (with filtering) entries from disk and memory