1
1
mirror of https://github.com/wader/fq.git synced 2024-11-26 21:55:57 +03:00
fq/format/rtmp/rtmp.go
Mattias Wadman 87b2c6c10c help,doc: Use markdown for format documentation again
Markdown is used as is in online documentation and in cli the markdown decoder
is used to decode and then some jq code massages it into something cli friendly.

Was just too much of a mess to have doc in jq.
2022-09-11 00:53:29 +02:00

485 lines
14 KiB
Go

package rtmp
// https://rtmp.veriskope.com/docs/spec/
// https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf
// TODO: split to rtmp/rtmp_message?
// TODO: support to skip handshake?
// TODO: keep track of message stream format, decode aac etc
import (
"bytes"
"embed"
"github.com/wader/fq/format"
"github.com/wader/fq/pkg/bitio"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/interp"
"github.com/wader/fq/pkg/scalar"
)
// rtmpAmf0Group is the decoder group used for AMF0 values in data and command
// messages. init passes its address as the group for the format.AMF0
// dependency (presumably the registry fills it in — confirm).
var rtmpAmf0Group decode.Group
// rtmpMpegASCFormat is the decoder group used for the payload of AAC "asc"
// audio packets. init passes its address as the group for the format.MPEG_ASC
// dependency.
var rtmpMpegASCFormat decode.Group
// rtmpFS embeds rtmp.md; it is handed to interp.RegisterFS in init.
//go:embed rtmp.md
var rtmpFS embed.FS
func init() {
interp.RegisterFormat(decode.Format{
Name: format.RTMP,
Description: "Real-Time Messaging Protocol",
Groups: []string{
format.TCP_STREAM,
},
DecodeFn: rtmpDecode,
Dependencies: []decode.Dependency{
{Names: []string{format.AMF0}, Group: &rtmpAmf0Group},
{Names: []string{format.MPEG_ASC}, Group: &rtmpMpegASCFormat},
},
})
interp.RegisterFS(rtmpFS)
}
// from RTMP spec
// defaultChunkSize is the chunk payload size, in bytes, that rtmpDecode starts
// with until a set chunk size message changes it.
const defaultChunkSize = 128
// names from RTMP spec
// Message type IDs as found in the message_type_id field of chunk and
// aggregate message headers. rtmpDecodeMessageType switches on these.
const (
messageTypeSetChunkSize = 1
messageTypeAbortMessage = 2
messageTypeAcknowledgment = 3
messageTypeUserControlMessage = 4
messageTypeWindowAcknowledgementSize = 5
messageTypeSetPeerBandwidth = 6
messageTypeVirtualControl = 7 // TODO: not in spec but in wikipedia article
messageTypeAudioMessage = 8
messageTypeVideoMessage = 9
messageTypeDataMessageExtended = 15
messageTypeSharedObjectMessageExtended = 16
messageTypeCommandMessageExtended = 17
messageTypeDataMessage = 18
messageTypeSharedObjectMessage = 19
messageTypeCommandMessage = 20
messageTypeUDP = 21 // TODO: not in spec but in wikipedia article
messageTypeAggregateMessage = 22
messageTypePresent = 23 // TODO: not in spec but in wikipedia article
)
// rtmpMessageTypeIDNames maps message type IDs to symbolic names. It is passed
// as a mapper whenever a message_type_id field is decoded or added.
var rtmpMessageTypeIDNames = scalar.UToSymStr{
messageTypeSetChunkSize: "set_chunk_size",
messageTypeAbortMessage: "abort_message",
messageTypeAcknowledgment: "acknowledgment",
messageTypeUserControlMessage: "user_control_message",
messageTypeWindowAcknowledgementSize: "window_acknowledgement_size",
messageTypeSetPeerBandwidth: "set_peer_bandwidth",
messageTypeVirtualControl: "virtual_control",
messageTypeAudioMessage: "audio_message",
messageTypeVideoMessage: "video_message",
messageTypeDataMessageExtended: "data_message_extended",
messageTypeSharedObjectMessageExtended: "shared_object_message_extended",
messageTypeCommandMessageExtended: "command_message_extended",
messageTypeDataMessage: "data_message",
messageTypeSharedObjectMessage: "shared_object_message",
messageTypeCommandMessage: "command_message",
messageTypeUDP: "udp",
messageTypeAggregateMessage: "aggregate_message",
messageTypePresent: "present",
}
// Event types carried in the 16-bit "type" field of a user control message.
// Value 5 is not defined here; unknown values are decoded as raw data by
// rtmpDecodeMessageType.
// NOTE(review): "Even" in these identifiers looks like a typo for "Event";
// left unchanged because the names are referenced elsewhere in the file.
const (
userControlEvenTypeStreamBegin = 0
userControlEvenTypeStreamEOF = 1
userControlEvenTypeStreamDry = 2
userControlEvenTypeSetBufferLength = 3
userControlEvenTypeStreamIsRecorded = 4
userControlEvenTypePingRequest = 6
userControlEvenTypePingResponse = 7
)
// userControlEvenTypNames maps user control event types to symbolic names.
var userControlEvenTypNames = scalar.UToSymStr{
userControlEvenTypeStreamBegin: "stream_begin",
userControlEvenTypeStreamEOF: "stream_eof",
userControlEvenTypeStreamDry: "stream_dry",
userControlEvenTypeSetBufferLength: "set_buffer_length",
userControlEvenTypeStreamIsRecorded: "stream_is_recorded",
userControlEvenTypePingRequest: "ping_request",
userControlEvenTypePingResponse: "ping_response",
}
// setPeerBandwidthLimitTypeName maps the 8-bit limit_type field of a set peer
// bandwidth message to a symbolic name.
var setPeerBandwidthLimitTypeName = scalar.UToSymStr{
0: "hard",
1: "soft",
2: "dynamic",
}
// timestampExtended is the all-ones value of a 24-bit timestamp field. When a
// chunk header carries it, rtmpDecode reads a separate 32-bit
// extended_timestamp field.
const timestampExtended = 0xff_ff_ff
// timestampDescription attaches the description "extended" to timestamp
// fields holding timestampExtended.
var timestampDescription = scalar.UToDescription{
timestampExtended: "extended",
}
// Audio codec IDs that get special handling in rtmpDecodeMessageType.
const (
audioMessageCodecAAC = 10
)
// based on https://github.com/wireshark/wireshark/blob/master/epan/dissectors/packet-rtmpt.c
// which in turn is based on rtmp and swf specifications and FLV v10.1 section E.4.3.1
// audioMessageCodecNames maps the 4-bit codec field of an audio message to a
// symbolic name. Note that 4 and 9 both map to "nellymoser_16khz".
var audioMessageCodecNames = scalar.UToSymStr{
0: "uncompressed",
1: "adpcm",
2: "mp3",
3: "uncompressed_le",
4: "nellymoser_16khz",
5: "nellymoser_8khz",
6: "nellymoser",
7: "g711a",
8: "g711u",
9: "nellymoser_16khz",
audioMessageCodecAAC: "aac",
11: "speex",
}
// AAC packet types, read as an 8-bit "type" field after the audio header when
// the codec is AAC.
const (
audioMessageAACPacketTypeASC = 0
audioMessageAACPacketTypeRaw = 1
)
// audioMessageAACPacketTypeNames maps AAC packet types to symbolic names.
var audioMessageAACPacketTypeNames = scalar.UToSymStr{
audioMessageAACPacketTypeASC: "asc",
audioMessageAACPacketTypeRaw: "raw",
}
// audioMessageRateNames maps the 2-bit sample_rate field to a numeric symbol
// (presumably the sample rate in Hz — confirm against the FLV specification).
var audioMessageRateNames = scalar.UToSymU{
0: 5500,
1: 11025,
2: 22050,
3: 44100,
}
// audioMessageSampleSize maps the 1-bit sample_size field to a numeric symbol
// (presumably bits per sample).
var audioMessageSampleSize = scalar.UToSymU{
0: 8,
1: 16,
}
// audioMessageChannels maps the 1-bit channels field to a numeric symbol
// (presumably the channel count).
var audioMessageChannels = scalar.UToSymU{
0: 1,
1: 2,
}
// videoMessageTypeNames maps the 4-bit frame "type" field at the start of a
// video message to a symbolic name.
var videoMessageTypeNames = scalar.UToSymStr{
1: "keyframe",
2: "inter_frame",
3: "disposable_inter_frame",
4: "generated_key_frame",
5: "video_info_or_command_frame",
}
// Video codec IDs that get special handling in rtmpDecodeMessageType.
const (
videoMessageCodecH264 = 7
)
// videoMessageCodecNames maps the 4-bit codec field of a video message to a
// symbolic name.
var videoMessageCodecNames = scalar.UToSymStr{
2: "h263",
3: "screen_video",
4: "vp6",
5: "vp6_alpha",
6: "screen_video_v2",
videoMessageCodecH264: "h264",
}
// videoMessageH264PacketTypeNames maps the 8-bit packet type that follows the
// video header, when the codec is H.264, to a symbolic name.
var videoMessageH264PacketTypeNames = scalar.UToSymStr{
0: "dcr",
1: "au", // TODO: is access unit?
2: "empty",
}
// messageHeader holds the header values of one message. At package level it
// is used by rtmpDecodeMessageType for the sub-message headers inside an
// aggregate message; rtmpDecode declares its own local type with the same
// name and fields.
// TODO: invalid warning that timestampDelta is unused
//nolint:unused
type messageHeader struct {
timestamp uint64
timestampDelta uint64
messageStreamID uint64
messageLength uint64
messageTypeID uint64
}
// rtmpDecodeMessageType decodes the body of a single RTMP message of type typ
// from d.
//
// A set chunk size message stores the new size in *chunkSize. An aggregate
// message is split into sub-messages and each one is decoded by calling this
// function again with the same chunkSize pointer. Message types without a
// dedicated decoder are added as one raw "data" field.
func rtmpDecodeMessageType(d *decode.D, typ int, chunkSize *int) {
	// amf0Values adds an array called arrayName whose elements, each named
	// itemName, are AMF0 values read until the end of the buffer.
	amf0Values := func(arrayName string, itemName string) {
		d.FieldArray(arrayName, func(d *decode.D) {
			for !d.End() {
				d.FieldFormat(itemName, rtmpAmf0Group, nil)
			}
		})
	}
	// rawRest adds whatever is left of the message as a raw "data" field.
	rawRest := func() {
		d.FieldRawLen("data", d.BitsLeft())
	}

	switch typ {
	case messageTypeSetChunkSize:
		// TODO: zero bit, verify size? message size is 24 bit
		newSize := d.FieldU32("size")
		*chunkSize = int(newSize)

	case messageTypeAbortMessage:
		d.FieldU32("chunk_stream_id")

	case messageTypeAcknowledgment:
		d.FieldU32("sequence_number")

	case messageTypeUserControlMessage:
		eventType := d.FieldU16("type", userControlEvenTypNames)
		switch eventType {
		case userControlEvenTypeStreamBegin,
			userControlEvenTypeStreamEOF,
			userControlEvenTypeStreamDry,
			userControlEvenTypeStreamIsRecorded:
			d.FieldU32("stream_id")
		case userControlEvenTypeSetBufferLength:
			d.FieldU32("stream_id")
			d.FieldU32("length")
		case userControlEvenTypePingRequest,
			userControlEvenTypePingResponse:
			d.FieldU32("timestamp")
		default:
			rawRest()
		}

	case messageTypeWindowAcknowledgementSize:
		d.FieldU32("window_size")

	case messageTypeSetPeerBandwidth:
		d.FieldU32("chunk_size")
		d.FieldU8("limit_type", setPeerBandwidthLimitTypeName)

	case messageTypeDataMessage:
		amf0Values("messages", "message")

	case messageTypeCommandMessage:
		// Three fixed AMF0 values followed by any number of arguments.
		d.FieldFormat("command_name", rtmpAmf0Group, nil)
		d.FieldFormat("transaction_id", rtmpAmf0Group, nil)
		d.FieldFormat("command_object", rtmpAmf0Group, nil)
		amf0Values("arguments", "argument")

	case messageTypeAggregateMessage:
		d.FieldArray("messages", func(d *decode.D) {
			for !d.End() {
				d.FieldStruct("message", func(d *decode.D) {
					var sub messageHeader
					sub.messageTypeID = d.FieldU8("message_type_id", rtmpMessageTypeIDNames)
					sub.messageLength = d.FieldU24("message_length")
					sub.timestamp = d.FieldU32("timestamp", timestampDescription)
					sub.messageStreamID = d.FieldU24("message_stream_id")

					// TODO: possible to set chunk size in aggregated message?
					bodyBits := int64(sub.messageLength * 8)
					d.FramedFn(bodyBits, func(d *decode.D) {
						rtmpDecodeMessageType(d, int(sub.messageTypeID), chunkSize)
					})

					d.FieldU32("back_pointer")
				})
			}
		})

	case messageTypeAudioMessage:
		// An empty audio message has no header to decode.
		if d.BitsLeft() == 0 {
			break
		}
		codec := d.FieldU4("codec", audioMessageCodecNames)
		d.FieldU2("sample_rate", audioMessageRateNames)
		d.FieldU1("sample_size", audioMessageSampleSize)
		d.FieldU1("channels", audioMessageChannels)

		if codec == audioMessageCodecAAC {
			packetType := d.FieldU8("type", audioMessageAACPacketTypeNames)
			if packetType == audioMessageAACPacketTypeASC {
				d.FieldFormat("data", rtmpMpegASCFormat, nil)
				break
			}
		}
		// Every non-ASC payload is kept as raw data.
		rawRest()

	case messageTypeVideoMessage:
		// An empty video message has no header to decode.
		if d.BitsLeft() == 0 {
			break
		}
		d.FieldU4("type", videoMessageTypeNames)
		codec := d.FieldU4("codec", videoMessageCodecNames)
		// TODO: flv header + h263 format?
		// TODO: ffmpeg rtmp proto seems to recreate a flv stream and demux it
		if codec == videoMessageCodecH264 {
			// NOTE(review): this reuses the field name "type" already added
			// above — confirm the decoder permits duplicate names in a struct.
			d.FieldU8("type", videoMessageH264PacketTypeNames)
		}
		rawRest()

	default:
		rawRest()
	}
}
// rtmpDecode decodes one direction of an RTMP stream: the handshake followed
// by a sequence of chunks, whose payloads are reassembled into messages.
//
// If in is a format.TCPStreamIn, the port is checked with MustIsPort against
// format.TCPPortRTMP and its IsClient flag selects the handshake field names
// ("c0", "c1", "c2" for the client, "s0", "s1", "s2" otherwise). The chunks are
// added to the "chunks" array; each message is decoded into the "messages"
// array once all of its bytes have been collected. Always returns nil.
func rtmpDecode(d *decode.D, in any) any {
	var isClient bool
	if tsi, ok := in.(format.TCPStreamIn); ok {
		tsi.MustIsPort(d.Fatalf, format.TCPPortRTMP)
		isClient = tsi.IsClient
	}

	// chunk size is global for one direction
	chunkSize := defaultChunkSize

	name := "s"
	if isClient {
		name = "c"
	}

	// TODO: 1536 byte blobs instead?
	d.FieldStruct("handshake", func(d *decode.D) {
		d.FieldStruct(name+"0", func(d *decode.D) {
			d.FieldU8("version")
		})
		d.FieldStruct(name+"1", func(d *decode.D) {
			d.FieldU32("time")
			d.FieldU32("zero") // TODO: does not seem to be zero sometimes?
			d.FieldRawLen("random", 1528*8)
		})
		d.FieldStruct(name+"2", func(d *decode.D) {
			d.FieldU32("time")
			d.FieldU32("time2")
			d.FieldRawLen("random", 1528*8)
		})
	})

	// messageHeader holds the resolved header values of one chunk; fields not
	// present in a chunk header are inherited from the previous chunk on the
	// same chunk stream.
	type messageHeader struct {
		timestamp       uint64
		timestampDelta  uint64
		messageStreamID uint64
		messageLength   uint64
		messageTypeID   uint64
	}

	// message is a message being reassembled from chunk payloads.
	type message struct {
		l   uint64       // total message length in bytes
		b   bytes.Buffer // payload bytes collected so far
		typ uint64       // message type ID
	}

	// chunkStream is the per chunk stream ID state.
	type chunkStream struct {
		messageStreams map[uint64]*message // in-progress messages by message stream ID
		prevHeader     messageHeader       // header of the previous chunk on this chunk stream
	}

	chunkStreams := map[uint64]*chunkStream{}

	// Created before "chunks" so that completed messages can be appended to it
	// while the chunks are being decoded.
	messages := d.FieldArrayValue("messages")

	d.FieldArray("chunks", func(d *decode.D) {
		for !d.End() {
			d.FieldStruct("chunk", func(d *decode.D) {
				var chunkStreamID uint64

				chunkFmt := d.FieldU2("fmt")
				switch d.PeekBits(6) {
				case 0:
					// 64-319: 2 byte
					d.FieldU6("chunk_stream_id_prefix")
					chunkStreamID = d.FieldU8("chunk_stream_id", scalar.ActualUAdd(64))
				case 1:
					// 64-65599: 3 byte
					d.FieldU6("chunk_stream_id_prefix")
					chunkStreamID = d.FieldU16("chunk_stream_id", scalar.ActualUAdd(64))
				default:
					// 2-63: 1 byte
					chunkStreamID = d.FieldU6("chunk_stream_id")
				}

				cs, ok := chunkStreams[chunkStreamID]
				if !ok {
					cs = &chunkStream{
						messageStreams: map[uint64]*message{},
					}
					chunkStreams[chunkStreamID] = cs
				}

				var h messageHeader

				switch chunkFmt {
				case 0:
					// full header with absolute timestamp
					h.timestamp = d.FieldU24("timestamp", timestampDescription)
					h.messageLength = d.FieldU24("message_length")
					h.messageTypeID = d.FieldU8("message_type_id", rtmpMessageTypeIDNames)
					h.messageStreamID = d.FieldU32LE("message_stream_id")
					if h.timestamp == timestampExtended {
						h.timestamp = d.FieldU32("extended_timestamp")
					}
				case 1:
					// message stream id inherited, timestamp is a delta
					h.timestampDelta = d.FieldU24("timestamp_delta", timestampDescription)
					h.messageLength = d.FieldU24("message_length")
					h.messageTypeID = d.FieldU8("message_type_id", rtmpMessageTypeIDNames)
					// The extended marker is carried by the delta field here;
					// h.timestamp has not been set yet at this point.
					if h.timestampDelta == timestampExtended {
						h.timestampDelta = d.FieldU32("extended_timestamp")
					}
					h.timestamp = cs.prevHeader.timestamp
					h.messageStreamID = cs.prevHeader.messageStreamID
					d.FieldValueU("message_stream_id", h.messageStreamID, scalar.Description("previous"))
				case 2:
					// only a timestamp delta, everything else inherited
					h.timestampDelta = d.FieldU24("timestamp_delta", timestampDescription)
					// As for fmt 1, the extended marker is in the delta field.
					if h.timestampDelta == timestampExtended {
						h.timestampDelta = d.FieldU32("extended_timestamp")
					}
					h.timestamp = cs.prevHeader.timestamp
					h.messageLength = cs.prevHeader.messageLength
					h.messageStreamID = cs.prevHeader.messageStreamID
					h.messageTypeID = cs.prevHeader.messageTypeID
					d.FieldValueU("message_length", h.messageLength, scalar.Description("previous"))
					d.FieldValueU("message_type_id", h.messageTypeID, scalar.Description("previous"))
					d.FieldValueU("message_stream_id", h.messageStreamID, scalar.Description("previous"))
				case 3:
					// no header fields, everything inherited
					// TODO: extended timestamp is not read for fmt 3 chunks
					h.timestamp = cs.prevHeader.timestamp
					h.timestampDelta = cs.prevHeader.timestampDelta
					h.messageLength = cs.prevHeader.messageLength
					h.messageStreamID = cs.prevHeader.messageStreamID
					h.messageTypeID = cs.prevHeader.messageTypeID
					d.FieldValueU("message_length", h.messageLength, scalar.Description("previous"))
					d.FieldValueU("message_type_id", h.messageTypeID, scalar.Description("previous"))
					d.FieldValueU("message_stream_id", h.messageStreamID, scalar.Description("previous"))
				}

				h.timestamp += h.timestampDelta
				d.FieldValueU("calculated_timestamp", h.timestamp)

				m, ok := cs.messageStreams[h.messageStreamID]
				if !ok {
					m = &message{
						l:   h.messageLength,
						typ: h.messageTypeID,
					}
					cs.messageStreams[h.messageStreamID] = m
				}

				// A chunk carries at most chunkSize bytes, and no more than
				// what is still missing from the message.
				payloadLength := int64(chunkSize)
				messageLeft := int64(m.l) - int64(m.b.Len())
				if messageLeft < payloadLength {
					payloadLength = messageLeft
				}
				// support decoding interrupted rtmp stream
				// TODO: throw away message buffer? currently only do tcp so no point?
				payloadLength *= 8
				if payloadLength > d.BitsLeft() {
					payloadLength = d.BitsLeft()
				}

				if payloadLength > 0 {
					d.CopyBits(&m.b, d.FieldRawLen("data", payloadLength))
				}

				if m.l == uint64(m.b.Len()) {
					// Message is complete: decode it from its own buffer.
					messageBR := bitio.NewBitReader(m.b.Bytes(), -1)
					messages.FieldStructRootBitBufFn("message", messageBR, func(d *decode.D) {
						d.FieldValueU("message_stream_id", h.messageStreamID)
						d.FieldValueU("message_type_id", m.typ, rtmpMessageTypeIDNames)
						rtmpDecodeMessageType(d, int(m.typ), &chunkSize)
					})

					// delete so that we create a new message{} with a new bytes.Buffer to
					// not share byte slice
					delete(cs.messageStreams, h.messageStreamID)
				}

				cs.prevHeader = h
			})
		}
	})

	return nil
}