diff --git a/format/format.go b/format/format.go index 75f5904a..50403ae2 100644 --- a/format/format.go +++ b/format/format.go @@ -403,6 +403,10 @@ type Pg_BTree_In struct { Page int `doc:"First page number in file, default is 0"` } +type MpegTsIn struct { + MaxSyncSeek int `doc:"Max byte distance to next sync"` +} + type MpegTsStream struct { ProgramPid int Type int @@ -415,12 +419,14 @@ type MpegTsProgram struct { } type MpegTsPacketIn struct { - ProgramMap map[int]MpegTsProgram - StreamMap map[int]MpegTsStream + ProgramMap map[int]MpegTsProgram + StreamMap map[int]MpegTsStream + ContinuityMap map[int]int } type MpegTsPacketOut struct { Pid int + TransportErrorIndicator bool ContinuityCounter int TransportScramblingControl int PayloadUnitStart bool diff --git a/format/mpeg/mpeg_ts.go b/format/mpeg/mpeg_ts.go index c9fa72f9..54ef14d9 100644 --- a/format/mpeg/mpeg_ts.go +++ b/format/mpeg/mpeg_ts.go @@ -8,6 +8,7 @@ package mpeg // TODO: mpeg_pes, share code? // TODO: mpeg_pes_packet, length 0 for video? // TODO: dup start? +// TODO: transport error indicator, count somehow? now mpeg_ts_packet fails // ffmpeg $(for i in $(seq 0 50); do echo "-f lavfi -i sine"; done) -t 100ms $(for i in $(seq 0 50); do echo "-map $i:0"; done) test2.ts @@ -45,6 +46,9 @@ func init() { {Groups: []*decode.Group{format.MPEG_TS_PMT}, Out: &mpegTsMpegTsPmtGroup}, {Groups: []*decode.Group{format.MPEG_PES_Packet}, Out: &mpegTsMpegPesPacketGroup}, }, + DefaultInArg: format.MpegTsIn{ + MaxSyncSeek: 100 * 1024, + }, }) } @@ -65,18 +69,19 @@ func (tb *tsBuffer) Reset() { // new bytes buffer to not share byte slice tb.buf = bytes.Buffer{} tb.packetIndexes = nil - } -type tsContinuityMap map[int]int - -func (tcm tsContinuityMap) Update(pid int, n int) bool { - current, currentOk := tcm[pid] - tcm[pid] = n - if currentOk { - return (current+1)&0xf == n +func tsContinuityUpdate(tcm map[int]int, pid int, current int) bool { + prev, prevFound := tcm[pid] + valid := (prevFound && ((prev+1)&0xf == current)) || current == 0 + if valid { + tcm[pid] = current + return true } - return n == 0 + if prevFound { + delete(tcm, pid) + } + return valid } func tsPesDecode(d *decode.D, pid int, programPid int, streamType int, pesBuf *tsBuffer) { @@ -93,29 +98,46 @@ func tsPesDecode(d *decode.D, pid int, programPid int, streamType int, pesBuf *t } func tsDecode(d *decode.D) any { + var ti format.MpegTsIn + + d.ArgAs(&ti) + var tableReassemble = map[int]*tsBuffer{} var pesReassemble = map[int]*tsBuffer{} pidProgramMap := map[int]format.MpegTsProgram{} pidStreamMap := map[int]format.MpegTsStream{} - continuityMap := tsContinuityMap(map[int]int{}) + continuityMap := map[int]int{} packetIndex := 0 + decodeFailures := 0 tablesD := d.FieldArrayValue("tables") pesD := d.FieldArrayValue("pes") d.FieldArray("packets", func(d *decode.D) { for !d.End() { - _, v, err := d.TryFieldFormat( + syncLen, _, err := d.TryPeekFind(8, 8, int64(ti.MaxSyncSeek), func(v uint64) bool { + return v == 0x47 + }) + if err != nil || syncLen < 0 { + break + } + if syncLen > 0 { + d.SeekRel(syncLen) + } + + _, v, err := d.TryFieldFormatLen( "packet", + tsPacketLength, &mpegTsMpegTsPacketGroup, format.MpegTsPacketIn{ - ProgramMap: pidProgramMap, - StreamMap: pidStreamMap, + ProgramMap: pidProgramMap, + StreamMap: pidStreamMap, + ContinuityMap: continuityMap, }, ) if err != nil { - // TODO: malformted packet, how? - d.FieldRawLen("packet", tsPacketLength) + decodeFailures++ + d.SeekRel(8) continue } mtpo, mtpoOk := v.(format.MpegTsPacketOut) @@ -123,7 +145,7 @@ func tsDecode(d *decode.D) any { panic("packet is not a MpegTsPacketOut") } - isContinous := continuityMap.Update(mtpo.Pid, mtpo.ContinuityCounter) + isContinous := tsContinuityUpdate(continuityMap, mtpo.Pid, mtpo.ContinuityCounter) isTable := tsPidIsTable(mtpo.Pid, pidProgramMap) stream, isStream := pidStreamMap[mtpo.Pid] diff --git a/format/mpeg/mpeg_ts_packet.go b/format/mpeg/mpeg_ts_packet.go index 0795ea97..b252f948 100644 --- a/format/mpeg/mpeg_ts_packet.go +++ b/format/mpeg/mpeg_ts_packet.go @@ -22,119 +22,132 @@ func tsPacketDecode(d *decode.D) any { if d.ArgAs(&mtpi) { mtpi.ProgramMap = map[int]format.MpegTsProgram{} mtpi.StreamMap = map[int]format.MpegTsStream{} + mtpi.ContinuityMap = map[int]int{} } var mtpo format.MpegTsPacketOut - d.FramedFn(tsPacketLength, func(d *decode.D) { - d.FieldU8("sync", scalar.UintHex) // TODO: sometimes not 0x47? d.UintAssert(0x47) - d.FieldBool("transport_error_indicator") - mtpo.PayloadUnitStart = d.FieldBool("payload_unit_start") - d.FieldBool("transport_priority") - pid := d.FieldU13("pid", tsPidMap, scalar.UintHex) - if p, ok := mtpi.ProgramMap[int(pid)]; ok { + d.FieldU8("sync", scalar.UintHex, d.UintAssert(0x47)) + mtpo.TransportErrorIndicator = d.FieldBool("transport_error_indicator", d.BoolAssert(false)) + mtpo.PayloadUnitStart = d.FieldBool("payload_unit_start") + d.FieldBool("transport_priority") + pid := d.FieldU13("pid", tsPidMap, scalar.UintHex) + if p, ok := mtpi.ProgramMap[int(pid)]; ok { + d.FieldValueUint("program", uint64(p.Number), scalar.UintHex) + } else if s, ok := mtpi.StreamMap[int(pid)]; ok { + if p, ok := mtpi.ProgramMap[s.ProgramPid]; ok { d.FieldValueUint("program", uint64(p.Number), scalar.UintHex) - } else if s, ok := mtpi.StreamMap[int(pid)]; ok { - if p, ok := mtpi.ProgramMap[s.ProgramPid]; ok { - d.FieldValueUint("program", uint64(p.Number), scalar.UintHex) - } - d.FieldValueUint("stream_type", uint64(s.Type), tsStreamTypeMap) - } - mtpo.Pid = int(pid) - mtpo.TransportScramblingControl = int(d.FieldU2("transport_scrambling_control", scalar.UintMapSymStr{ - 0b00: "not_scrambled", - 0b01: "reserved", - 0b10: "even_key", - 0b11: "odd_key", - })) - adaptationFieldControl := d.FieldU2("adaptation_field_control", scalar.UintMapSymStr{ - 0b00: "reserved", - adaptationFieldControlPayloadOnly: "payload_only", - adaptationFieldControlAdaptationFieldOnly: "adaptation_field_only", - adaptationFieldControlAdaptationFieldAndPayload: "adaptation_and_payload", - }) - mtpo.ContinuityCounter = int(d.FieldU4("continuity_counter")) - - switch adaptationFieldControl { - case adaptationFieldControlAdaptationFieldOnly, - adaptationFieldControlAdaptationFieldAndPayload: - d.FieldStruct("adaptation_field", func(d *decode.D) { - length := d.FieldU8("length") // Number of bytes in the adaptation field immediately following this byte - d.FramedFn(int64(length)*8, func(d *decode.D) { - d.FieldBool("discontinuity_indicator") // Set if current TS packet is in a discontinuity state with respect to either the continuity counter or the program clock reference - d.FieldBool("random_access_indicator") // Set when the stream may be decoded without errors from this point - d.FieldBool("elementary_stream_priority_indicator") // Set when this stream should be considered "high priority" - pcrPresent := d.FieldBool("pcr_present") // Set when PCR field is present - opcrPresent := d.FieldBool("opcr_present") // Set when OPCR field is present - splicingPointPresent := d.FieldBool("splicing_point_present") // Set when splice countdown field is present - transportPrivatePresent := d.FieldBool("transport_private_present") // Set when transport private data is present - adaptationFieldExtensionPresent := d.FieldBool("adaptation_field_extension_present") // Set when adaptation extension data is present - if pcrPresent { - d.FieldU("pcr", 48) - } - if opcrPresent { - d.FieldU("opcr", 48) - } - if splicingPointPresent { - d.FieldU8("splicing_point") - } - if transportPrivatePresent { - d.FieldStruct("transport_private", func(d *decode.D) { - length := d.FieldU8("length") - d.FieldRawLen("data", int64(length)*8) - }) - } - if adaptationFieldExtensionPresent { - d.FieldStruct("adaptation_extension", func(d *decode.D) { - length := d.FieldU8("length") - d.FramedFn(int64(length)*8, func(d *decode.D) { - d.FieldBool("legal_time_window") - d.FieldBool("piecewise_rate") - d.FieldBool("seamless_splice") - d.FieldU5("reserved", scalar.UintHex) - d.FieldRawLen("data", d.BitsLeft()) - }) - }) - - // Optional fields - // LTW flag set (2 bytes) - // LTW valid flag 1 0x8000 - // LTW offset 15 0x7fff Extra information for rebroadcasters to determine the state of buffers when packets may be missing. - // Piecewise flag set (3 bytes) - // Reserved 2 0xc00000 - // Piecewise rate 22 0x3fffff The rate of the stream, measured in 188-byte packets, to define the end-time of the LTW. - // Seamless splice flag set (5 bytes) - // Splice type 4 0xf000000000 Indicates the parameters of the H.262 splice. - // DTS next access unit 36 0x0efffefffe The PES DTS of the splice point. Split up as multiple fields, 1 marker bit (0x1), 15 bits, 1 marker bit, 15 bits, and 1 marker bit, for 33 data bits total. - } - if d.BitsLeft() > 0 { - d.FieldRawLen("stuffing", d.BitsLeft()) - } - }) - }) - } - - isTable := tsPidIsTable(mtpo.Pid, mtpi.ProgramMap) - if isTable { - var payloadPointer uint64 - if mtpo.PayloadUnitStart { - payloadPointer = d.FieldU8("payload_pointer") - } - if payloadPointer > 0 { - d.FieldRawLen("stuffing", int64(payloadPointer)*8) - } - } - - switch adaptationFieldControl { - case adaptationFieldControlPayloadOnly, - adaptationFieldControlAdaptationFieldAndPayload: - payload := d.FieldRawLen("payload", d.BitsLeft()) - mtpo.Payload = d.ReadAllBits(payload) - default: - // TODO: unknown adaption control flags - d.FieldRawLen("unknown", d.BitsLeft()) } + d.FieldValueUint("stream_type", uint64(s.Type), tsStreamTypeMap) + } + mtpo.Pid = int(pid) + mtpo.TransportScramblingControl = int(d.FieldU2("transport_scrambling_control", scalar.UintMapSymStr{ + 0b00: "not_scrambled", + 0b01: "reserved", + 0b10: "even_key", + 0b11: "odd_key", + })) + adaptationFieldControl := d.FieldU2("adaptation_field_control", scalar.UintMapSymStr{ + 0b00: "reserved", + adaptationFieldControlPayloadOnly: "payload_only", + adaptationFieldControlAdaptationFieldOnly: "adaptation_field_only", + adaptationFieldControlAdaptationFieldAndPayload: "adaptation_and_payload", }) + mtpo.ContinuityCounter = int(d.FieldU4("continuity_counter", scalar.UintFn(func(s scalar.Uint) (scalar.Uint, error) { + prev, prevFound := mtpi.ContinuityMap[int(pid)] + current := int(s.Actual) + + switch { + case prevFound && (prev+1)&0xf == current: + s.Description = "valid" + case prevFound: + s.Description = "invalid" + default: + s.Description = "unknown" + } + + return s, nil + }))) + + switch adaptationFieldControl { + case adaptationFieldControlAdaptationFieldOnly, + adaptationFieldControlAdaptationFieldAndPayload: + d.FieldStruct("adaptation_field", func(d *decode.D) { + length := d.FieldU8("length") // Number of bytes in the adaptation field immediately following this byte + d.FramedFn(int64(length)*8, func(d *decode.D) { + d.FieldBool("discontinuity_indicator") // Set if current TS packet is in a discontinuity state with respect to either the continuity counter or the program clock reference + d.FieldBool("random_access_indicator") // Set when the stream may be decoded without errors from this point + d.FieldBool("elementary_stream_priority_indicator") // Set when this stream should be considered "high priority" + pcrPresent := d.FieldBool("pcr_present") // Set when PCR field is present + opcrPresent := d.FieldBool("opcr_present") // Set when OPCR field is present + splicingPointPresent := d.FieldBool("splicing_point_present") // Set when splice countdown field is present + transportPrivatePresent := d.FieldBool("transport_private_present") // Set when transport private data is present + adaptationFieldExtensionPresent := d.FieldBool("adaptation_field_extension_present") // Set when adaptation extension data is present + if pcrPresent { + d.FieldU("pcr", 48) + } + if opcrPresent { + d.FieldU("opcr", 48) + } + if splicingPointPresent { + d.FieldU8("splicing_point") + } + if transportPrivatePresent { + d.FieldStruct("transport_private", func(d *decode.D) { + length := d.FieldU8("length") + d.FieldRawLen("data", int64(length)*8) + }) + } + if adaptationFieldExtensionPresent { + d.FieldStruct("adaptation_extension", func(d *decode.D) { + length := d.FieldU8("length") + d.FramedFn(int64(length)*8, func(d *decode.D) { + d.FieldBool("legal_time_window") + d.FieldBool("piecewise_rate") + d.FieldBool("seamless_splice") + d.FieldU5("reserved", scalar.UintHex) + d.FieldRawLen("data", d.BitsLeft()) + }) + }) + + // Optional fields + // LTW flag set (2 bytes) + // LTW valid flag 1 0x8000 + // LTW offset 15 0x7fff Extra information for rebroadcasters to determine the state of buffers when packets may be missing. + // Piecewise flag set (3 bytes) + // Reserved 2 0xc00000 + // Piecewise rate 22 0x3fffff The rate of the stream, measured in 188-byte packets, to define the end-time of the LTW. + // Seamless splice flag set (5 bytes) + // Splice type 4 0xf000000000 Indicates the parameters of the H.262 splice. + // DTS next access unit 36 0x0efffefffe The PES DTS of the splice point. Split up as multiple fields, 1 marker bit (0x1), 15 bits, 1 marker bit, 15 bits, and 1 marker bit, for 33 data bits total. + } + if d.BitsLeft() > 0 { + d.FieldRawLen("stuffing", d.BitsLeft()) + } + }) + }) + } + + isTable := tsPidIsTable(mtpo.Pid, mtpi.ProgramMap) + if isTable { + var payloadPointer uint64 + if mtpo.PayloadUnitStart { + payloadPointer = d.FieldU8("payload_pointer") + } + if payloadPointer > 0 { + d.FieldRawLen("stuffing", int64(payloadPointer)*8) + } + } + + switch adaptationFieldControl { + case adaptationFieldControlPayloadOnly, + adaptationFieldControlAdaptationFieldAndPayload: + payload := d.FieldRawLen("payload", d.BitsLeft()) + mtpo.Payload = d.ReadAllBits(payload) + default: + // TODO: unknown adaption control flags + d.FieldRawLen("unknown", d.BitsLeft()) + } return mtpo }