diff --git a/format/postgres/flavours/postgres14/pg_wal.go b/format/postgres/flavours/postgres14/pg_wal.go index 5b3f7f4e..8583afaa 100644 --- a/format/postgres/flavours/postgres14/pg_wal.go +++ b/format/postgres/flavours/postgres14/pg_wal.go @@ -1,7 +1,6 @@ package postgres14 import ( - "fmt" "github.com/wader/fq/format/postgres/common" "github.com/wader/fq/pkg/decode" "github.com/wader/fq/pkg/scalar" @@ -136,6 +135,7 @@ func decodeXLogPageHeaderData(d *decode.D) { type walD struct { maxOffset int64 + page *walPage pages *decode.D records *decode.D @@ -150,6 +150,10 @@ type walState struct { recordRemLenBytes int64 } +type walPage struct { + xlpPageAddr uint64 +} + func DecodePgwal(d *decode.D, maxOffset uint32) any { pages := d.FieldArrayValue("Pages") wal := &walD{ @@ -181,8 +185,17 @@ func DecodePgwal(d *decode.D, maxOffset uint32) any { } func decodeXLogPage(wal *walD, d *decode.D) { - - xLogPage := d.FieldStructValue("Page") + pos0 := d.Pos() + d.SeekRel(8 * 8) + xlpPageAddr0 := d.U64() + d.SeekAbs(pos0) + if wal.page != nil { + xlpPageAddr1 := wal.page.xlpPageAddr + XLOG_BLCKSZ + if xlpPageAddr0 != xlpPageAddr1 { + d.Fatalf("invalid xlp_pageaddr expected = %d, actual = %d\n", xlpPageAddr1, xlpPageAddr0) + } + } + wal.page = &walPage{} // type = struct XLogPageHeaderData { /* 0 | 2 */ // uint16 xlp_magic; @@ -191,12 +204,13 @@ func decodeXLogPage(wal *walD, d *decode.D) { /* 8 | 8 */ // XLogRecPtr xlp_pageaddr; /* 16 | 4 */ // uint32 xlp_rem_len; /* XXX 4-byte padding */ + xLogPage := d.FieldStructValue("Page") header := xLogPage.FieldStructValue("XLogPageHeaderData") xlpMagic := header.FieldU16("xlp_magic") xlpInfo := header.FieldU16("xlp_info") header.FieldU32("xlp_tli") - header.FieldU64("xlp_pageaddr") + wal.page.xlpPageAddr = header.FieldU64("xlp_pageaddr") remLenBytes := header.FieldU32("xlp_rem_len") header.FieldU32("padding0") @@ -259,7 +273,6 @@ func decodeXLogRecords(wal *walD, d *decode.D) { posBytes := d.Pos() / 8 posMaxOfPageBytes := int64(common.TypeAlign(XLOG_BLCKSZ, uint64(posBytes))) - fmt.Printf("posMaxOfPageBytes = %d\n", posMaxOfPageBytes) for { /* 0 | 4 */ // uint32 xl_tot_len @@ -297,7 +310,15 @@ func decodeXLogRecords(wal *walD, d *decode.D) { } wal.records.AddChild(record.Value) + lsn0 := uint64(d.Pos() / 8) + lsn1 := lsn0 % XLOG_BLCKSZ + lsn := lsn1 + wal.page.xlpPageAddr + record.FieldValueU("lsn", lsn, common.XLogRecPtrMapper) + xlTotLen := record.FieldU32("xl_tot_len") + if xlTotLen < 4 { + d.Fatalf("xl_tot_len is less than 4\n") + } xlTotLen1Bytes := xlTotLen - 4 pos2Bytes := d.Pos() / 8 @@ -315,7 +336,8 @@ func decodeXLogRecords(wal *walD, d *decode.D) { xLogBodyLen := int64(xlTotLen1Bytes) * 8 if xLogBodyLen <= 0 { - d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d\n", xLogBodyLen) + errPos := record.Pos() / 8 + d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d, pos = %X\n", xLogBodyLen, errPos) } //record.FieldRawLen("xLogBody", xLogBodyLen) @@ -399,7 +421,7 @@ func decodeXLogRecord(wal *walD, maxBytes int64) { if isEnd(record, posMax, 64) { return } - record.FieldU64("xl_prev") + record.FieldU64("xl_prev", common.XLogRecPtrMapper) } if record.FieldGet("xl_info") == nil { @@ -449,9 +471,9 @@ func decodeXLogRecord(wal *walD, maxBytes int64) { //XLR_BLOCK_ID_ORIGIN = 253 //XLR_BLOCK_ID_TOPLEVEL_XID = 252 - mainDataLen := uint64(0) - recordOrigin := uint64(0) - toplevelXid := uint64(0) + //mainDataLen := uint64(0) + //recordOrigin := uint64(0) + //toplevelXid := uint64(0) if blockId == XLR_BLOCK_ID_DATA_SHORT { //typedef struct XLogRecordDataHeaderShort //{ @@ -460,30 +482,34 @@ func decodeXLogRecord(wal *walD, maxBytes int64) { //} // // COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); - mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8) - if end { - return - } + // + //mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8) + //if end { + // return + //} } else if blockId == XLR_BLOCK_ID_DATA_LONG { // COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); - mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 32) - if end { - return - } + // + //mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 32) + //if end { + // return + //} } else if blockId == XLR_BLOCK_ID_ORIGIN { // COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); // unsigned short - 2 bytes - recordOrigin, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 16) - if end { - return - } + // + //recordOrigin, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 16) + //if end { + // return + //} } else if blockId == XLR_BLOCK_ID_TOPLEVEL_XID { // COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); // 4 bytes - toplevelXid, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 32) - if end { - return - } + // + //toplevelXid, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 32) + //if end { + // return + //} } else if blockId >= XLR_MAX_BLOCK_ID { record.Fatalf("catched blockId = %d\n", blockId) } else if blockId < XLR_MAX_BLOCK_ID { @@ -622,7 +648,9 @@ func decodeXLogRecord(wal *walD, maxBytes int64) { } } - fmt.Printf("mainDataLen = %d, recordOrigin = %d, toplevelXid = %d\n", mainDataLen, recordOrigin, toplevelXid) + //if mainDataLen == 77 && recordOrigin == 88 && toplevelXid == 99 { + // fmt.Printf("mainDataLen = %d, recordOrigin = %d, toplevelXid = %d\n", mainDataLen, recordOrigin, toplevelXid) + //} record.SeekAbs(posMax) }