mirror of
https://github.com/wader/fq.git
synced 2024-11-23 09:56:07 +03:00
postgres: try to implement pg_wal
This commit is contained in:
parent
067f8d569d
commit
448c369014
@ -16,24 +16,19 @@ const (
|
||||
//nolint:revive
|
||||
const (
|
||||
BKPBLOCK_FORK_MASK = 0x0F
|
||||
/* block data is an XLogRecordBlockImage */
|
||||
BKPBLOCK_HAS_IMAGE = 0x10
|
||||
BKPBLOCK_FLAG_MASK = 0xF0
|
||||
BKPBLOCK_HAS_IMAGE = 0x10 /* block data is an XLogRecordBlockImage */
|
||||
BKPBLOCK_HAS_DATA = 0x20
|
||||
/* redo will re-init the page */
|
||||
BKPBLOCK_WILL_INIT = 0x40
|
||||
/* RelFileNode omitted, same as previous */
|
||||
BKPBLOCK_SAME_REL = 0x80
|
||||
BKPBLOCK_WILL_INIT = 0x40 /* redo will re-init the page */
|
||||
BKPBLOCK_SAME_REL = 0x80 /* RelFileNode omitted, same as previous */
|
||||
)
|
||||
|
||||
/* Information stored in bimg_info */
|
||||
//nolint:revive
|
||||
const (
|
||||
/* page image has "hole" */
|
||||
BKPIMAGE_HAS_HOLE = 0x01
|
||||
/* page image is compressed */
|
||||
BKPIMAGE_IS_COMPRESSED = 0x02
|
||||
/* page image should be restored during replay */
|
||||
BKPIMAGE_APPLY = 0x04
|
||||
BKPIMAGE_HAS_HOLE = 0x01 /* page image has "hole" */
|
||||
BKPIMAGE_IS_COMPRESSED = 0x02 /* page image is compressed */
|
||||
BKPIMAGE_APPLY = 0x04 /* page image should be restored during replay */
|
||||
)
|
||||
|
||||
var rmgrIds = scalar.UToScalar{
|
||||
@ -66,6 +61,14 @@ const (
|
||||
XLOG_PAGE_MAGIC_POSTGRES14 = 0xD10D
|
||||
)
|
||||
|
||||
const (
|
||||
XLR_MAX_BLOCK_ID = 32
|
||||
XLR_BLOCK_ID_DATA_SHORT = 255
|
||||
XLR_BLOCK_ID_DATA_LONG = 254
|
||||
XLR_BLOCK_ID_ORIGIN = 253
|
||||
XLR_BLOCK_ID_TOPLEVEL_XID = 252
|
||||
)
|
||||
|
||||
// struct XLogLongPageHeaderData {
|
||||
// /* 0 | 24 */ XLogPageHeaderData std;
|
||||
// /* 24 | 8 */ uint64 xlp_sysid;
|
||||
@ -93,9 +96,16 @@ const (
|
||||
/* 17 | 1 */ // RmgrId xl_rmid
|
||||
/* XXX 2-byte hole */
|
||||
/* 20 | 4 */ // pg_crc32c xl_crc
|
||||
|
||||
//
|
||||
/* total size (bytes): 24 */
|
||||
|
||||
// struct RelFileNode {
|
||||
/* 0 | 4 */ // Oid spcNode
|
||||
/* 4 | 4 */ // Oid dbNode
|
||||
/* 8 | 4 */ // Oid relNode
|
||||
//
|
||||
/* total size (bytes): 12 */
|
||||
|
||||
func decodeXLogPageHeaderData(d *decode.D) {
|
||||
/* 0 | 2 */ // uint16 xlp_magic;
|
||||
/* 2 | 2 */ // uint16 xlp_info;
|
||||
@ -114,7 +124,7 @@ func decodeXLogPageHeaderData(d *decode.D) {
|
||||
d.Fatalf("invalid xlp_magic = %X\n", xlpMagic)
|
||||
}
|
||||
|
||||
if xlpInfo&XLP_LONG_HEADER != 0 {
|
||||
if (xlpInfo & XLP_LONG_HEADER) != 0 {
|
||||
// Long header
|
||||
d.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
|
||||
d.FieldU64("xlp_sysid")
|
||||
@ -191,7 +201,7 @@ func decodeXLogPage(wal *walD, d *decode.D) {
|
||||
d.Fatalf("invalid xlp_magic = %X\n", xlpMagic)
|
||||
}
|
||||
|
||||
if xlpInfo&XLP_LONG_HEADER != 0 {
|
||||
if (xlpInfo & XLP_LONG_HEADER) != 0 {
|
||||
// Long header
|
||||
header.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
|
||||
d.FieldU64("xlp_sysid")
|
||||
@ -256,7 +266,6 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
/* 17 | 1 */ // RmgrId xl_rmid
|
||||
/* XXX 2-byte hole */
|
||||
/* 20 | 4 */ // pg_crc32c xl_crc
|
||||
|
||||
posBytes1 := d.Pos() / 8
|
||||
posBytes1Aligned := int64(common.TypeAlign8(uint64(posBytes1)))
|
||||
// check aligned - this is correct
|
||||
@ -284,13 +293,8 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
wal.record = record
|
||||
wal.records.AddChild(record.Value)
|
||||
|
||||
//xLogRecordBegin := record.Pos()
|
||||
xlTotLen := record.FieldU32("xl_tot_len")
|
||||
|
||||
xlTotLen1Bytes := xlTotLen - 4
|
||||
//xlTotLen1 := xlTotLen1Bytes * 8
|
||||
|
||||
//pos2 := d.Pos()
|
||||
pos2Bytes := d.Pos() / 8
|
||||
|
||||
remOnPage := posMaxOfPageBytes - pos2Bytes
|
||||
@ -314,33 +318,6 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
|
||||
decodeXLogRecord(wal, int64(xlTotLen1Bytes))
|
||||
wal.record = nil
|
||||
wal.recordRemLenBytes = 0
|
||||
|
||||
//xLogRecordBegin := record.Pos()
|
||||
//xlTotLen := record.FieldU32("xl_tot_len")
|
||||
//record.FieldU32("xl_xid")
|
||||
//record.FieldU64("xl_prev")
|
||||
//record.FieldU8("xl_info")
|
||||
//record.FieldU8("xl_rmid")
|
||||
//record.U16()
|
||||
//record.FieldU32("xl_crc")
|
||||
//xLogRecordEnd := record.Pos()
|
||||
//sizeOfXLogRecord := (xLogRecordEnd - xLogRecordBegin) / 8
|
||||
//
|
||||
//xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)
|
||||
//
|
||||
//rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
|
||||
//pos1Bytes := d.Pos() / 8
|
||||
//
|
||||
//remOnPage := posMaxOfPageBytes - pos1Bytes
|
||||
//if remOnPage < rawLen {
|
||||
// record.FieldRawLen("xLogBody", remOnPage*8)
|
||||
// wal.recordRemLenBytes = rawLen - remOnPage
|
||||
// break
|
||||
//}
|
||||
//
|
||||
//record.FieldRawLen("xLogBody", rawLen*8)
|
||||
//wal.recordRemLenBytes = -1
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -356,6 +333,31 @@ func isEnd(d *decode.D, posMax int64, bitsCount int64) bool {
|
||||
return result
|
||||
}
|
||||
|
||||
func fieldTryGetScalarActualU(d *decode.D, name string, posMax int64, bitsCount int64) (value uint64, end bool) {
|
||||
if ok, val := d.FieldTryGetScalarActualU("block_id"); ok {
|
||||
value = val
|
||||
} else {
|
||||
if isEnd(d, posMax, bitsCount) {
|
||||
return 0, true
|
||||
}
|
||||
switch bitsCount {
|
||||
case 8:
|
||||
value = d.FieldU8(name)
|
||||
case 16:
|
||||
value = d.FieldU16(name)
|
||||
case 24:
|
||||
value = d.FieldU24(name)
|
||||
case 32:
|
||||
value = d.FieldU32(name)
|
||||
case 64:
|
||||
value = d.FieldU64(name)
|
||||
default:
|
||||
d.Fatalf("not implemented bitsCount = %d\n", bitsCount)
|
||||
}
|
||||
}
|
||||
return value, false
|
||||
}
|
||||
|
||||
func decodeXLogRecord(wal *walD, maxBytes int64) {
|
||||
record := wal.record
|
||||
|
||||
@ -425,7 +427,203 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
|
||||
record.FieldU32("xl_crc")
|
||||
}
|
||||
|
||||
// TODO
|
||||
//blockId := uint64(0)
|
||||
//if ok, val := record.FieldTryGetScalarActualU("block_id"); ok {
|
||||
// blockId = val
|
||||
//} else {
|
||||
// if isEnd(record, posMax, 8) {
|
||||
// return
|
||||
// }
|
||||
// blockId = record.FieldU8("block_id")
|
||||
//}
|
||||
blockId, end := fieldTryGetScalarActualU(record, "block_id", posMax, 8)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
|
||||
if blockId == XLR_BLOCK_ID_DATA_SHORT {
|
||||
//typedef struct XLogRecordDataHeaderShort
|
||||
//{
|
||||
// uint8 id; /* XLR_BLOCK_ID_DATA_SHORT */
|
||||
// uint8 data_length; /* number of payload bytes */
|
||||
//}
|
||||
//
|
||||
/* total size (bytes): 24 */
|
||||
}
|
||||
|
||||
//XLR_BLOCK_ID_DATA_SHORT = 255
|
||||
//XLR_BLOCK_ID_DATA_LONG = 254
|
||||
//XLR_BLOCK_ID_ORIGIN = 253
|
||||
//XLR_BLOCK_ID_TOPLEVEL_XID = 252
|
||||
|
||||
mainDataLen := uint64(0)
|
||||
recordOrigin := uint64(0)
|
||||
toplevelXid := uint64(0)
|
||||
if blockId == XLR_BLOCK_ID_DATA_SHORT {
|
||||
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
|
||||
mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
} else if blockId == XLR_BLOCK_ID_DATA_LONG {
|
||||
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
|
||||
mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 32)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
} else if blockId == XLR_BLOCK_ID_ORIGIN {
|
||||
// COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
|
||||
// unsigned short - 2 bytes
|
||||
recordOrigin, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 16)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
} else if blockId == XLR_BLOCK_ID_TOPLEVEL_XID {
|
||||
// COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
|
||||
// 4 bytes
|
||||
toplevelXid, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 32)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
} else if blockId >= XLR_MAX_BLOCK_ID {
|
||||
record.Fatalf("catched blockId = %d\n", blockId)
|
||||
} else if blockId < XLR_MAX_BLOCK_ID {
|
||||
// COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
|
||||
//forkFlags := uint64(0)
|
||||
//if ok, val := record.FieldTryGetScalarActualU("fork_flags"); ok {
|
||||
// forkFlags = val
|
||||
//} else {
|
||||
// if isEnd(record, posMax, 8) {
|
||||
// return
|
||||
// }
|
||||
// forkFlags = record.FieldU8("fork_flags")
|
||||
//}
|
||||
forkFlags, end := fieldTryGetScalarActualU(record, "fork_flags", posMax, 8)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
|
||||
// blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
|
||||
// blk->flags = fork_flags;
|
||||
// blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
|
||||
// blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
|
||||
hasImage := uint64(0)
|
||||
hasData := uint64(0)
|
||||
forkNum := forkFlags & BKPBLOCK_FORK_MASK
|
||||
if (forkFlags & BKPBLOCK_HAS_IMAGE) != 0 {
|
||||
hasImage = 1
|
||||
}
|
||||
if (forkFlags & BKPBLOCK_HAS_DATA) != 0 {
|
||||
hasData = 1
|
||||
}
|
||||
if record.FieldGet("forknum") == nil {
|
||||
record.FieldValueU("forknum", forkNum)
|
||||
}
|
||||
if record.FieldGet("has_image") == nil {
|
||||
record.FieldValueU("has_image", hasImage)
|
||||
}
|
||||
if record.FieldGet("has_data") == nil {
|
||||
record.FieldValueU("has_data", hasData)
|
||||
}
|
||||
|
||||
// COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
|
||||
//dataLen := uint64(0)
|
||||
//if ok, val := record.FieldTryGetScalarActualU("data_len"); ok {
|
||||
// dataLen = val
|
||||
//} else {
|
||||
// if isEnd(record, posMax, 8) {
|
||||
// return
|
||||
// }
|
||||
// dataLen = record.FieldU8("data_len")
|
||||
//}
|
||||
dataLen, end := fieldTryGetScalarActualU(record, "data_len", posMax, 8)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
|
||||
// if (blk->has_data && blk->data_len == 0)
|
||||
if hasData != 0 && dataLen == 0 {
|
||||
record.Fatalf("invalid record with hasData = %d, dataLen = %d\n", hasData, dataLen)
|
||||
}
|
||||
// if (!blk->has_data && blk->data_len != 0)
|
||||
if hasData == 0 && dataLen != 0 {
|
||||
record.Fatalf("invalid record with hasData = %d, dataLen = %d\n", hasData, dataLen)
|
||||
}
|
||||
|
||||
// if (blk->has_image)
|
||||
if hasImage != 0 {
|
||||
// COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
|
||||
bimgLen, end := fieldTryGetScalarActualU(record, "bimg_len", posMax, 16)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
|
||||
// COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
|
||||
holeOffset, end := fieldTryGetScalarActualU(record, "hole_offset", posMax, 16)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
|
||||
// COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
|
||||
bimgInfo, end := fieldTryGetScalarActualU(record, "bimg_info", posMax, 8)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
|
||||
// if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
|
||||
bimgIsCompressed := uint64(0)
|
||||
if (bimgInfo & BKPIMAGE_IS_COMPRESSED) != 0 {
|
||||
bimgIsCompressed = 1
|
||||
}
|
||||
if record.FieldGet("bimg_is_compressed") == nil {
|
||||
record.FieldValueU("bimg_is_compressed", bimgIsCompressed)
|
||||
}
|
||||
|
||||
holeLength := uint64(0)
|
||||
bimgHasHole := uint64(0)
|
||||
if bimgIsCompressed != 0 {
|
||||
if (bimgInfo & BKPIMAGE_HAS_HOLE) != 0 {
|
||||
bimgHasHole = 1
|
||||
}
|
||||
if record.FieldGet("bimg_has_hole") == nil {
|
||||
record.FieldValueU("bimg_has_hole", bimgHasHole)
|
||||
}
|
||||
if bimgHasHole != 0 {
|
||||
// COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
|
||||
holeLength, end = fieldTryGetScalarActualU(record, "hole_length", posMax, 16)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
}
|
||||
} else { // bimgIsCompressed is false
|
||||
holeLength = XLOG_BLCKSZ - bimgLen
|
||||
}
|
||||
if record.FieldGet("hole_length") == nil {
|
||||
record.FieldValueU("hole_length", holeLength)
|
||||
}
|
||||
|
||||
if bimgHasHole != 0 && (holeOffset != 0 || holeLength != 0 || bimgLen == XLOG_BLCKSZ) {
|
||||
record.Fatalf("check failed 1")
|
||||
}
|
||||
if (bimgInfo&BKPIMAGE_HAS_HOLE) == 0 && (holeOffset != 0 || holeLength != 0) {
|
||||
record.Fatalf("check failed 2")
|
||||
}
|
||||
if (bimgInfo&BKPIMAGE_IS_COMPRESSED) != 0 && bimgLen == XLOG_BLCKSZ {
|
||||
record.Fatalf("check failed 3")
|
||||
}
|
||||
if (bimgInfo&BKPIMAGE_HAS_HOLE) == 0 && (bimgInfo&BKPIMAGE_IS_COMPRESSED) == 0 && bimgLen != XLOG_BLCKSZ {
|
||||
record.Fatalf("check failed 4")
|
||||
}
|
||||
|
||||
if (forkFlags & BKPBLOCK_SAME_REL) == 0 {
|
||||
// COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("mainDataLen = %d, recordOrigin = %d, toplevelXid = %d\n", mainDataLen, recordOrigin, toplevelXid)
|
||||
|
||||
record.SeekAbs(posMax)
|
||||
}
|
||||
|
@ -755,6 +755,26 @@ func (d *D) FieldGet(name string) *Value {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *D) FieldGetScalar(name string) *scalar.S {
|
||||
v := d.FieldGet(name)
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
sr, ok := v.V.(*scalar.S)
|
||||
if !ok {
|
||||
panic("not a scalar value")
|
||||
}
|
||||
return sr
|
||||
}
|
||||
|
||||
func (d *D) FieldTryGetScalarActualU(name string) (bool, uint64) {
|
||||
sr := d.FieldGetScalar(name)
|
||||
if sr == nil {
|
||||
return false, 0
|
||||
}
|
||||
return true, sr.ActualU()
|
||||
}
|
||||
|
||||
func (d *D) FieldMustGet(name string) *Value {
|
||||
if v := d.FieldGet(name); v != nil {
|
||||
return v
|
||||
|
Loading…
Reference in New Issue
Block a user