1
1
mirror of https://github.com/wader/fq.git synced 2024-09-11 12:05:39 +03:00
This commit is contained in:
Mattias Wadman 2023-01-21 18:33:48 +01:00
parent 35f8379e08
commit 03316988aa
3 changed files with 165 additions and 124 deletions

View File

@ -403,6 +403,10 @@ type Pg_BTree_In struct {
Page int `doc:"First page number in file, default is 0"`
}
type MpegTsIn struct {
MaxSyncSeek int `doc:"Max byte distance to next sync"`
}
type MpegTsStream struct {
ProgramPid int
Type int
@ -417,10 +421,12 @@ type MpegTsProgram struct {
type MpegTsPacketIn struct {
ProgramMap map[int]MpegTsProgram
StreamMap map[int]MpegTsStream
ContinuityMap map[int]int
}
type MpegTsPacketOut struct {
Pid int
TransportErrorIndicator bool
ContinuityCounter int
TransportScramblingControl int
PayloadUnitStart bool

View File

@ -8,6 +8,7 @@ package mpeg
// TODO: mpeg_pes, share code?
// TODO: mpeg_pes_packet, length 0 for video?
// TODO: dup start?
// TODO: transport error indicator, count somehow? now mpeg_ts_packet fails
// ffmpeg $(for i in $(seq 0 50); do echo "-f lavfi -i sine"; done) -t 100ms $(for i in $(seq 0 50); do echo "-map $i:0"; done) test2.ts
@ -45,6 +46,9 @@ func init() {
{Groups: []*decode.Group{format.MPEG_TS_PMT}, Out: &mpegTsMpegTsPmtGroup},
{Groups: []*decode.Group{format.MPEG_PES_Packet}, Out: &mpegTsMpegPesPacketGroup},
},
DefaultInArg: format.MpegTsIn{
MaxSyncSeek: 100 * 1024,
},
})
}
@ -65,18 +69,19 @@ func (tb *tsBuffer) Reset() {
// new bytes buffer to not share byte slice
tb.buf = bytes.Buffer{}
tb.packetIndexes = nil
}
type tsContinuityMap map[int]int
func (tcm tsContinuityMap) Update(pid int, n int) bool {
current, currentOk := tcm[pid]
tcm[pid] = n
if currentOk {
return (current+1)&0xf == n
func tsContinuityUpdate(tcm map[int]int, pid int, current int) bool {
prev, prevFound := tcm[pid]
valid := (prevFound && ((prev+1)&0xf == current)) || current == 0
if valid {
tcm[pid] = current
return true
}
return n == 0
if prevFound {
delete(tcm, pid)
}
return valid
}
func tsPesDecode(d *decode.D, pid int, programPid int, streamType int, pesBuf *tsBuffer) {
@ -93,29 +98,46 @@ func tsPesDecode(d *decode.D, pid int, programPid int, streamType int, pesBuf *t
}
func tsDecode(d *decode.D) any {
var ti format.MpegTsIn
d.ArgAs(&ti)
var tableReassemble = map[int]*tsBuffer{}
var pesReassemble = map[int]*tsBuffer{}
pidProgramMap := map[int]format.MpegTsProgram{}
pidStreamMap := map[int]format.MpegTsStream{}
continuityMap := tsContinuityMap(map[int]int{})
continuityMap := map[int]int{}
packetIndex := 0
decodeFailures := 0
tablesD := d.FieldArrayValue("tables")
pesD := d.FieldArrayValue("pes")
d.FieldArray("packets", func(d *decode.D) {
for !d.End() {
_, v, err := d.TryFieldFormat(
syncLen, _, err := d.TryPeekFind(8, 8, int64(ti.MaxSyncSeek), func(v uint64) bool {
return v == 0x47
})
if err != nil || syncLen < 0 {
break
}
if syncLen > 0 {
d.SeekRel(syncLen)
}
_, v, err := d.TryFieldFormatLen(
"packet",
tsPacketLength,
&mpegTsMpegTsPacketGroup,
format.MpegTsPacketIn{
ProgramMap: pidProgramMap,
StreamMap: pidStreamMap,
ContinuityMap: continuityMap,
},
)
if err != nil {
// TODO: malformted packet, how?
d.FieldRawLen("packet", tsPacketLength)
decodeFailures++
d.SeekRel(8)
continue
}
mtpo, mtpoOk := v.(format.MpegTsPacketOut)
@ -123,7 +145,7 @@ func tsDecode(d *decode.D) any {
panic("packet is not a MpegTsPacketOut")
}
isContinous := continuityMap.Update(mtpo.Pid, mtpo.ContinuityCounter)
isContinous := tsContinuityUpdate(continuityMap, mtpo.Pid, mtpo.ContinuityCounter)
isTable := tsPidIsTable(mtpo.Pid, pidProgramMap)
stream, isStream := pidStreamMap[mtpo.Pid]

View File

@ -22,13 +22,13 @@ func tsPacketDecode(d *decode.D) any {
if d.ArgAs(&mtpi) {
mtpi.ProgramMap = map[int]format.MpegTsProgram{}
mtpi.StreamMap = map[int]format.MpegTsStream{}
mtpi.ContinuityMap = map[int]int{}
}
var mtpo format.MpegTsPacketOut
d.FramedFn(tsPacketLength, func(d *decode.D) {
d.FieldU8("sync", scalar.UintHex) // TODO: sometimes not 0x47? d.UintAssert(0x47)
d.FieldBool("transport_error_indicator")
d.FieldU8("sync", scalar.UintHex, d.UintAssert(0x47))
mtpo.TransportErrorIndicator = d.FieldBool("transport_error_indicator", d.BoolAssert(false))
mtpo.PayloadUnitStart = d.FieldBool("payload_unit_start")
d.FieldBool("transport_priority")
pid := d.FieldU13("pid", tsPidMap, scalar.UintHex)
@ -53,7 +53,21 @@ func tsPacketDecode(d *decode.D) any {
adaptationFieldControlAdaptationFieldOnly: "adaptation_field_only",
adaptationFieldControlAdaptationFieldAndPayload: "adaptation_and_payload",
})
mtpo.ContinuityCounter = int(d.FieldU4("continuity_counter"))
mtpo.ContinuityCounter = int(d.FieldU4("continuity_counter", scalar.UintFn(func(s scalar.Uint) (scalar.Uint, error) {
prev, prevFound := mtpi.ContinuityMap[int(pid)]
current := int(s.Actual)
switch {
case prevFound && (prev+1)&0xf == current:
s.Description = "valid"
case prevFound:
s.Description = "invalid"
default:
s.Description = "unknown"
}
return s, nil
})))
switch adaptationFieldControl {
case adaptationFieldControlAdaptationFieldOnly,
@ -134,7 +148,6 @@ func tsPacketDecode(d *decode.D) any {
// TODO: unknown adaption control flags
d.FieldRawLen("unknown", d.BitsLeft())
}
})
return mtpo
}