From 1d9ef300b9e16f5bba2631816f6414e145c4987d Mon Sep 17 00:00:00 2001
From: Pavel Safonov
Date: Fri, 9 Sep 2022 15:16:48 +0300
Subject: [PATCH] postgres: first correct read of WAL file
---
format/format.go | 1 +
format/postgres/common/utils_test.go | 5 +
format/postgres/flavours/postgres14/pg_wal.go | 136 ++++++++++--------
.../flavours/postgres14/pg_wal_test.go | 31 ++++
format/postgres/pg_wal.go | 45 +++++-
format/postgres/pg_wal_test.go | 31 ++++
6 files changed, 186 insertions(+), 63 deletions(-)
create mode 100644 format/postgres/flavours/postgres14/pg_wal_test.go
create mode 100644 format/postgres/pg_wal_test.go
diff --git a/format/format.go b/format/format.go
index d6a5c7a8..f13b07b2 100644
--- a/format/format.go
+++ b/format/format.go
@@ -323,4 +323,5 @@ type BitCoinBlockIn struct {
type PostgresIn struct {
Flavour string `doc:"PostgreSQL flavour: postgres, postgres13, pgpro..."`
+ Lsn string `doc:"Current LSN for WAL, use \"select pg_current_wal_lsn()\""`
}
diff --git a/format/postgres/common/utils_test.go b/format/postgres/common/utils_test.go
index ccfc68b5..b7a80a79 100644
--- a/format/postgres/common/utils_test.go
+++ b/format/postgres/common/utils_test.go
@@ -31,6 +31,11 @@ func TestTypeAlign(t *testing.T) {
if expected4 != 8192*3 {
t.Errorf("must be 8192*3\n")
}
+
+ expected5 := common.TypeAlign(8192, 114720)
+ if expected5 != 122880 {
+ t.Errorf("must be 8192*3\n")
+ }
}
func TestTypeAlign8(t *testing.T) {
diff --git a/format/postgres/flavours/postgres14/pg_wal.go b/format/postgres/flavours/postgres14/pg_wal.go
index d0d6617a..0117ba6d 100644
--- a/format/postgres/flavours/postgres14/pg_wal.go
+++ b/format/postgres/flavours/postgres14/pg_wal.go
@@ -2,7 +2,6 @@ package postgres14
import (
"fmt"
-
"github.com/wader/fq/format/postgres/common"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
@@ -126,6 +125,8 @@ func decodeXLogPageHeaderData(d *decode.D) {
}
type walD struct {
+ maxOffset int64
+
pages *decode.D
records *decode.D
@@ -135,26 +136,32 @@ type walD struct {
recordRemLenBytes int64
}
-func DecodePgwal(d *decode.D) any {
+func DecodePgwal(d *decode.D, maxOffset uint32) any {
pages := d.FieldArrayValue("Pages")
- walD := &walD{
+ wal := &walD{
+ maxOffset: int64(maxOffset),
pages: pages,
records: d.FieldArrayValue("Records"),
recordRemLenBytes: -1,
}
for {
- decodeXLogPage(walD, pages)
-
- posBytes := pages.Pos() / 8
- remBytes := posBytes % XLOG_BLCKSZ
- if remBytes != 0 {
- d.Fatalf("invalid page remBytes = %d\n", remBytes)
- }
+ decodeXLogPage(wal, pages)
if pages.End() {
break
}
+
+ posBytes := pages.Pos() / 8
+ if posBytes >= wal.maxOffset {
+ d.FieldRawLen("unused", d.BitsLeft())
+ break
+ }
+
+ remBytes := posBytes % XLOG_BLCKSZ
+ if remBytes != 0 {
+ d.Fatalf("invalid page remBytes = %d\n", remBytes)
+ }
}
return nil
@@ -233,75 +240,86 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
/* 20 | 4 */ // pg_crc32c xl_crc
posBytes1 := d.Pos() / 8
- posBytes1Aligned := common.TypeAlign8(uint64(posBytes1))
- d.SeekAbs(int64(posBytes1Aligned * 8))
+ posBytes1Aligned := int64(common.TypeAlign8(uint64(posBytes1)))
+ // compare the aligned position - this is correct,
+ // because a record header is 8-byte aligned
+ if posBytes1Aligned >= wal.maxOffset {
+ d.FieldRawLen("unused", d.BitsLeft())
+ break
+ }
+
+ // check that we can read xl_tot_len on this page
+ if posMaxOfPageBytes < posBytes1Aligned+4 {
+ remOnPage := posMaxOfPageBytes - posBytes1
+ d.FieldRawLen("page_padding0", remOnPage*8)
+ // xl_tot_len cannot be read on this page,
+ // so no record can be started on this page;
+ // continue on the next page
+ wal.record = nil
+ wal.recordRemLenBytes = -1
+ return
+ }
+
+ d.SeekAbs(posBytes1Aligned * 8)
record := pageRecords.FieldStructValue("XLogRecord")
wal.record = record
wal.records.AddChild(record.Value)
- xLogRecordBegin := record.Pos()
+ //xLogRecordBegin := record.Pos()
xlTotLen := record.FieldU32("xl_tot_len")
- record.FieldU32("xl_xid")
- record.FieldU64("xl_prev")
- record.FieldU8("xl_info")
- record.FieldU8("xl_rmid")
- record.U16()
- record.FieldU32("xl_crc")
- xLogRecordEnd := record.Pos()
- sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
- xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
+ xlTotLen1Bytes := xlTotLen - 4
+ //xlTotLen1 := xlTotLen1Bytes * 8
- rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
- pos1Bytes := d.Pos() / 8
+ //pos2 := d.Pos()
+ pos2Bytes := d.Pos() / 8
- remOnPage := posMaxOfPageBytes - pos1Bytes
- if remOnPage < rawLen {
+ remOnPage := posMaxOfPageBytes - pos2Bytes
+ if remOnPage <= 0 {
+ d.Fatalf("remOnPage is negative\n")
+ }
+
+ if remOnPage < int64(xlTotLen1Bytes) {
record.FieldRawLen("xLogBody", remOnPage*8)
- wal.recordRemLenBytes = rawLen - remOnPage
+ wal.recordRemLenBytes = int64(xlTotLen1Bytes) - remOnPage
break
}
- record.FieldRawLen("xLogBody", rawLen*8)
+ xLogBodyLen := int64(xlTotLen1Bytes) * 8
+ if xLogBodyLen <= 0 {
+ d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d\n", xLogBodyLen)
+ }
+
+ record.FieldRawLen("xLogBody", xLogBodyLen)
wal.recordRemLenBytes = -1
+ //xLogRecordBegin := record.Pos()
+ //xlTotLen := record.FieldU32("xl_tot_len")
+ //record.FieldU32("xl_xid")
+ //record.FieldU64("xl_prev")
+ //record.FieldU8("xl_info")
+ //record.FieldU8("xl_rmid")
+ //record.U16()
+ //record.FieldU32("xl_crc")
+ //xLogRecordEnd := record.Pos()
+ //sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
+ //
+ //xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
+ //
+ //rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
//pos1Bytes := d.Pos() / 8
- //if pos1Bytes > posMaxOfPageBytes {
- // d.Fatalf("out of page, error in logic!")
- //}
-
- //pos := d.Pos() / 8
- //if pos >= posMaxOfPage {
+ //
+ //remOnPage := posMaxOfPageBytes - pos1Bytes
+ //if remOnPage < rawLen {
+ // record.FieldRawLen("xLogBody", remOnPage*8)
+ // wal.recordRemLenBytes = rawLen - remOnPage
// break
//}
//
- //pageRecords.FieldStruct("XLogRecord", func(d *decode.D) {
- // record := d
- // wal.record = record
- // wal.records.AddChild(record.Value)
- //
- // xLogRecordBegin := record.Pos()
- // xlTotLen := record.FieldU32("xl_tot_len")
- // record.FieldU32("xl_xid")
- // record.FieldU64("xl_prev")
- // record.FieldU8("xl_info")
- // record.FieldU8("xl_rmid")
- // record.U16()
- // record.FieldU32("xl_crc")
- // xLogRecordEnd := record.Pos()
- // sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
- //
- // xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
- //
- // rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
- // record.FieldRawLen("xLogBody", rawLen*8)
- //})
+ //record.FieldRawLen("xLogBody", rawLen*8)
+ //wal.recordRemLenBytes = -1
- //pos := d.Pos()
- //if pos >= (4000 * 8) {
- // break
- //}
}
}
diff --git a/format/postgres/flavours/postgres14/pg_wal_test.go b/format/postgres/flavours/postgres14/pg_wal_test.go
new file mode 100644
index 00000000..02eafaca
--- /dev/null
+++ b/format/postgres/flavours/postgres14/pg_wal_test.go
@@ -0,0 +1,31 @@
+package postgres14_test
+
+import (
+ "github.com/wader/fq/format/postgres/flavours/postgres14"
+ "testing"
+)
+
+func TestParseLsn(t *testing.T) {
+ lsn1, err := postgres14.ParseLsn("0/4E394440")
+ if err != nil {
+ t.Fatalf("TestParseLsn 1, err = %v\n", err)
+ }
+ if lsn1 != 0x4E394440 {
+ t.Fatalf("TestParseLsn 2, invalid lsn value\n")
+ }
+
+ lsn2, err := postgres14.ParseLsn("0/4469E930")
+ if err != nil {
+ t.Fatalf("TestParseLsn 3, err = %v\n", err)
+ }
+ if lsn2 != 0x4469E930 {
+ t.Fatalf("TestParseLsn 4, invalid lsn value\n")
+ }
+}
+
+func TestXLogSegmentOffset(t *testing.T) {
+ offset := postgres14.XLogSegmentOffset(0x4E394440)
+ if offset == 0 {
+ t.Fatalf("TestXLogSegmentOffset 1, invalid offset\n")
+ }
+}
diff --git a/format/postgres/pg_wal.go b/format/postgres/pg_wal.go
index 30818d66..c308d98f 100644
--- a/format/postgres/pg_wal.go
+++ b/format/postgres/pg_wal.go
@@ -1,10 +1,14 @@
package postgres
import (
+ "errors"
+ "fmt"
"github.com/wader/fq/format"
"github.com/wader/fq/format/postgres/flavours/postgres14"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/interp"
+ "strconv"
+ "strings"
)
// TO DO
@@ -17,6 +21,7 @@ func init() {
DecodeFn: decodePgwal,
DecodeInArg: format.PostgresIn{
Flavour: "default",
+ Lsn: "",
},
})
}
@@ -32,6 +37,29 @@ const (
XLOG_PAGE_MAGIC_96 = uint16(0xD093)
)
+func ParseLsn(lsn string) (uint32, error) {
+ // check for 0/4E394440
+ str1 := lsn
+ if strings.Contains(lsn, "/") {
+ parts := strings.Split(lsn, "/")
+ if len(parts) != 2 {
+ return 0, errors.New(fmt.Sprintf("Invalid lsn = %s", lsn))
+ }
+ str1 = parts[1]
+ }
+ // parse hex to coded file name + file offset
+ r1, err := strconv.ParseInt(str1, 16, 64)
+ if err != nil {
+ return 0, err
+ }
+ return uint32(r1), err
+}
+
+func XLogSegmentOffset(xLogPtr uint32) uint32 {
+ const walSegSizeBytes = 16 * 1024 * 1024
+ return xLogPtr & (walSegSizeBytes - 1)
+}
+
func decodePgwal(d *decode.D, in any) any {
d.Endian = decode.LittleEndian
@@ -40,19 +68,28 @@ func decodePgwal(d *decode.D, in any) any {
d.Fatalf("DecodeInArg must be PostgresIn!\n")
}
+ maxOffset := uint32(0xFFFFFFFF)
+ if pgIn.Lsn != "" {
+ lsn, err := ParseLsn(pgIn.Lsn)
+ if err != nil {
+ d.Fatalf("Failed to ParseLsn, err = %v\n", err)
+ }
+ maxOffset = XLogSegmentOffset(lsn)
+ }
+
switch pgIn.Flavour {
//case PG_FLAVOUR_POSTGRES11:
// return postgres11.DecodePgControl(d, in)
case PG_FLAVOUR_POSTGRES14, PG_FLAVOUR_POSTGRES:
- return postgres14.DecodePgwal(d)
+ return postgres14.DecodePgwal(d, maxOffset)
//case PG_FLAVOUR_PGPROEE14:
// return pgproee14.DecodePgControl(d, in)
}
- return probePgwal(d, in)
+ return probePgwal(d, maxOffset)
}
-func probePgwal(d *decode.D, in any) any {
+func probePgwal(d *decode.D, maxOffset uint32) any {
// read version
xlpMagic := uint16(d.U16())
@@ -61,7 +98,7 @@ func probePgwal(d *decode.D, in any) any {
switch xlpMagic {
case XLOG_PAGE_MAGIC_14:
- return postgres14.DecodePgwal(d)
+ return postgres14.DecodePgwal(d, maxOffset)
}
d.Fatalf("unsupported xlp_magic = %X\n", xlpMagic)
diff --git a/format/postgres/pg_wal_test.go b/format/postgres/pg_wal_test.go
new file mode 100644
index 00000000..71c5fdcf
--- /dev/null
+++ b/format/postgres/pg_wal_test.go
@@ -0,0 +1,31 @@
+package postgres_test
+
+import (
+ "github.com/wader/fq/format/postgres"
+ "testing"
+)
+
+func TestParseLsn(t *testing.T) {
+ lsn1, err := postgres.ParseLsn("0/4E394440")
+ if err != nil {
+ t.Fatalf("TestParseLsn 1, err = %v\n", err)
+ }
+ if lsn1 != 0x4E394440 {
+ t.Fatalf("TestParseLsn 2, invalid lsn value\n")
+ }
+
+ lsn2, err := postgres.ParseLsn("0/4469E930")
+ if err != nil {
+ t.Fatalf("TestParseLsn 3, err = %v\n", err)
+ }
+ if lsn2 != 0x4469E930 {
+ t.Fatalf("TestParseLsn 4, invalid lsn value\n")
+ }
+}
+
+func TestXLogSegmentOffset(t *testing.T) {
+ offset := postgres.XLogSegmentOffset(0x4E394440)
+ if offset == 0 {
+ t.Fatalf("TestXLogSegmentOffset 1, invalid offset\n")
+ }
+}