mirror of
https://github.com/wader/fq.git
synced 2024-11-23 09:56:07 +03:00
postgres: first correct read of WAL file
This commit is contained in:
parent
586c803fa5
commit
1d9ef300b9
@ -323,4 +323,5 @@ type BitCoinBlockIn struct {
|
||||
|
||||
type PostgresIn struct {
|
||||
Flavour string `doc:"PostgreSQL flavour: postgres, postgres13, pgpro..."`
|
||||
Lsn string `doc:"Current LSN for WAL, use \"select pg_current_wal_lsn()\""`
|
||||
}
|
||||
|
@ -31,6 +31,11 @@ func TestTypeAlign(t *testing.T) {
|
||||
if expected4 != 8192*3 {
|
||||
t.Errorf("must be 8192*3\n")
|
||||
}
|
||||
|
||||
expected5 := common.TypeAlign(8192, 114720)
|
||||
if expected5 != 122880 {
|
||||
t.Errorf("must be 8192*3\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTypeAlign8(t *testing.T) {
|
||||
|
@ -2,7 +2,6 @@ package postgres14
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/wader/fq/format/postgres/common"
|
||||
"github.com/wader/fq/pkg/decode"
|
||||
"github.com/wader/fq/pkg/scalar"
|
||||
@ -126,6 +125,8 @@ func decodeXLogPageHeaderData(d *decode.D) {
|
||||
}
|
||||
|
||||
type walD struct {
|
||||
maxOffset int64
|
||||
|
||||
pages *decode.D
|
||||
records *decode.D
|
||||
|
||||
@ -135,26 +136,32 @@ type walD struct {
|
||||
recordRemLenBytes int64
|
||||
}
|
||||
|
||||
func DecodePgwal(d *decode.D) any {
|
||||
func DecodePgwal(d *decode.D, maxOffset uint32) any {
|
||||
pages := d.FieldArrayValue("Pages")
|
||||
walD := &walD{
|
||||
wal := &walD{
|
||||
maxOffset: int64(maxOffset),
|
||||
pages: pages,
|
||||
records: d.FieldArrayValue("Records"),
|
||||
recordRemLenBytes: -1,
|
||||
}
|
||||
|
||||
for {
|
||||
decodeXLogPage(walD, pages)
|
||||
|
||||
posBytes := pages.Pos() / 8
|
||||
remBytes := posBytes % XLOG_BLCKSZ
|
||||
if remBytes != 0 {
|
||||
d.Fatalf("invalid page remBytes = %d\n", remBytes)
|
||||
}
|
||||
decodeXLogPage(wal, pages)
|
||||
|
||||
if pages.End() {
|
||||
break
|
||||
}
|
||||
|
||||
posBytes := pages.Pos() / 8
|
||||
if posBytes >= wal.maxOffset {
|
||||
d.FieldRawLen("unused", d.BitsLeft())
|
||||
break
|
||||
}
|
||||
|
||||
remBytes := posBytes % XLOG_BLCKSZ
|
||||
if remBytes != 0 {
|
||||
d.Fatalf("invalid page remBytes = %d\n", remBytes)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -233,75 +240,86 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
/* 20 | 4 */ // pg_crc32c xl_crc
|
||||
|
||||
posBytes1 := d.Pos() / 8
|
||||
posBytes1Aligned := common.TypeAlign8(uint64(posBytes1))
|
||||
d.SeekAbs(int64(posBytes1Aligned * 8))
|
||||
posBytes1Aligned := int64(common.TypeAlign8(uint64(posBytes1)))
|
||||
// check aligned - this is correct
|
||||
// record header is 8 byte aligned
|
||||
if posBytes1Aligned >= wal.maxOffset {
|
||||
d.FieldRawLen("unused", d.BitsLeft())
|
||||
break
|
||||
}
|
||||
|
||||
// check what we cat read xl_tot_len on this page
|
||||
if posMaxOfPageBytes < posBytes1Aligned+4 {
|
||||
remOnPage := posMaxOfPageBytes - posBytes1
|
||||
d.FieldRawLen("page_padding0", remOnPage*8)
|
||||
// can't read xl_tot_len on this page
|
||||
// can't create row in this page
|
||||
// continue on next page
|
||||
wal.record = nil
|
||||
wal.recordRemLenBytes = -1
|
||||
return
|
||||
}
|
||||
|
||||
d.SeekAbs(posBytes1Aligned * 8)
|
||||
|
||||
record := pageRecords.FieldStructValue("XLogRecord")
|
||||
wal.record = record
|
||||
wal.records.AddChild(record.Value)
|
||||
|
||||
xLogRecordBegin := record.Pos()
|
||||
//xLogRecordBegin := record.Pos()
|
||||
xlTotLen := record.FieldU32("xl_tot_len")
|
||||
record.FieldU32("xl_xid")
|
||||
record.FieldU64("xl_prev")
|
||||
record.FieldU8("xl_info")
|
||||
record.FieldU8("xl_rmid")
|
||||
record.U16()
|
||||
record.FieldU32("xl_crc")
|
||||
xLogRecordEnd := record.Pos()
|
||||
sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
|
||||
|
||||
xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
|
||||
xlTotLen1Bytes := xlTotLen - 4
|
||||
//xlTotLen1 := xlTotLen1Bytes * 8
|
||||
|
||||
rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
|
||||
pos1Bytes := d.Pos() / 8
|
||||
//pos2 := d.Pos()
|
||||
pos2Bytes := d.Pos() / 8
|
||||
|
||||
remOnPage := posMaxOfPageBytes - pos1Bytes
|
||||
if remOnPage < rawLen {
|
||||
remOnPage := posMaxOfPageBytes - pos2Bytes
|
||||
if remOnPage <= 0 {
|
||||
d.Fatalf("remOnPage is negative\n")
|
||||
}
|
||||
|
||||
if remOnPage < int64(xlTotLen1Bytes) {
|
||||
record.FieldRawLen("xLogBody", remOnPage*8)
|
||||
wal.recordRemLenBytes = rawLen - remOnPage
|
||||
wal.recordRemLenBytes = int64(xlTotLen1Bytes) - remOnPage
|
||||
break
|
||||
}
|
||||
|
||||
record.FieldRawLen("xLogBody", rawLen*8)
|
||||
xLogBodyLen := int64(xlTotLen1Bytes) * 8
|
||||
if xLogBodyLen <= 0 {
|
||||
d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d\n", xLogBodyLen)
|
||||
}
|
||||
|
||||
record.FieldRawLen("xLogBody", xLogBodyLen)
|
||||
wal.recordRemLenBytes = -1
|
||||
|
||||
//xLogRecordBegin := record.Pos()
|
||||
//xlTotLen := record.FieldU32("xl_tot_len")
|
||||
//record.FieldU32("xl_xid")
|
||||
//record.FieldU64("xl_prev")
|
||||
//record.FieldU8("xl_info")
|
||||
//record.FieldU8("xl_rmid")
|
||||
//record.U16()
|
||||
//record.FieldU32("xl_crc")
|
||||
//xLogRecordEnd := record.Pos()
|
||||
//sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
|
||||
//
|
||||
//xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
|
||||
//
|
||||
//rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
|
||||
//pos1Bytes := d.Pos() / 8
|
||||
//if pos1Bytes > posMaxOfPageBytes {
|
||||
// d.Fatalf("out of page, error in logic!")
|
||||
//}
|
||||
|
||||
//pos := d.Pos() / 8
|
||||
//if pos >= posMaxOfPage {
|
||||
//
|
||||
//remOnPage := posMaxOfPageBytes - pos1Bytes
|
||||
//if remOnPage < rawLen {
|
||||
// record.FieldRawLen("xLogBody", remOnPage*8)
|
||||
// wal.recordRemLenBytes = rawLen - remOnPage
|
||||
// break
|
||||
//}
|
||||
//
|
||||
//pageRecords.FieldStruct("XLogRecord", func(d *decode.D) {
|
||||
// record := d
|
||||
// wal.record = record
|
||||
// wal.records.AddChild(record.Value)
|
||||
//
|
||||
// xLogRecordBegin := record.Pos()
|
||||
// xlTotLen := record.FieldU32("xl_tot_len")
|
||||
// record.FieldU32("xl_xid")
|
||||
// record.FieldU64("xl_prev")
|
||||
// record.FieldU8("xl_info")
|
||||
// record.FieldU8("xl_rmid")
|
||||
// record.U16()
|
||||
// record.FieldU32("xl_crc")
|
||||
// xLogRecordEnd := record.Pos()
|
||||
// sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
|
||||
//
|
||||
// xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
|
||||
//
|
||||
// rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
|
||||
// record.FieldRawLen("xLogBody", rawLen*8)
|
||||
//})
|
||||
//record.FieldRawLen("xLogBody", rawLen*8)
|
||||
//wal.recordRemLenBytes = -1
|
||||
|
||||
//pos := d.Pos()
|
||||
//if pos >= (4000 * 8) {
|
||||
// break
|
||||
//}
|
||||
}
|
||||
}
|
||||
|
||||
|
31
format/postgres/flavours/postgres14/pg_wal_test.go
Normal file
31
format/postgres/flavours/postgres14/pg_wal_test.go
Normal file
@ -0,0 +1,31 @@
|
||||
package postgres14_test
|
||||
|
||||
import (
|
||||
"github.com/wader/fq/format/postgres/flavours/postgres14"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseLsn(t *testing.T) {
|
||||
lsn1, err := postgres14.ParseLsn("0/4E394440")
|
||||
if err != nil {
|
||||
t.Fatalf("TestParseLsn 1, err = %v\n", err)
|
||||
}
|
||||
if lsn1 != 0x4E394440 {
|
||||
t.Fatalf("TestParseLsn 2, invalid lsn value\n")
|
||||
}
|
||||
|
||||
lsn2, err := postgres14.ParseLsn("0/4469E930")
|
||||
if err != nil {
|
||||
t.Fatalf("TestParseLsn 3, err = %v\n", err)
|
||||
}
|
||||
if lsn2 != 0x4469E930 {
|
||||
t.Fatalf("TestParseLsn 4, invalid lsn value\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestXLogSegmentOffset(t *testing.T) {
|
||||
offset := postgres14.XLogSegmentOffset(0x4E394440)
|
||||
if offset == 0 {
|
||||
t.Fatalf("TestXLogSegmentOffset 1, invalid offset\n")
|
||||
}
|
||||
}
|
@ -1,10 +1,14 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/wader/fq/format"
|
||||
"github.com/wader/fq/format/postgres/flavours/postgres14"
|
||||
"github.com/wader/fq/pkg/decode"
|
||||
"github.com/wader/fq/pkg/interp"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// TO DO
|
||||
@ -17,6 +21,7 @@ func init() {
|
||||
DecodeFn: decodePgwal,
|
||||
DecodeInArg: format.PostgresIn{
|
||||
Flavour: "default",
|
||||
Lsn: "",
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -32,6 +37,29 @@ const (
|
||||
XLOG_PAGE_MAGIC_96 = uint16(0xD093)
|
||||
)
|
||||
|
||||
func ParseLsn(lsn string) (uint32, error) {
|
||||
// check for 0/4E394440
|
||||
str1 := lsn
|
||||
if strings.Contains(lsn, "/") {
|
||||
parts := strings.Split(lsn, "/")
|
||||
if len(parts) != 2 {
|
||||
return 0, errors.New(fmt.Sprintf("Invalid lsn = %s", lsn))
|
||||
}
|
||||
str1 = parts[1]
|
||||
}
|
||||
// parse hex to coded file name + file offset
|
||||
r1, err := strconv.ParseInt(str1, 16, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint32(r1), err
|
||||
}
|
||||
|
||||
func XLogSegmentOffset(xLogPtr uint32) uint32 {
|
||||
const walSegSizeBytes = 16 * 1024 * 1024
|
||||
return xLogPtr & (walSegSizeBytes - 1)
|
||||
}
|
||||
|
||||
func decodePgwal(d *decode.D, in any) any {
|
||||
d.Endian = decode.LittleEndian
|
||||
|
||||
@ -40,19 +68,28 @@ func decodePgwal(d *decode.D, in any) any {
|
||||
d.Fatalf("DecodeInArg must be PostgresIn!\n")
|
||||
}
|
||||
|
||||
maxOffset := uint32(0xFFFFFFFF)
|
||||
if pgIn.Lsn != "" {
|
||||
lsn, err := ParseLsn(pgIn.Lsn)
|
||||
if err != nil {
|
||||
d.Fatalf("Failed to ParseLsn, err = %v\n", err)
|
||||
}
|
||||
maxOffset = XLogSegmentOffset(lsn)
|
||||
}
|
||||
|
||||
switch pgIn.Flavour {
|
||||
//case PG_FLAVOUR_POSTGRES11:
|
||||
// return postgres11.DecodePgControl(d, in)
|
||||
case PG_FLAVOUR_POSTGRES14, PG_FLAVOUR_POSTGRES:
|
||||
return postgres14.DecodePgwal(d)
|
||||
return postgres14.DecodePgwal(d, maxOffset)
|
||||
//case PG_FLAVOUR_PGPROEE14:
|
||||
// return pgproee14.DecodePgControl(d, in)
|
||||
}
|
||||
|
||||
return probePgwal(d, in)
|
||||
return probePgwal(d, maxOffset)
|
||||
}
|
||||
|
||||
func probePgwal(d *decode.D, in any) any {
|
||||
func probePgwal(d *decode.D, maxOffset uint32) any {
|
||||
// read version
|
||||
xlpMagic := uint16(d.U16())
|
||||
|
||||
@ -61,7 +98,7 @@ func probePgwal(d *decode.D, in any) any {
|
||||
|
||||
switch xlpMagic {
|
||||
case XLOG_PAGE_MAGIC_14:
|
||||
return postgres14.DecodePgwal(d)
|
||||
return postgres14.DecodePgwal(d, maxOffset)
|
||||
}
|
||||
|
||||
d.Fatalf("unsupported xlp_magic = %X\n", xlpMagic)
|
||||
|
31
format/postgres/pg_wal_test.go
Normal file
31
format/postgres/pg_wal_test.go
Normal file
@ -0,0 +1,31 @@
|
||||
package postgres_test
|
||||
|
||||
import (
|
||||
"github.com/wader/fq/format/postgres"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseLsn(t *testing.T) {
|
||||
lsn1, err := postgres.ParseLsn("0/4E394440")
|
||||
if err != nil {
|
||||
t.Fatalf("TestParseLsn 1, err = %v\n", err)
|
||||
}
|
||||
if lsn1 != 0x4E394440 {
|
||||
t.Fatalf("TestParseLsn 2, invalid lsn value\n")
|
||||
}
|
||||
|
||||
lsn2, err := postgres.ParseLsn("0/4469E930")
|
||||
if err != nil {
|
||||
t.Fatalf("TestParseLsn 3, err = %v\n", err)
|
||||
}
|
||||
if lsn2 != 0x4469E930 {
|
||||
t.Fatalf("TestParseLsn 4, invalid lsn value\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestXLogSegmentOffset(t *testing.T) {
|
||||
offset := postgres.XLogSegmentOffset(0x4E394440)
|
||||
if offset == 0 {
|
||||
t.Fatalf("TestXLogSegmentOffset 1, invalid offset\n")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user