1
1
mirror of https://github.com/wader/fq.git synced 2024-11-26 21:55:57 +03:00

postgres: remove lsn parameter in pg_wal

This commit is contained in:
Pavel Safonov 2022-11-07 12:05:33 +03:00
parent e87d5a6b71
commit 5bb8654423
5 changed files with 8 additions and 42 deletions

View File

@ -333,8 +333,3 @@ type PostgresHeapIn struct {
PageNumber int `doc:"First page number in file, default is 0"` PageNumber int `doc:"First page number in file, default is 0"`
SegmentNumber int `doc:"Segment file number (16790.1 is 1), default is 0"` SegmentNumber int `doc:"Segment file number (16790.1 is 1), default is 0"`
} }
type PostgresWalIn struct {
Flavour string `doc:"PostgreSQL flavour: postgres14, pgproee14.., postgres10"`
Lsn string `doc:"Current LSN for WAL, use \"select pg_current_wal_lsn()\""`
}

View File

@ -5,9 +5,8 @@ import (
"github.com/wader/fq/pkg/decode" "github.com/wader/fq/pkg/decode"
) )
func DecodePGWAL(d *decode.D, maxOffset uint32) any { func DecodePGWAL(d *decode.D) any {
wal := &postgres.Wal{ wal := &postgres.Wal{
MaxOffset: int64(maxOffset),
DecodeXLogRecord: decodeXLogRecord, DecodeXLogRecord: decodeXLogRecord,
} }
return postgres.Decode(d, wal) return postgres.Decode(d, wal)

View File

@ -2,9 +2,8 @@ package postgres
import "github.com/wader/fq/pkg/decode" import "github.com/wader/fq/pkg/decode"
func DecodePGWAL(d *decode.D, maxOffset uint32) any { func DecodePGWAL(d *decode.D) any {
wal := &Wal{ wal := &Wal{
MaxOffset: int64(maxOffset),
DecodeXLogRecord: decodeXLogRecord, DecodeXLogRecord: decodeXLogRecord,
} }
return Decode(d, wal) return Decode(d, wal)

View File

@ -49,8 +49,7 @@ const (
/* total size (bytes): 12 */ /* total size (bytes): 12 */
type Wal struct { type Wal struct {
MaxOffset int64 page *walPage
page *walPage
pageRecords *decode.D pageRecords *decode.D
@ -79,11 +78,6 @@ func Decode(d *decode.D, wal *Wal) any {
} }
posBytes := pages.Pos() / 8 posBytes := pages.Pos() / 8
if posBytes >= wal.MaxOffset {
d.FieldRawLen("unused", d.BitsLeft())
break
}
remBytes := posBytes % XLOG_BLCKSZ remBytes := posBytes % XLOG_BLCKSZ
if remBytes != 0 { if remBytes != 0 {
d.Fatalf("invalid page remBytes = %d\n", remBytes) d.Fatalf("invalid page remBytes = %d\n", remBytes)
@ -193,12 +187,6 @@ func decodeXLogRecords(wal *Wal, d *decode.D) {
/* 20 | 4 */ // pg_crc32c xl_crc /* 20 | 4 */ // pg_crc32c xl_crc
posBytes1 := d.Pos() / 8 posBytes1 := d.Pos() / 8
posBytes1Aligned := int64(common.TypeAlign8(uint64(posBytes1))) posBytes1Aligned := int64(common.TypeAlign8(uint64(posBytes1)))
// check aligned - this is correct
// record header is 8 byte aligned
if posBytes1Aligned >= wal.MaxOffset {
d.FieldRawLen("unused", d.BitsLeft())
break
}
// check what we cat read xl_tot_len on this page // check what we cat read xl_tot_len on this page
if posMaxOfPageBytes < posBytes1Aligned+4 { if posMaxOfPageBytes < posBytes1Aligned+4 {

View File

@ -21,9 +21,8 @@ func init() {
Name: format.PG_WAL, Name: format.PG_WAL,
Description: "PostgreSQL write-ahead log file", Description: "PostgreSQL write-ahead log file",
DecodeFn: decodePGWAL, DecodeFn: decodePGWAL,
DecodeInArg: format.PostgresWalIn{ DecodeInArg: format.PostgresIn{
Flavour: PG_FLAVOUR_POSTGRES14, Flavour: PG_FLAVOUR_POSTGRES14,
Lsn: "",
}, },
RootArray: true, RootArray: true,
RootName: "pages", RootName: "pages",
@ -48,28 +47,14 @@ func ParseLsn(lsn string) (uint32, error) {
return uint32(r1), err return uint32(r1), err
} }
func XLogSegmentOffset(xLogPtr uint32) uint32 {
const walSegSizeBytes = 16 * 1024 * 1024
return xLogPtr & (walSegSizeBytes - 1)
}
func decodePGWAL(d *decode.D, in any) any { func decodePGWAL(d *decode.D, in any) any {
d.Endian = decode.LittleEndian d.Endian = decode.LittleEndian
pgIn, ok := in.(format.PostgresWalIn) pgIn, ok := in.(format.PostgresIn)
if !ok { if !ok {
d.Fatalf("DecodeInArg must be PostgresIn!\n") d.Fatalf("DecodeInArg must be PostgresIn!\n")
} }
maxOffset := uint32(0xFFFFFFFF)
if pgIn.Lsn != "" {
lsn, err := ParseLsn(pgIn.Lsn)
if err != nil {
d.Fatalf("Failed to ParseLsn, err = %v\n", err)
}
maxOffset = XLogSegmentOffset(lsn)
}
switch pgIn.Flavour { switch pgIn.Flavour {
case PG_FLAVOUR_POSTGRES10, case PG_FLAVOUR_POSTGRES10,
PG_FLAVOUR_POSTGRES11, PG_FLAVOUR_POSTGRES11,
@ -81,18 +66,18 @@ func decodePGWAL(d *decode.D, in any) any {
PG_FLAVOUR_PGPRO12, PG_FLAVOUR_PGPRO12,
PG_FLAVOUR_PGPRO13, PG_FLAVOUR_PGPRO13,
PG_FLAVOUR_PGPRO14: PG_FLAVOUR_PGPRO14:
return postgres.DecodePGWAL(d, maxOffset) return postgres.DecodePGWAL(d)
case PG_FLAVOUR_PGPROEE10, case PG_FLAVOUR_PGPROEE10,
PG_FLAVOUR_PGPROEE11, PG_FLAVOUR_PGPROEE11,
PG_FLAVOUR_PGPROEE12, PG_FLAVOUR_PGPROEE12,
PG_FLAVOUR_PGPROEE13, PG_FLAVOUR_PGPROEE13,
PG_FLAVOUR_PGPROEE14: PG_FLAVOUR_PGPROEE14:
return pgproee.DecodePGWAL(d, maxOffset) return pgproee.DecodePGWAL(d)
default: default:
break break
} }
return postgres.DecodePGWAL(d, maxOffset) return postgres.DecodePGWAL(d)
} }