1
1
mirror of https://github.com/wader/fq.git synced 2024-11-29 23:27:12 +03:00

postgres: wal refactoing

This commit is contained in:
Pavel Safonov 2022-10-26 10:01:21 +03:00
parent c105fcdd90
commit b09ec2fc94

View File

@ -207,16 +207,16 @@ func decodeXLogPage(wal *walD, d *decode.D) {
xLogPage := d.FieldStructValue("Page")
header := xLogPage.FieldStructValue("XLogPageHeaderData")
xlpMagic := header.FieldU16("xlp_magic")
header.FieldU16("xlp_magic")
xlpInfo := header.FieldU16("xlp_info")
header.FieldU32("xlp_tli")
wal.page.xlpPageAddr = header.FieldU64("xlp_pageaddr")
remLenBytes := header.FieldU32("xlp_rem_len")
header.FieldU32("padding0")
if (xlpMagic & XLOG_PAGE_MAGIC_MASK) == 0 {
d.Fatalf("invalid xlp_magic = %X\n", xlpMagic)
}
//if (xlpMagic & XLOG_PAGE_MAGIC_MASK) == 0 {
// d.Fatalf("invalid xlp_magic = %X\n", xlpMagic)
//}
if (xlpInfo & XLP_LONG_HEADER) != 0 {
// Long header
@ -358,31 +358,6 @@ func isEnd(d *decode.D, posMax int64, bitsCount int64) bool {
return result
}
func fieldTryGetScalarActualU(d *decode.D, name string, posMax int64, bitsCount int64) (value uint64, end bool) {
if ok, val := d.FieldTryGetScalarActualU("block_id"); ok {
value = val
} else {
if isEnd(d, posMax, bitsCount) {
return 0, true
}
switch bitsCount {
case 8:
value = d.FieldU8(name)
case 16:
value = d.FieldU16(name)
case 24:
value = d.FieldU24(name)
case 32:
value = d.FieldU32(name)
case 64:
value = d.FieldU64(name)
default:
d.Fatalf("not implemented bitsCount = %d\n", bitsCount)
}
}
return value, false
}
func decodeXLogRecord(wal *walD, maxBytes int64) {
record := wal.state.record
@ -452,206 +427,6 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
record.FieldU32("xl_crc")
}
//blockId := uint64(0)
//if ok, val := record.FieldTryGetScalarActualU("block_id"); ok {
// blockId = val
//} else {
// if isEnd(record, posMax, 8) {
// return
// }
// blockId = record.FieldU8("block_id")
//}
blockId, end := fieldTryGetScalarActualU(record, "block_id", posMax, 8)
if end {
return
}
//XLR_BLOCK_ID_DATA_SHORT = 255
//XLR_BLOCK_ID_DATA_LONG = 254
//XLR_BLOCK_ID_ORIGIN = 253
//XLR_BLOCK_ID_TOPLEVEL_XID = 252
//mainDataLen := uint64(0)
//recordOrigin := uint64(0)
//toplevelXid := uint64(0)
if blockId == XLR_BLOCK_ID_DATA_SHORT {
//typedef struct XLogRecordDataHeaderShort
//{
// uint8 id; /* XLR_BLOCK_ID_DATA_SHORT */
// uint8 data_length; /* number of payload bytes */
//}
//
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
//
//mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8)
//if end {
// return
//}
} else if blockId == XLR_BLOCK_ID_DATA_LONG {
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
//
//mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 32)
//if end {
// return
//}
} else if blockId == XLR_BLOCK_ID_ORIGIN {
// COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
// unsigned short - 2 bytes
//
//recordOrigin, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 16)
//if end {
// return
//}
} else if blockId == XLR_BLOCK_ID_TOPLEVEL_XID {
// COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
// 4 bytes
//
//toplevelXid, end = fieldTryGetScalarActualU(record, "record_origin", posMax, 32)
//if end {
// return
//}
} else if blockId >= XLR_MAX_BLOCK_ID {
record.Fatalf("catched blockId = %d\n", blockId)
} else if blockId < XLR_MAX_BLOCK_ID {
// COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
//forkFlags := uint64(0)
//if ok, val := record.FieldTryGetScalarActualU("fork_flags"); ok {
// forkFlags = val
//} else {
// if isEnd(record, posMax, 8) {
// return
// }
// forkFlags = record.FieldU8("fork_flags")
//}
forkFlags, end := fieldTryGetScalarActualU(record, "fork_flags", posMax, 8)
if end {
return
}
// blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
// blk->flags = fork_flags;
// blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
// blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
hasImage := uint64(0)
hasData := uint64(0)
forkNum := forkFlags & BKPBLOCK_FORK_MASK
if (forkFlags & BKPBLOCK_HAS_IMAGE) != 0 {
hasImage = 1
}
if (forkFlags & BKPBLOCK_HAS_DATA) != 0 {
hasData = 1
}
if record.FieldGet("forknum") == nil {
record.FieldValueU("forknum", forkNum)
}
if record.FieldGet("has_image") == nil {
record.FieldValueU("has_image", hasImage)
}
if record.FieldGet("has_data") == nil {
record.FieldValueU("has_data", hasData)
}
// COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
//dataLen := uint64(0)
//if ok, val := record.FieldTryGetScalarActualU("data_len"); ok {
// dataLen = val
//} else {
// if isEnd(record, posMax, 8) {
// return
// }
// dataLen = record.FieldU8("data_len")
//}
dataLen, end := fieldTryGetScalarActualU(record, "data_len", posMax, 8)
if end {
return
}
// if (blk->has_data && blk->data_len == 0)
if hasData != 0 && dataLen == 0 {
record.Fatalf("invalid record with hasData = %d, dataLen = %d\n", hasData, dataLen)
}
// if (!blk->has_data && blk->data_len != 0)
if hasData == 0 && dataLen != 0 {
record.Fatalf("invalid record with hasData = %d, dataLen = %d\n", hasData, dataLen)
}
// if (blk->has_image)
if hasImage != 0 {
// COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
bimgLen, end := fieldTryGetScalarActualU(record, "bimg_len", posMax, 16)
if end {
return
}
// COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
holeOffset, end := fieldTryGetScalarActualU(record, "hole_offset", posMax, 16)
if end {
return
}
// COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
bimgInfo, end := fieldTryGetScalarActualU(record, "bimg_info", posMax, 8)
if end {
return
}
// if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
bimgIsCompressed := uint64(0)
if (bimgInfo & BKPIMAGE_IS_COMPRESSED) != 0 {
bimgIsCompressed = 1
}
if record.FieldGet("bimg_is_compressed") == nil {
record.FieldValueU("bimg_is_compressed", bimgIsCompressed)
}
holeLength := uint64(0)
bimgHasHole := uint64(0)
if bimgIsCompressed != 0 {
if (bimgInfo & BKPIMAGE_HAS_HOLE) != 0 {
bimgHasHole = 1
}
if record.FieldGet("bimg_has_hole") == nil {
record.FieldValueU("bimg_has_hole", bimgHasHole)
}
if bimgHasHole != 0 {
// COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
holeLength, end = fieldTryGetScalarActualU(record, "hole_length", posMax, 16)
if end {
return
}
}
} else { // bimgIsCompressed is false
holeLength = XLOG_BLCKSZ - bimgLen
}
if record.FieldGet("hole_length") == nil {
record.FieldValueU("hole_length", holeLength)
}
if bimgHasHole != 0 && (holeOffset != 0 || holeLength != 0 || bimgLen == XLOG_BLCKSZ) {
record.Fatalf("check failed 1")
}
if (bimgInfo&BKPIMAGE_HAS_HOLE) == 0 && (holeOffset != 0 || holeLength != 0) {
record.Fatalf("check failed 2")
}
if (bimgInfo&BKPIMAGE_IS_COMPRESSED) != 0 && bimgLen == XLOG_BLCKSZ {
record.Fatalf("check failed 3")
}
if (bimgInfo&BKPIMAGE_HAS_HOLE) == 0 && (bimgInfo&BKPIMAGE_IS_COMPRESSED) == 0 && bimgLen != XLOG_BLCKSZ {
record.Fatalf("check failed 4")
}
if (forkFlags & BKPBLOCK_SAME_REL) == 0 {
// COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
}
}
}
//if mainDataLen == 77 && recordOrigin == 88 && toplevelXid == 99 {
// fmt.Printf("mainDataLen = %d, recordOrigin = %d, toplevelXid = %d\n", mainDataLen, recordOrigin, toplevelXid)
//}
record.SeekAbs(posMax)
}