mirror of
https://github.com/wader/fq.git
synced 2025-01-07 14:48:14 +03:00
postgres: refactoring
This commit is contained in:
parent
067f8d569d
commit
d7bcca0a4a
@ -1,69 +1,15 @@
|
||||
package postgres14
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/wader/fq/format/postgres/common"
|
||||
"github.com/wader/fq/pkg/decode"
|
||||
"github.com/wader/fq/pkg/scalar"
|
||||
)
|
||||
|
||||
//nolint:revive
|
||||
const (
|
||||
XLOG_BLCKSZ = 8192
|
||||
XLP_LONG_HEADER = 2
|
||||
)
|
||||
|
||||
//nolint:revive
|
||||
const (
|
||||
BKPBLOCK_FORK_MASK = 0x0F
|
||||
/* block data is an XLogRecordBlockImage */
|
||||
BKPBLOCK_HAS_IMAGE = 0x10
|
||||
BKPBLOCK_HAS_DATA = 0x20
|
||||
/* redo will re-init the page */
|
||||
BKPBLOCK_WILL_INIT = 0x40
|
||||
/* RelFileNode omitted, same as previous */
|
||||
BKPBLOCK_SAME_REL = 0x80
|
||||
)
|
||||
|
||||
/* Information stored in bimg_info */
|
||||
//nolint:revive
|
||||
const (
|
||||
/* page image has "hole" */
|
||||
BKPIMAGE_HAS_HOLE = 0x01
|
||||
/* page image is compressed */
|
||||
BKPIMAGE_IS_COMPRESSED = 0x02
|
||||
/* page image should be restored during replay */
|
||||
BKPIMAGE_APPLY = 0x04
|
||||
)
|
||||
|
||||
var rmgrIds = scalar.UToScalar{
|
||||
0: {Sym: "XLOG", Description: "RM_XLOG_ID"},
|
||||
1: {Sym: "Transaction", Description: "RM_XACT_ID"},
|
||||
2: {Sym: "Storage", Description: "RM_SMGR_ID"},
|
||||
3: {Sym: "CLOG", Description: "RM_CLOG_ID"},
|
||||
4: {Sym: "Database", Description: "RM_DBASE_ID"},
|
||||
5: {Sym: "Tablespace", Description: "RM_TBLSPC_ID"},
|
||||
6: {Sym: "MultiXact", Description: "RM_MULTIXACT_ID"},
|
||||
7: {Sym: "RelMap", Description: "RM_RELMAP_ID"},
|
||||
8: {Sym: "Standby", Description: "RM_STANDBY_ID"},
|
||||
9: {Sym: "Heap2", Description: "RM_HEAP2_ID"},
|
||||
10: {Sym: "Heap", Description: "RM_HEAP_ID"},
|
||||
11: {Sym: "Btree", Description: "RM_BTREE_ID"},
|
||||
12: {Sym: "Hash", Description: "RM_HASH_ID"},
|
||||
13: {Sym: "Gin", Description: "RM_GIN_ID"},
|
||||
14: {Sym: "Gist", Description: "RM_GIST_ID"},
|
||||
15: {Sym: "Sequence", Description: "RM_SEQ_ID"},
|
||||
16: {Sym: "SPGist", Description: "RM_SPGIST_ID"},
|
||||
17: {Sym: "BRIN", Description: "RM_BRIN_ID"},
|
||||
18: {Sym: "CommitTs", Description: "RM_COMMIT_TS_ID"},
|
||||
19: {Sym: "ReplicationOrigin", Description: "RM_REPLORIGIN_ID"},
|
||||
20: {Sym: "Generic", Description: "RM_GENERIC_ID"},
|
||||
21: {Sym: "LogicalMessage", Description: "RM_LOGICALMSG_ID"},
|
||||
}
|
||||
|
||||
const (
|
||||
XLOG_PAGE_MAGIC_MASK = 0xD000
|
||||
XLOG_PAGE_MAGIC_POSTGRES14 = 0xD10D
|
||||
XLOG_BLCKSZ = 8192
|
||||
XLP_LONG_HEADER = 2
|
||||
XLOG_PAGE_MAGIC_MASK = 0xD000
|
||||
)
|
||||
|
||||
// struct XLogLongPageHeaderData {
|
||||
@ -93,37 +39,9 @@ const (
|
||||
/* 17 | 1 */ // RmgrId xl_rmid
|
||||
/* XXX 2-byte hole */
|
||||
/* 20 | 4 */ // pg_crc32c xl_crc
|
||||
|
||||
//
|
||||
/* total size (bytes): 24 */
|
||||
|
||||
func decodeXLogPageHeaderData(d *decode.D) {
|
||||
/* 0 | 2 */ // uint16 xlp_magic;
|
||||
/* 2 | 2 */ // uint16 xlp_info;
|
||||
/* 4 | 4 */ // TimeLineID xlp_tli;
|
||||
/* 8 | 8 */ // XLogRecPtr xlp_pageaddr;
|
||||
/* 16 | 4 */ // uint32 xlp_rem_len;
|
||||
/* XXX 4-byte padding */
|
||||
xlpMagic := d.FieldU16("xlp_magic")
|
||||
xlpInfo := d.FieldU16("xlp_info")
|
||||
d.FieldU32("xlp_timeline")
|
||||
d.FieldU64("xlp_pageaddr")
|
||||
d.FieldU32("xlp_rem_len")
|
||||
d.FieldU32("padding0")
|
||||
|
||||
if (xlpMagic & XLOG_PAGE_MAGIC_MASK) == 0 {
|
||||
d.Fatalf("invalid xlp_magic = %X\n", xlpMagic)
|
||||
}
|
||||
|
||||
if xlpInfo&XLP_LONG_HEADER != 0 {
|
||||
// Long header
|
||||
d.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
|
||||
d.FieldU64("xlp_sysid")
|
||||
d.FieldU32("xlp_seg_size")
|
||||
d.FieldU32("xlp_xlog_blcksz")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type walD struct {
|
||||
maxOffset int64
|
||||
|
||||
@ -137,11 +55,11 @@ type walD struct {
|
||||
}
|
||||
|
||||
func DecodePgwal(d *decode.D, maxOffset uint32) any {
|
||||
pages := d.FieldArrayValue("Pages")
|
||||
pages := d.FieldArrayValue("pages")
|
||||
wal := &walD{
|
||||
maxOffset: int64(maxOffset),
|
||||
pages: pages,
|
||||
records: d.FieldArrayValue("Records"),
|
||||
records: d.FieldArrayValue("records"),
|
||||
recordRemLenBytes: -1, // -1 means not initialized
|
||||
}
|
||||
|
||||
@ -169,7 +87,7 @@ func DecodePgwal(d *decode.D, maxOffset uint32) any {
|
||||
|
||||
func decodeXLogPage(wal *walD, d *decode.D) {
|
||||
|
||||
xLogPage := d.FieldStructValue("Page")
|
||||
xLogPage := d.FieldStructValue("page")
|
||||
|
||||
// type = struct XLogPageHeaderData {
|
||||
/* 0 | 2 */ // uint16 xlp_magic;
|
||||
@ -178,7 +96,7 @@ func decodeXLogPage(wal *walD, d *decode.D) {
|
||||
/* 8 | 8 */ // XLogRecPtr xlp_pageaddr;
|
||||
/* 16 | 4 */ // uint32 xlp_rem_len;
|
||||
/* XXX 4-byte padding */
|
||||
header := xLogPage.FieldStructValue("XLogPageHeaderData")
|
||||
header := xLogPage.FieldStructValue("xlog_page_header_data")
|
||||
|
||||
xlpMagic := header.FieldU16("xlp_magic")
|
||||
xlpInfo := header.FieldU16("xlp_info")
|
||||
@ -193,7 +111,7 @@ func decodeXLogPage(wal *walD, d *decode.D) {
|
||||
|
||||
if xlpInfo&XLP_LONG_HEADER != 0 {
|
||||
// Long header
|
||||
header.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
|
||||
header.FieldStruct("xlog_long_page_header_data", func(d *decode.D) {
|
||||
d.FieldU64("xlp_sysid")
|
||||
d.FieldU32("xlp_seg_size")
|
||||
d.FieldU32("xlp_xlog_blcksz")
|
||||
@ -220,7 +138,7 @@ func decodeXLogPage(wal *walD, d *decode.D) {
|
||||
if checkPosBytes >= XLOG_BLCKSZ {
|
||||
d.Fatalf("invalid pos for RawBytesOfPreviousWalFile, it must be on first page only, pos = %d\n", checkPosBytes)
|
||||
}
|
||||
xLogPage.FieldRawLen("RawBytesOfPreviousWalFile", remLen)
|
||||
xLogPage.FieldRawLen("raw_bytes_of_previous_wal_file", remLen)
|
||||
} else {
|
||||
// record of previous page
|
||||
decodeXLogRecord(wal, remLenBytesAligned)
|
||||
@ -234,7 +152,7 @@ func decodeXLogPage(wal *walD, d *decode.D) {
|
||||
}
|
||||
|
||||
xLogPage.SeekAbs(pos2)
|
||||
pageRecords := xLogPage.FieldArrayValue("Records")
|
||||
pageRecords := xLogPage.FieldArrayValue("records")
|
||||
|
||||
wal.pageRecords = pageRecords
|
||||
|
||||
@ -246,7 +164,6 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
|
||||
posBytes := d.Pos() / 8
|
||||
posMaxOfPageBytes := int64(common.TypeAlign(XLOG_BLCKSZ, uint64(posBytes)))
|
||||
fmt.Printf("posMaxOfPageBytes = %d\n", posMaxOfPageBytes)
|
||||
|
||||
for {
|
||||
/* 0 | 4 */ // uint32 xl_tot_len
|
||||
@ -280,17 +197,12 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
|
||||
d.SeekAbs(posBytes1Aligned * 8)
|
||||
|
||||
record := pageRecords.FieldStructValue("XLogRecord")
|
||||
record := pageRecords.FieldStructValue("xlog_record")
|
||||
wal.record = record
|
||||
wal.records.AddChild(record.Value)
|
||||
|
||||
//xLogRecordBegin := record.Pos()
|
||||
xlTotLen := record.FieldU32("xl_tot_len")
|
||||
|
||||
xlTotLen1Bytes := xlTotLen - 4
|
||||
//xlTotLen1 := xlTotLen1Bytes * 8
|
||||
|
||||
//pos2 := d.Pos()
|
||||
pos2Bytes := d.Pos() / 8
|
||||
|
||||
remOnPage := posMaxOfPageBytes - pos2Bytes
|
||||
@ -299,7 +211,6 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
}
|
||||
|
||||
if remOnPage < int64(xlTotLen1Bytes) {
|
||||
//record.FieldRawLen("xLogBody", remOnPage*8)
|
||||
decodeXLogRecord(wal, remOnPage)
|
||||
wal.recordRemLenBytes = int64(xlTotLen1Bytes) - remOnPage
|
||||
break
|
||||
@ -310,37 +221,9 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
d.Fatalf("xlTotLen1Bytes is negative, xLogBodyLen = %d\n", xLogBodyLen)
|
||||
}
|
||||
|
||||
//record.FieldRawLen("xLogBody", xLogBodyLen)
|
||||
decodeXLogRecord(wal, int64(xlTotLen1Bytes))
|
||||
wal.record = nil
|
||||
wal.recordRemLenBytes = 0
|
||||
|
||||
//xLogRecordBegin := record.Pos()
|
||||
//xlTotLen := record.FieldU32("xl_tot_len")
|
||||
//record.FieldU32("xl_xid")
|
||||
//record.FieldU64("xl_prev")
|
||||
//record.FieldU8("xl_info")
|
||||
//record.FieldU8("xl_rmid")
|
||||
//record.U16()
|
||||
//record.FieldU32("xl_crc")
|
||||
//xLogRecordEnd := record.Pos()
|
||||
//sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
|
||||
//
|
||||
//xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
|
||||
//
|
||||
//rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
|
||||
//pos1Bytes := d.Pos() / 8
|
||||
//
|
||||
//remOnPage := posMaxOfPageBytes - pos1Bytes
|
||||
//if remOnPage < rawLen {
|
||||
// record.FieldRawLen("xLogBody", remOnPage*8)
|
||||
// wal.recordRemLenBytes = rawLen - remOnPage
|
||||
// break
|
||||
//}
|
||||
//
|
||||
//record.FieldRawLen("xLogBody", rawLen*8)
|
||||
//wal.recordRemLenBytes = -1
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -361,12 +244,12 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
|
||||
|
||||
pos0 := record.Pos()
|
||||
maxLen := maxBytes * 8
|
||||
if record.FieldGet("xLogBody0") == nil {
|
||||
if record.FieldGet("xlog_body0") == nil {
|
||||
// body on first page
|
||||
record.FieldRawLen("xLogBody0", maxLen)
|
||||
record.FieldRawLen("xlog_body0", maxLen)
|
||||
} else {
|
||||
// body on second page
|
||||
record.FieldRawLen("xLogBody1", maxLen)
|
||||
record.FieldRawLen("xlog_body1", maxLen)
|
||||
}
|
||||
pos1 := record.Pos()
|
||||
posMax := pos1
|
||||
@ -411,11 +294,11 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
|
||||
record.FieldU8("xl_rmid")
|
||||
}
|
||||
|
||||
if record.FieldGet("hole1") == nil {
|
||||
if record.FieldGet("hole0") == nil {
|
||||
if isEnd(record, posMax, 16) {
|
||||
return
|
||||
}
|
||||
record.FieldU16("hole1")
|
||||
record.FieldU16("hole0")
|
||||
}
|
||||
|
||||
if record.FieldGet("xl_crc") == nil {
|
||||
@ -429,100 +312,3 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
|
||||
|
||||
record.SeekAbs(posMax)
|
||||
}
|
||||
|
||||
func DecodePgwalOri(d *decode.D, in any) any {
|
||||
d.SeekAbs(0)
|
||||
|
||||
pageHeaders := d.FieldArrayValue("XLogPageHeaders")
|
||||
header := pageHeaders.FieldStruct("XLogPageHeaderData", decodeXLogPageHeaderData)
|
||||
|
||||
xlpRemLen, ok := header.FieldGet("xlp_rem_len").V.(uint32)
|
||||
if !ok {
|
||||
d.Fatalf("can't get xlp_rem_len\n")
|
||||
}
|
||||
|
||||
d.FieldRawLen("prev_file_rec", int64(xlpRemLen*8))
|
||||
d.FieldRawLen("prev_file_rec_padding", int64(d.AlignBits(64)))
|
||||
|
||||
d.FieldArray("XLogRecords", func(d *decode.D) {
|
||||
for {
|
||||
d.FieldStruct("XLogRecord", func(d *decode.D) {
|
||||
recordPos := uint64(d.Pos()) >> 3
|
||||
recordLen := d.FieldU32("xl_tot_len")
|
||||
recordEnd := recordPos + recordLen
|
||||
headerPos := recordEnd - recordEnd%XLOG_BLCKSZ
|
||||
d.FieldU32("xl_xid")
|
||||
d.FieldU64("xl_prev", scalar.ActualHex)
|
||||
d.FieldU8("xl_info")
|
||||
d.FieldU8("xl_rmid", rmgrIds)
|
||||
d.FieldRawLen("padding", int64(d.AlignBits(32)))
|
||||
d.FieldU32("xl_crc", scalar.ActualHex)
|
||||
|
||||
var lengths []uint64
|
||||
|
||||
d.FieldArray("XLogRecordBlockHeader", func(d *decode.D) {
|
||||
for blkheaderid := uint64(0); d.PeekBits(8) == blkheaderid; blkheaderid++ {
|
||||
d.FieldStruct("XlogRecordBlockHeader", func(d *decode.D) {
|
||||
/* block reference ID */
|
||||
d.FieldU8("id", d.AssertU(blkheaderid))
|
||||
/* fork within the relation, and flags */
|
||||
forkFlags := d.FieldU8("fork_flags")
|
||||
/* number of payload bytes (not including page image) */
|
||||
lengths = append(lengths, d.FieldU16("data_length"))
|
||||
if forkFlags&BKPBLOCK_HAS_IMAGE != 0 {
|
||||
d.FieldStruct("XLogRecordBlockImageHeader", func(d *decode.D) {
|
||||
/* number of page image bytes */
|
||||
d.FieldU16("length")
|
||||
/* number of bytes before "hole" */
|
||||
d.FieldU16("hole_offset")
|
||||
/* flag bits, see below */
|
||||
bimgInfo := d.FieldU8("bimg_info")
|
||||
d.FieldRawLen("padding", int64(d.AlignBits(16)))
|
||||
if bimgInfo&BKPIMAGE_HAS_HOLE != 0 &&
|
||||
bimgInfo&BKPIMAGE_IS_COMPRESSED != 0 {
|
||||
d.FieldU16("hole_length")
|
||||
}
|
||||
})
|
||||
}
|
||||
if forkFlags&BKPBLOCK_SAME_REL == 0 {
|
||||
d.FieldStruct("RelFileNode", func(d *decode.D) {
|
||||
/* tablespace */
|
||||
d.FieldU32("spcNode")
|
||||
/* database */
|
||||
d.FieldU32("dbNode")
|
||||
/* relation */
|
||||
d.FieldU32("relNode")
|
||||
})
|
||||
d.FieldU32("BlockNum")
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
if d.PeekBits(8) == 0xff {
|
||||
d.FieldStruct("XLogRecordDataHeaderShort", func(d *decode.D) {
|
||||
d.FieldU8("id", d.AssertU(0xff))
|
||||
lengths = append(lengths, d.FieldU8("data_length"))
|
||||
})
|
||||
}
|
||||
|
||||
d.FieldArray("data", func(d *decode.D) {
|
||||
for _, x := range lengths {
|
||||
pos := uint64(d.Pos()) >> 3
|
||||
if pos < headerPos && (headerPos < pos+x) {
|
||||
d.FieldRawLen("data", int64((headerPos-pos)*8))
|
||||
header := pageHeaders.FieldStruct("XLogPageHeaderData", decodeXLogPageHeaderData)
|
||||
_ = header.FieldGet("xlp_rem_len").TryScalarFn(d.ValidateU(recordEnd - headerPos))
|
||||
d.FieldRawLen("data", int64((x+pos-headerPos)*8))
|
||||
} else {
|
||||
d.FieldRawLen("data", int64(x*8))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
d.FieldRawLen("ending_padding", int64(d.AlignBits(64)))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1,31 +0,0 @@
|
||||
package postgres14_test
|
||||
|
||||
import (
|
||||
"github.com/wader/fq/format/postgres/flavours/postgres14"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseLsn(t *testing.T) {
|
||||
lsn1, err := postgres14.ParseLsn("0/4E394440")
|
||||
if err != nil {
|
||||
t.Fatalf("TestParseLsn 1, err = %v\n", err)
|
||||
}
|
||||
if lsn1 != 0x4E394440 {
|
||||
t.Fatalf("TestParseLsn 2, invalid lsn value\n")
|
||||
}
|
||||
|
||||
lsn2, err := postgres14.ParseLsn("0/4469E930")
|
||||
if err != nil {
|
||||
t.Fatalf("TestParseLsn 3, err = %v\n", err)
|
||||
}
|
||||
if lsn2 != 0x4469E930 {
|
||||
t.Fatalf("TestParseLsn 4, invalid lsn value\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestXLogSegmentOffset(t *testing.T) {
|
||||
offset := postgres14.XLogSegmentOffset(0x4E394440)
|
||||
if offset == 0 {
|
||||
t.Fatalf("TestXLogSegmentOffset 1, invalid offset\n")
|
||||
}
|
||||
}
|
@ -1,12 +1,13 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/wader/fq/format"
|
||||
"github.com/wader/fq/format/postgres/flavours/postgres14"
|
||||
"github.com/wader/fq/pkg/decode"
|
||||
"github.com/wader/fq/pkg/interp"
|
||||
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
@ -26,7 +27,8 @@ func init() {
|
||||
})
|
||||
}
|
||||
|
||||
//// https://pgpedia.info/x/XLOG_PAGE_MAGIC.html
|
||||
// https://pgpedia.info/x/XLOG_PAGE_MAGIC.html
|
||||
//nolint:revive
|
||||
const (
|
||||
XLOG_PAGE_MAGIC_15 = uint16(0xD10F)
|
||||
XLOG_PAGE_MAGIC_14 = uint16(0xD10D)
|
||||
@ -43,7 +45,7 @@ func ParseLsn(lsn string) (uint32, error) {
|
||||
if strings.Contains(lsn, "/") {
|
||||
parts := strings.Split(lsn, "/")
|
||||
if len(parts) != 2 {
|
||||
return 0, errors.New(fmt.Sprintf("Invalid lsn = %s", lsn))
|
||||
return 0, fmt.Errorf("invalid lsn = %s", lsn)
|
||||
}
|
||||
str1 = parts[1]
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package postgres_test
|
||||
|
||||
import (
|
||||
"github.com/wader/fq/format/postgres"
|
||||
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user