1
1
mirror of https://github.com/wader/fq.git synced 2025-01-05 20:54:25 +03:00

postgres: wal impl

This commit is contained in:
Pavel Safonov 2022-10-26 09:56:00 +03:00
parent a4c1c5b811
commit c105fcdd90

View File

@ -1,7 +1,6 @@
package postgres14
import (
"fmt"
"github.com/wader/fq/format/postgres/common"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
@ -136,6 +135,7 @@ func decodeXLogPageHeaderData(d *decode.D) {
type walD struct {
maxOffset int64
page *walPage
pages *decode.D
records *decode.D
@ -150,6 +150,10 @@ type walState struct {
recordRemLenBytes int64
}
type walPage struct {
xlpPageAddr uint64
}
func DecodePgwal(d *decode.D, maxOffset uint32) any {
pages := d.FieldArrayValue("Pages")
wal := &walD{
@ -181,8 +185,17 @@ func DecodePgwal(d *decode.D, maxOffset uint32) any {
}
func decodeXLogPage(wal *walD, d *decode.D) {
xLogPage := d.FieldStructValue("Page")
pos0 := d.Pos()
d.SeekRel(8 * 8)
xlpPageAddr0 := d.U64()
d.SeekAbs(pos0)
if wal.page != nil {
xlpPageAddr1 := wal.page.xlpPageAddr + XLOG_BLCKSZ
if xlpPageAddr0 != xlpPageAddr1 {
d.Fatalf("invalid xlp_pageaddr expected = %d, actual = %d\n", xlpPageAddr1, xlpPageAddr0)
}
}
wal.page = &walPage{}
// type = struct XLogPageHeaderData {
/* 0 | 2 */ // uint16 xlp_magic;
@ -191,12 +204,13 @@ func decodeXLogPage(wal *walD, d *decode.D) {
/* 8 | 8 */ // XLogRecPtr xlp_pageaddr;
/* 16 | 4 */ // uint32 xlp_rem_len;
/* XXX 4-byte padding */
xLogPage := d.FieldStructValue("Page")
header := xLogPage.FieldStructValue("XLogPageHeaderData")
xlpMagic := header.FieldU16("xlp_magic")
xlpInfo := header.FieldU16("xlp_info")
header.FieldU32("xlp_tli")
header.FieldU64("xlp_pageaddr")
wal.page.xlpPageAddr = header.FieldU64("xlp_pageaddr")
remLenBytes := header.FieldU32("xlp_rem_len")
header.FieldU32("padding0")
@ -259,7 +273,6 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
posBytes := d.Pos() / 8
posMaxOfPageBytes := int64(common.TypeAlign(XLOG_BLCKSZ, uint64(posBytes)))
fmt.Printf("posMaxOfPageBytes = %d\n", posMaxOfPageBytes)
for {
/* 0 | 4 */ // uint32 xl_tot_len
@ -297,7 +310,15 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
}
wal.records.AddChild(record.Value)
lsn0 := uint64(d.Pos() / 8)
lsn1 := lsn0 % XLOG_BLCKSZ
lsn := lsn1 + wal.page.xlpPageAddr
record.FieldValueU("lsn", lsn, common.XLogRecPtrMapper)
xlTotLen := record.FieldU32("xl_tot_len")
if xlTotLen < 4 {
d.Fatalf("xl_tot_len is less than 4\n")
}
xlTotLen1Bytes := xlTotLen - 4
pos2Bytes := d.Pos() / 8
@ -315,7 +336,8 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
xLogBodyLen := int64(xlTotLen1Bytes) * 8
if xLogBodyLen <= 0 {
d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d\n", xLogBodyLen)
errPos := record.Pos() / 8
d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d, pos = %X\n", xLogBodyLen, errPos)
}
//record.FieldRawLen("xLogBody", xLogBodyLen)
@ -399,7 +421,7 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
if isEnd(record, posMax, 64) {
return
}
record.FieldU64("xl_prev")
record.FieldU64("xl_prev", common.XLogRecPtrMapper)
}
if record.FieldGet("xl_info") == nil {
@ -449,9 +471,9 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
//XLR_BLOCK_ID_ORIGIN = 253
//XLR_BLOCK_ID_TOPLEVEL_XID = 252
mainDataLen := uint64(0)
recordOrigin := uint64(0)
toplevelXid := uint64(0)
//mainDataLen := uint64(0)
//recordOrigin := uint64(0)
//toplevelXid := uint64(0)
if blockId == XLR_BLOCK_ID_DATA_SHORT {
//typedef struct XLogRecordDataHeaderShort
//{
@ -460,30 +482,34 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
//}
//
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8)
if end {
return
}
//
//mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8)
//if end {
// return
//}
} else if blockId == XLR_BLOCK_ID_DATA_LONG {
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 32)
if end {
return
}
//
//mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 32)
//if end {
// return
//}
} else if blockId == XLR_BLOCK_ID_ORIGIN {
// COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
// unsigned short - 2 bytes
recordOrigin, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 16)
if end {
return
}
//
//recordOrigin, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 16)
//if end {
// return
//}
} else if blockId == XLR_BLOCK_ID_TOPLEVEL_XID {
// COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
// 4 bytes
toplevelXid, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 32)
if end {
return
}
//
//toplevelXid, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 32)
//if end {
// return
//}
} else if blockId >= XLR_MAX_BLOCK_ID {
record.Fatalf("catched blockId = %d\n", blockId)
} else if blockId < XLR_MAX_BLOCK_ID {
@ -622,7 +648,9 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
}
}
fmt.Printf("mainDataLen = %d, recordOrigin = %d, toplevelXid = %d\n", mainDataLen, recordOrigin, toplevelXid)
//if mainDataLen == 77 && recordOrigin == 88 && toplevelXid == 99 {
// fmt.Printf("mainDataLen = %d, recordOrigin = %d, toplevelXid = %d\n", mainDataLen, recordOrigin, toplevelXid)
//}
record.SeekAbs(posMax)
}