1
1
mirror of https://github.com/wader/fq.git synced 2025-01-08 15:39:56 +03:00

postgres: try to implement wal

This commit is contained in:
Pavel Safonov 2022-09-09 07:55:36 +03:00
parent c2591ac80a
commit 586c803fa5
5 changed files with 216 additions and 138 deletions

View File

@ -6,6 +6,33 @@ import (
"github.com/wader/fq/format/postgres/common"
)
func TestTypeAlign(t *testing.T) {
expected0 := common.TypeAlign(8192, 8192+0)
if expected0 != 8192 {
t.Errorf("must be 8192\n")
}
expected1 := common.TypeAlign(8192, 8192+100)
if expected1 != 8192*2 {
t.Errorf("must be 8192*2\n")
}
expected2 := common.TypeAlign(8192, 0)
if expected2 != 0 {
t.Errorf("must be 0\n")
}
expected3 := common.TypeAlign(8192, 700)
if expected3 != 8192 {
t.Errorf("must be 8192\n")
}
expected4 := common.TypeAlign(8192, 8192*2+5000)
if expected4 != 8192*3 {
t.Errorf("must be 8192*3\n")
}
}
func TestTypeAlign8(t *testing.T) {
expected39 := common.TypeAlign8(39)
if expected39 != 40 {

View File

@ -8,14 +8,6 @@ import (
"github.com/wader/fq/pkg/scalar"
)
//func init() {
// interp.RegisterFormat(decode.Format{
// Name: format.PG_WAL,
// Description: "PostgreSQL write-ahead log file",
// DecodeFn: DecodePgwal,
// })
//}
//nolint:revive
const (
XLOG_BLCKSZ = 8192
@ -70,7 +62,16 @@ var rmgrIds = scalar.UToScalar{
21: {Sym: "LogicalMessage", Description: "RM_LOGICALMSG_ID"},
}
// type = struct XLogPageHeaderData {
// struct XLogLongPageHeaderData {
// /* 0 | 24 */ XLogPageHeaderData std;
// /* 24 | 8 */ uint64 xlp_sysid;
// /* 32 | 4 */ uint32 xlp_seg_size;
// /* 36 | 4 */ uint32 xlp_xlog_blcksz;
//
// /* total size (bytes): 40 */
//}
// struct XLogPageHeaderData {
/* 0 | 2 */ // uint16 xlp_magic;
/* 2 | 2 */ // uint16 xlp_info;
/* 4 | 4 */ // TimeLineID xlp_tli;
@ -80,7 +81,7 @@ var rmgrIds = scalar.UToScalar{
//
/* total size (bytes): 24 */
// type = struct XLogRecord {
// struct XLogRecord {
/* 0 | 4 */ // uint32 xl_tot_len
/* 4 | 4 */ // TransactionId xl_xid
/* 8 | 8 */ // XLogRecPtr xl_prev
@ -129,26 +130,40 @@ type walD struct {
records *decode.D
pageRecords *decode.D
remLen uint32
record *decode.D
record *decode.D
recordRemLenBytes int64
}
func DecodePgwal(d *decode.D, in any) any {
func DecodePgwal(d *decode.D) any {
pages := d.FieldArrayValue("Pages")
walD := &walD{
pages: d.FieldArrayValue("pages"),
records: d.FieldArrayValue("records"),
pages: pages,
records: d.FieldArrayValue("Records"),
recordRemLenBytes: -1,
}
d.SeekAbs(0)
d.FieldArray("XLogPages", func(d *decode.D) {
decodeXLogPage(walD, d)
})
for {
decodeXLogPage(walD, pages)
posBytes := pages.Pos() / 8
remBytes := posBytes % XLOG_BLCKSZ
if remBytes != 0 {
d.Fatalf("invalid page remBytes = %d\n", remBytes)
}
if pages.End() {
break
}
}
return nil
}
func decodeXLogPage(wal *walD, d *decode.D) {
xLogPage := d.FieldStructValue("Page")
// type = struct XLogPageHeaderData {
/* 0 | 2 */ // uint16 xlp_magic;
/* 2 | 2 */ // uint16 xlp_info;
@ -156,51 +171,57 @@ func decodeXLogPage(wal *walD, d *decode.D) {
/* 8 | 8 */ // XLogRecPtr xlp_pageaddr;
/* 16 | 4 */ // uint32 xlp_rem_len;
/* XXX 4-byte padding */
page := wal.pages.FieldStructValue("XLogPageHeaderData")
header := xLogPage.FieldStructValue("XLogPageHeaderData")
page.FieldU16("xlp_magic")
xlpInfo := page.FieldU16("xlp_info")
page.FieldU32("xlp_tli")
page.FieldU64("xlp_pageaddr")
page.FieldU32("xlp_rem_len")
page.U32()
header.FieldU16("xlp_magic")
xlpInfo := header.FieldU16("xlp_info")
header.FieldU32("xlp_tli")
header.FieldU64("xlp_pageaddr")
remLenBytes := header.FieldU32("xlp_rem_len")
header.FieldU32("padding0")
if xlpInfo&XLP_LONG_HEADER != 0 {
// Long header
d.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
header.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
d.FieldU64("xlp_sysid")
d.FieldU32("xlp_seg_size")
d.FieldU32("xlp_xlog_blcksz")
})
}
remLen := 40
wal.remLen = uint32(remLen)
record := wal.record
if record == nil {
rawLen := int64(common.TypeAlign8(uint64(remLen)))
page.FieldRawLen("prev_file_rec", rawLen*8)
if wal.recordRemLenBytes >= 0 {
if wal.recordRemLenBytes != int64(remLenBytes) {
d.Fatalf("incorrect wal.recordRemLenBytes = %d, remLenBytes = %d", wal.recordRemLenBytes, remLenBytes)
}
}
pageRecords := page.FieldArrayValue("records")
remLenBytesAligned := common.TypeAlign8(remLenBytes)
remLen := remLenBytesAligned * 8
pos1 := header.Pos()
xLogPage.SeekAbs(pos1)
// TODO
xLogPage.FieldRawLen("RecordOfPreviousPage", int64(remLen))
pos2 := xLogPage.Pos()
if wal.record != nil {
wal.record.SeekAbs(pos1)
}
xLogPage.SeekAbs(pos2)
pageRecords := xLogPage.FieldArrayValue("Records")
wal.pageRecords = pageRecords
decodeXLogRecords(wal, d)
//page.Pos()
//for {
//
//}
//fmt.Printf("d pos = %d\n", d.Pos())
}
func decodeXLogRecords(wal *walD, d *decode.D) {
pageRecords := wal.pageRecords
pos := d.Pos() / 8
posMaxOfPage := int64(common.TypeAlign(8192, uint64(pos)))
fmt.Printf("posMaxOfPage = %d\n", posMaxOfPage)
posBytes := d.Pos() / 8
posMaxOfPageBytes := int64(common.TypeAlign(8192, uint64(posBytes)))
fmt.Printf("posMaxOfPageBytes = %d\n", posMaxOfPageBytes)
for {
/* 0 | 4 */ // uint32 xl_tot_len
@ -211,52 +232,71 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
/* XXX 2-byte hole */
/* 20 | 4 */ // pg_crc32c xl_crc
//record := page.FieldStructValue("XLogRecord")
//wal.record = record
//wal.records.AddChild(record.Value)
//
//xLogRecordBegin := record.Pos()
//xlTotLen := record.FieldU32("xl_tot_len")
//record.FieldU32("xl_xid")
//record.FieldU64("xl_prev")
//record.FieldU8("xl_info")
//record.FieldU8("xl_rmid")
//record.U16()
//record.FieldU32("xl_crc")
//xLogRecordEnd := record.Pos()
//sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
//
//xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
//
//rawLen := int64(TypeAlign8(xLogRecordBodyLen))
//page.FieldRawLen("xLogBody", rawLen*8)
posBytes1 := d.Pos() / 8
posBytes1Aligned := common.TypeAlign8(uint64(posBytes1))
d.SeekAbs(int64(posBytes1Aligned * 8))
pos := d.Pos() / 8
if pos >= posMaxOfPage {
record := pageRecords.FieldStructValue("XLogRecord")
wal.record = record
wal.records.AddChild(record.Value)
xLogRecordBegin := record.Pos()
xlTotLen := record.FieldU32("xl_tot_len")
record.FieldU32("xl_xid")
record.FieldU64("xl_prev")
record.FieldU8("xl_info")
record.FieldU8("xl_rmid")
record.U16()
record.FieldU32("xl_crc")
xLogRecordEnd := record.Pos()
sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
pos1Bytes := d.Pos() / 8
remOnPage := posMaxOfPageBytes - pos1Bytes
if remOnPage < rawLen {
record.FieldRawLen("xLogBody", remOnPage*8)
wal.recordRemLenBytes = rawLen - remOnPage
break
}
pageRecords.FieldStruct("XLogRecord", func(d *decode.D) {
record := d
wal.record = record
wal.records.AddChild(record.Value)
record.FieldRawLen("xLogBody", rawLen*8)
wal.recordRemLenBytes = -1
xLogRecordBegin := record.Pos()
xlTotLen := record.FieldU32("xl_tot_len")
record.FieldU32("xl_xid")
record.FieldU64("xl_prev")
record.FieldU8("xl_info")
record.FieldU8("xl_rmid")
record.U16()
record.FieldU32("xl_crc")
xLogRecordEnd := record.Pos()
sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
//pos1Bytes := d.Pos() / 8
//if pos1Bytes > posMaxOfPageBytes {
// d.Fatalf("out of page, error in logic!")
//}
xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
record.FieldRawLen("xLogBody", rawLen*8)
})
//pos := d.Pos() / 8
//if pos >= posMaxOfPage {
// break
//}
//
//pageRecords.FieldStruct("XLogRecord", func(d *decode.D) {
// record := d
// wal.record = record
// wal.records.AddChild(record.Value)
//
// xLogRecordBegin := record.Pos()
// xlTotLen := record.FieldU32("xl_tot_len")
// record.FieldU32("xl_xid")
// record.FieldU64("xl_prev")
// record.FieldU8("xl_info")
// record.FieldU8("xl_rmid")
// record.U16()
// record.FieldU32("xl_crc")
// xLogRecordEnd := record.Pos()
// sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
//
// xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
//
// rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
// record.FieldRawLen("xLogBody", rawLen*8)
//})
//pos := d.Pos()
//if pos >= (4000 * 8) {

View File

@ -1,60 +1,69 @@
package postgres
import (
"github.com/wader/fq/format"
"github.com/wader/fq/format/postgres/flavours/postgres14"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/interp"
)
// TO DO
// not ready yet
//func init() {
// interp.RegisterFormat(decode.Format{
// Name: format.PG_WAL,
// Description: "PostgreSQL write-ahead log file",
// DecodeFn: decodePgwal,
// DecodeInArg: format.PostgresIn{
// Flavour: "default",
// },
// })
//}
//
func init() {
interp.RegisterFormat(decode.Format{
Name: format.PG_WAL,
Description: "PostgreSQL write-ahead log file",
DecodeFn: decodePgwal,
DecodeInArg: format.PostgresIn{
Flavour: "default",
},
})
}
//// https://pgpedia.info/x/XLOG_PAGE_MAGIC.html
//const (
// XLOG_PAGE_MAGIC_15 = uint16(0xD10F)
// XLOG_PAGE_MAGIC_14 = uint16(0xD10D)
// XLOG_PAGE_MAGIC_13 = uint16(0xD106)
// XLOG_PAGE_MAGIC_12 = uint16(0xD101)
// XLOG_PAGE_MAGIC_11 = uint16(0xD098)
// XLOG_PAGE_MAGIC_10 = uint16(0xD097)
// XLOG_PAGE_MAGIC_96 = uint16(0xD093)
//)
//
//func decodePgwal(d *decode.D, in any) any {
// d.Endian = decode.LittleEndian
//
// flavour := in.(format.PostgresIn).Flavour
// switch flavour {
// //case PG_FLAVOUR_POSTGRES11:
// // return postgres11.DecodePgControl(d, in)
// case PG_FLAVOUR_POSTGRES14, PG_FLAVOUR_POSTGRES:
// return postgres14.DecodePgwal(d, in)
// //case PG_FLAVOUR_PGPROEE14:
// // return pgproee14.DecodePgControl(d, in)
// default:
// break
// }
//
// return probePgwal(d, in)
//}
//
//func probePgwal(d *decode.D, in any) any {
// // read version
// xlp_magic := uint16(d.U16())
//
// // restore position
// d.SeekAbs(0)
//
// switch xlp_magic {
// case XLOG_PAGE_MAGIC_14:
// return postgres14.DecodePgwal(d, in)
// default:
// d.Fatalf("unsupported xlp_magic = %X\n", xlp_magic)
// }
// return nil
//}
const (
XLOG_PAGE_MAGIC_15 = uint16(0xD10F)
XLOG_PAGE_MAGIC_14 = uint16(0xD10D)
XLOG_PAGE_MAGIC_13 = uint16(0xD106)
XLOG_PAGE_MAGIC_12 = uint16(0xD101)
XLOG_PAGE_MAGIC_11 = uint16(0xD098)
XLOG_PAGE_MAGIC_10 = uint16(0xD097)
XLOG_PAGE_MAGIC_96 = uint16(0xD093)
)
func decodePgwal(d *decode.D, in any) any {
d.Endian = decode.LittleEndian
pgIn, ok := in.(format.PostgresIn)
if !ok {
d.Fatalf("DecodeInArg must be PostgresIn!\n")
}
switch pgIn.Flavour {
//case PG_FLAVOUR_POSTGRES11:
// return postgres11.DecodePgControl(d, in)
case PG_FLAVOUR_POSTGRES14, PG_FLAVOUR_POSTGRES:
return postgres14.DecodePgwal(d)
//case PG_FLAVOUR_PGPROEE14:
// return pgproee14.DecodePgControl(d, in)
}
return probePgwal(d, in)
}
func probePgwal(d *decode.D, in any) any {
// read version
xlpMagic := uint16(d.U16())
// restore position
d.SeekAbs(0)
switch xlpMagic {
case XLOG_PAGE_MAGIC_14:
return postgres14.DecodePgwal(d)
}
d.Fatalf("unsupported xlp_magic = %X\n", xlpMagic)
return nil
}

View File

@ -0,0 +1 @@
0000*

View File

@ -0,0 +1 @@
0000*