mirror of
https://github.com/wader/fq.git
synced 2024-12-21 04:11:55 +03:00
e9d9f8aef9
Rename s/interface{}/any/g Preparation for using generics in decode API and native jq funcations etc Remove some unused linter ignores as linter has been fixed
485 lines
14 KiB
Go
485 lines
14 KiB
Go
package rtmp
|
|
|
|
// https://rtmp.veriskope.com/docs/spec/
|
|
// https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf
|
|
|
|
// TODO: split to rtmp/rtmp_message?
|
|
// TODO: support to skip handshake?
|
|
// TODO: keep track of message stream format, decode aac etc
|
|
|
|
import (
|
|
"bytes"
|
|
"embed"
|
|
|
|
"github.com/wader/fq/format"
|
|
"github.com/wader/fq/format/registry"
|
|
"github.com/wader/fq/pkg/bitio"
|
|
"github.com/wader/fq/pkg/decode"
|
|
"github.com/wader/fq/pkg/scalar"
|
|
)
|
|
|
|
var rtmpAmf0Group decode.Group
|
|
var rtmpMpegASCFormat decode.Group
|
|
|
|
//go:embed rtmp.jq
|
|
var rtmpFS embed.FS
|
|
|
|
func init() {
|
|
registry.MustRegister(decode.Format{
|
|
Name: format.RTMP,
|
|
Description: "Real-Time Messaging Protocol",
|
|
Groups: []string{
|
|
format.TCP_STREAM,
|
|
},
|
|
DecodeFn: rtmpDecode,
|
|
Dependencies: []decode.Dependency{
|
|
{Names: []string{format.AMF0}, Group: &rtmpAmf0Group},
|
|
{Names: []string{format.MPEG_ASC}, Group: &rtmpMpegASCFormat},
|
|
},
|
|
Functions: []string{"_help"},
|
|
Files: rtmpFS,
|
|
})
|
|
}
|
|
|
|
// from RTMP spec
|
|
const defaultChunkSize = 128
|
|
|
|
// names from RTMP spec
|
|
const (
|
|
messageTypeSetChunkSize = 1
|
|
messageTypeAbortMessage = 2
|
|
messageTypeAcknowledgment = 3
|
|
messageTypeUserControlMessage = 4
|
|
messageTypeWindowAcknowledgementSize = 5
|
|
messageTypeSetPeerBandwidth = 6
|
|
messageTypeVirtualControl = 7 // TODO: not in spec but in wikipedia article
|
|
messageTypeAudioMessage = 8
|
|
messageTypeVideoMessage = 9
|
|
messageTypeDataMessageExtended = 15
|
|
messageTypeSharedObjectMessageExtended = 16
|
|
messageTypeCommandMessageExtended = 17
|
|
messageTypeDataMessage = 18
|
|
messageTypeSharedObjectMessage = 19
|
|
messageTypeCommandMessage = 20
|
|
messageTypeUDP = 21 // TODO: not in spec but in wikipedia article
|
|
messageTypeAggregateMessage = 22
|
|
messageTypePresent = 23 // TODO: not in spec but in wikipedia article
|
|
)
|
|
|
|
var rtmpMessageTypeIDNames = scalar.UToSymStr{
|
|
messageTypeSetChunkSize: "set_chunk_size",
|
|
messageTypeAbortMessage: "abort_message",
|
|
messageTypeAcknowledgment: "acknowledgment",
|
|
messageTypeUserControlMessage: "user_control_message",
|
|
messageTypeWindowAcknowledgementSize: "window_acknowledgement_size",
|
|
messageTypeSetPeerBandwidth: "set_peer_bandwidth",
|
|
messageTypeVirtualControl: "virtual_control",
|
|
messageTypeAudioMessage: "audio_message",
|
|
messageTypeVideoMessage: "video_message",
|
|
messageTypeDataMessageExtended: "data_message_extended",
|
|
messageTypeSharedObjectMessageExtended: "shared_object_message_extended",
|
|
messageTypeCommandMessageExtended: "command_message_extended",
|
|
messageTypeDataMessage: "data_message",
|
|
messageTypeSharedObjectMessage: "shared_object_message",
|
|
messageTypeCommandMessage: "command_message",
|
|
messageTypeUDP: "udp",
|
|
messageTypeAggregateMessage: "aggregate_message",
|
|
messageTypePresent: "present",
|
|
}
|
|
|
|
const (
|
|
userControlEvenTypeStreamBegin = 0
|
|
userControlEvenTypeStreamEOF = 1
|
|
userControlEvenTypeStreamDry = 2
|
|
userControlEvenTypeSetBufferLength = 3
|
|
userControlEvenTypeStreamIsRecorded = 4
|
|
userControlEvenTypePingRequest = 6
|
|
userControlEvenTypePingResponse = 7
|
|
)
|
|
|
|
var userControlEvenTypNames = scalar.UToSymStr{
|
|
userControlEvenTypeStreamBegin: "stream_begin",
|
|
userControlEvenTypeStreamEOF: "stream_eof",
|
|
userControlEvenTypeStreamDry: "stream_dry",
|
|
userControlEvenTypeSetBufferLength: "set_buffer_length",
|
|
userControlEvenTypeStreamIsRecorded: "stream_is_recorded",
|
|
userControlEvenTypePingRequest: "ping_request",
|
|
userControlEvenTypePingResponse: "ping_response",
|
|
}
|
|
|
|
var setPeerBandwidthLimitTypeName = scalar.UToSymStr{
|
|
0: "hard",
|
|
1: "soft",
|
|
2: "dynamic",
|
|
}
|
|
|
|
const timestampExtended = 0xff_ff_ff
|
|
|
|
var timestampDescription = scalar.UToDescription{
|
|
timestampExtended: "extended",
|
|
}
|
|
|
|
const (
|
|
audioMessageCodecAAC = 10
|
|
)
|
|
|
|
// based on https://github.com/wireshark/wireshark/blob/master/epan/dissectors/packet-rtmpt.c
|
|
// which in turn is based on rtmp and swf specifications and FLV v10.1 section E.4.3.1
|
|
var audioMessageCodecNames = scalar.UToSymStr{
|
|
0: "uncompressed",
|
|
1: "adpcm",
|
|
2: "mp3",
|
|
3: "uncompressed_le",
|
|
4: "nellymoser_16khz",
|
|
5: "nellymoser_8khz",
|
|
6: "nellymoser",
|
|
7: "g711a",
|
|
8: "g711u",
|
|
9: "nellymoser_16khz",
|
|
audioMessageCodecAAC: "aac",
|
|
11: "speex",
|
|
}
|
|
|
|
const (
|
|
audioMessageAACPacketTypeASC = 0
|
|
audioMessageAACPacketTypeRaw = 1
|
|
)
|
|
|
|
var audioMessageAACPacketTypeNames = scalar.UToSymStr{
|
|
audioMessageAACPacketTypeASC: "asc",
|
|
audioMessageAACPacketTypeRaw: "raw",
|
|
}
|
|
|
|
var audioMessageRateNames = scalar.UToSymU{
|
|
0: 5500,
|
|
1: 11025,
|
|
2: 22050,
|
|
3: 44100,
|
|
}
|
|
|
|
var audioMessageSampleSize = scalar.UToSymU{
|
|
0: 8,
|
|
1: 16,
|
|
}
|
|
|
|
var audioMessageChannels = scalar.UToSymU{
|
|
0: 1,
|
|
1: 2,
|
|
}
|
|
|
|
var videoMessageTypeNames = scalar.UToSymStr{
|
|
1: "keyframe",
|
|
2: "inter_frame",
|
|
3: "disposable_inter_frame",
|
|
4: "generated_key_frame",
|
|
5: "video_info_or_command_frame",
|
|
}
|
|
|
|
const (
|
|
videoMessageCodecH264 = 7
|
|
)
|
|
|
|
var videoMessageCodecNames = scalar.UToSymStr{
|
|
2: "h263",
|
|
3: "screen_video",
|
|
4: "vp6",
|
|
5: "vp6_alpha",
|
|
6: "screen_video_v2",
|
|
videoMessageCodecH264: "h264",
|
|
}
|
|
|
|
var videoMessageH264PacketTypeNames = scalar.UToSymStr{
|
|
0: "dcr",
|
|
1: "au", // TODO: is access unit?
|
|
2: "empty",
|
|
}
|
|
|
|
// TODO: invalid warning that timestampDelta is unused
|
|
//nolint: unused
|
|
type messageHeader struct {
|
|
timestamp uint64
|
|
timestampDelta uint64
|
|
messageStreamID uint64
|
|
messageLength uint64
|
|
messageTypeID uint64
|
|
}
|
|
|
|
func rtmpDecodeMessageType(d *decode.D, typ int, chunkSize *int) {
|
|
switch typ {
|
|
case messageTypeSetChunkSize:
|
|
// TODO: zero bit, verify size? message size is 24 bit
|
|
*chunkSize = int(d.FieldU32("size"))
|
|
case messageTypeAbortMessage:
|
|
d.FieldU32("chunk_stream_id")
|
|
case messageTypeAcknowledgment:
|
|
d.FieldU32("sequence_number")
|
|
case messageTypeUserControlMessage:
|
|
typ := d.FieldU16("type", userControlEvenTypNames)
|
|
switch typ {
|
|
case userControlEvenTypeStreamBegin:
|
|
d.FieldU32("stream_id")
|
|
case userControlEvenTypeStreamEOF:
|
|
d.FieldU32("stream_id")
|
|
case userControlEvenTypeStreamDry:
|
|
d.FieldU32("stream_id")
|
|
case userControlEvenTypeSetBufferLength:
|
|
d.FieldU32("stream_id")
|
|
d.FieldU32("length")
|
|
case userControlEvenTypeStreamIsRecorded:
|
|
d.FieldU32("stream_id")
|
|
case userControlEvenTypePingRequest:
|
|
d.FieldU32("timestamp")
|
|
case userControlEvenTypePingResponse:
|
|
d.FieldU32("timestamp")
|
|
default:
|
|
d.FieldRawLen("data", d.BitsLeft())
|
|
}
|
|
case messageTypeWindowAcknowledgementSize:
|
|
d.FieldU32("window_size")
|
|
case messageTypeSetPeerBandwidth:
|
|
d.FieldU32("chunk_size")
|
|
d.FieldU8("limit_type", setPeerBandwidthLimitTypeName)
|
|
case messageTypeDataMessage:
|
|
d.FieldArray("messages", func(d *decode.D) {
|
|
for !d.End() {
|
|
d.FieldFormat("message", rtmpAmf0Group, nil)
|
|
}
|
|
})
|
|
case messageTypeCommandMessage:
|
|
d.FieldFormat("command_name", rtmpAmf0Group, nil)
|
|
d.FieldFormat("transaction_id", rtmpAmf0Group, nil)
|
|
d.FieldFormat("command_object", rtmpAmf0Group, nil)
|
|
d.FieldArray("arguments", func(d *decode.D) {
|
|
for !d.End() {
|
|
d.FieldFormat("argument", rtmpAmf0Group, nil)
|
|
}
|
|
})
|
|
case messageTypeAggregateMessage:
|
|
d.FieldArray("messages", func(d *decode.D) {
|
|
for !d.End() {
|
|
d.FieldStruct("message", func(d *decode.D) {
|
|
var h messageHeader
|
|
h.messageTypeID = d.FieldU8("message_type_id", rtmpMessageTypeIDNames)
|
|
h.messageLength = d.FieldU24("message_length")
|
|
h.timestamp = d.FieldU32("timestamp", timestampDescription)
|
|
h.messageStreamID = d.FieldU24("message_stream_id")
|
|
// TODO: possible to set chunk size in aggregated message?
|
|
d.FramedFn(int64(h.messageLength*8), func(d *decode.D) {
|
|
rtmpDecodeMessageType(d, int(h.messageTypeID), chunkSize)
|
|
})
|
|
d.FieldU32("back_pointer")
|
|
})
|
|
}
|
|
})
|
|
case messageTypeAudioMessage:
|
|
if d.BitsLeft() == 0 {
|
|
return
|
|
}
|
|
codec := d.FieldU4("codec", audioMessageCodecNames)
|
|
d.FieldU2("sample_rate", audioMessageRateNames)
|
|
d.FieldU1("sample_size", audioMessageSampleSize)
|
|
d.FieldU1("channels", audioMessageChannels)
|
|
if codec == audioMessageCodecAAC {
|
|
switch d.FieldU8("type", audioMessageAACPacketTypeNames) {
|
|
case audioMessageAACPacketTypeASC:
|
|
d.FieldFormat("data", rtmpMpegASCFormat, nil)
|
|
default:
|
|
d.FieldRawLen("data", d.BitsLeft())
|
|
}
|
|
} else {
|
|
d.FieldRawLen("data", d.BitsLeft())
|
|
}
|
|
case messageTypeVideoMessage:
|
|
if d.BitsLeft() == 0 {
|
|
return
|
|
}
|
|
d.FieldU4("type", videoMessageTypeNames)
|
|
codec := d.FieldU4("codec", videoMessageCodecNames)
|
|
// TODO: flv header + h263 format?
|
|
// TODO: ffmpeg rtmp proto seems to recrate a flv stream and demux it
|
|
if codec == videoMessageCodecH264 {
|
|
d.FieldU8("type", videoMessageH264PacketTypeNames)
|
|
}
|
|
|
|
d.FieldRawLen("data", d.BitsLeft())
|
|
default:
|
|
d.FieldRawLen("data", d.BitsLeft())
|
|
}
|
|
}
|
|
|
|
func rtmpDecode(d *decode.D, in any) any {
|
|
var isClient bool
|
|
if tsi, ok := in.(format.TCPStreamIn); ok {
|
|
tsi.MustIsPort(d.Fatalf, format.TCPPortRTMP)
|
|
isClient = tsi.IsClient
|
|
}
|
|
|
|
// chunk size is global for one direction
|
|
chunkSize := defaultChunkSize
|
|
|
|
name := "s"
|
|
if isClient {
|
|
name = "c"
|
|
}
|
|
// TODO: 1536 byte blobs instead?
|
|
d.FieldStruct("handshake", func(d *decode.D) {
|
|
d.FieldStruct(name+"0", func(d *decode.D) {
|
|
d.FieldU8("version")
|
|
})
|
|
d.FieldStruct(name+"1", func(d *decode.D) {
|
|
d.FieldU32("time")
|
|
d.FieldU32("zero") // TODO: does not seems to be zero sometimes?
|
|
d.FieldRawLen("random", 1528*8)
|
|
})
|
|
d.FieldStruct(name+"2", func(d *decode.D) {
|
|
d.FieldU32("time")
|
|
d.FieldU32("time2")
|
|
d.FieldRawLen("random", 1528*8)
|
|
})
|
|
})
|
|
|
|
type messageHeader struct {
|
|
timestamp uint64
|
|
timestampDelta uint64
|
|
messageStreamID uint64
|
|
messageLength uint64
|
|
messageTypeID uint64
|
|
}
|
|
|
|
type message struct {
|
|
l uint64
|
|
b bytes.Buffer
|
|
typ uint64
|
|
}
|
|
type chunkStream struct {
|
|
messageSteams map[uint64]*message
|
|
prevHeader messageHeader
|
|
}
|
|
|
|
chunkStreams := map[uint64]*chunkStream{}
|
|
|
|
messages := d.FieldArrayValue("messages")
|
|
|
|
d.FieldArray("chunks", func(d *decode.D) {
|
|
for !d.End() {
|
|
d.FieldStruct("chunk", func(d *decode.D) {
|
|
var chunkSteamID uint64
|
|
|
|
fmt := d.FieldU2("fmt")
|
|
switch d.PeekBits(6) {
|
|
case 0:
|
|
// 64-319: 2 byte
|
|
d.FieldU6("chunk_stream_id_prefix")
|
|
chunkSteamID = d.FieldU8("chunk_stream_id", scalar.ActualUAdd(64))
|
|
case 1:
|
|
// 64-65599: 3 byte
|
|
d.FieldU6("chunk_stream_id_prefix")
|
|
chunkSteamID = d.FieldU16("chunk_stream_id", scalar.ActualUAdd(64))
|
|
default:
|
|
// 2-63: 1 byte
|
|
chunkSteamID = d.FieldU6("chunk_stream_id")
|
|
}
|
|
|
|
cs, ok := chunkStreams[chunkSteamID]
|
|
if !ok {
|
|
cs = &chunkStream{
|
|
messageSteams: map[uint64]*message{},
|
|
}
|
|
chunkStreams[chunkSteamID] = cs
|
|
}
|
|
|
|
var h messageHeader
|
|
|
|
switch fmt {
|
|
case 0:
|
|
h.timestamp = d.FieldU24("timestamp", timestampDescription)
|
|
h.messageLength = d.FieldU24("message_length")
|
|
h.messageTypeID = d.FieldU8("message_type_id", rtmpMessageTypeIDNames)
|
|
h.messageStreamID = d.FieldU32LE("message_stream_id")
|
|
if h.timestamp == timestampExtended {
|
|
h.timestamp = d.FieldU32("extended_timestamp")
|
|
}
|
|
case 1:
|
|
h.timestampDelta = d.FieldU24("timestamp_delta", timestampDescription)
|
|
h.messageLength = d.FieldU24("message_length")
|
|
h.messageTypeID = d.FieldU8("message_type_id", rtmpMessageTypeIDNames)
|
|
if h.timestamp == timestampExtended {
|
|
h.timestampDelta = d.FieldU32("extended_timestamp")
|
|
}
|
|
h.timestamp = cs.prevHeader.timestamp
|
|
h.messageStreamID = cs.prevHeader.messageStreamID
|
|
d.FieldValueU("message_stream_id", h.messageStreamID, scalar.Description("previous"))
|
|
|
|
case 2:
|
|
h.timestampDelta = d.FieldU24("timestamp_delta", timestampDescription)
|
|
if h.timestamp == timestampExtended {
|
|
h.timestampDelta = d.FieldU32("extended_timestamp")
|
|
}
|
|
h.timestamp = cs.prevHeader.timestamp
|
|
h.messageLength = cs.prevHeader.messageLength
|
|
h.messageStreamID = cs.prevHeader.messageStreamID
|
|
h.messageTypeID = cs.prevHeader.messageTypeID
|
|
d.FieldValueU("message_length", h.messageLength, scalar.Description("previous"))
|
|
d.FieldValueU("message_type_id", h.messageTypeID, scalar.Description("previous"))
|
|
d.FieldValueU("message_stream_id", h.messageStreamID, scalar.Description("previous"))
|
|
case 3:
|
|
h.timestamp = cs.prevHeader.timestamp
|
|
h.timestampDelta = cs.prevHeader.timestampDelta
|
|
h.messageLength = cs.prevHeader.messageLength
|
|
h.messageStreamID = cs.prevHeader.messageStreamID
|
|
h.messageTypeID = cs.prevHeader.messageTypeID
|
|
d.FieldValueU("message_length", h.messageLength, scalar.Description("previous"))
|
|
d.FieldValueU("message_type_id", h.messageTypeID, scalar.Description("previous"))
|
|
d.FieldValueU("message_stream_id", h.messageStreamID, scalar.Description("previous"))
|
|
}
|
|
|
|
h.timestamp += h.timestampDelta
|
|
|
|
d.FieldValueU("calculated_timestamp", h.timestamp)
|
|
|
|
m, ok := cs.messageSteams[h.messageStreamID]
|
|
if !ok {
|
|
m = &message{
|
|
l: h.messageLength,
|
|
typ: h.messageTypeID,
|
|
}
|
|
cs.messageSteams[h.messageStreamID] = m
|
|
}
|
|
|
|
payloadLength := int64(chunkSize)
|
|
messageLeft := int64(m.l) - int64(m.b.Len())
|
|
if messageLeft < payloadLength {
|
|
payloadLength = messageLeft
|
|
}
|
|
// support decoding interrupted rtmp stream
|
|
// TODO: throw away message buffer? currently only do tcp so no point?
|
|
payloadLength *= 8
|
|
if payloadLength > d.BitsLeft() {
|
|
payloadLength = d.BitsLeft()
|
|
}
|
|
|
|
if payloadLength > 0 {
|
|
d.MustCopyBits(&m.b, d.FieldRawLen("data", payloadLength))
|
|
}
|
|
|
|
if m.l == uint64(m.b.Len()) {
|
|
messageBR := bitio.NewBitReader(m.b.Bytes(), -1)
|
|
messages.FieldStructRootBitBufFn("message", messageBR, func(d *decode.D) {
|
|
d.FieldValueU("message_stream_id", h.messageStreamID)
|
|
d.FieldValueU("message_type_id", m.typ, rtmpMessageTypeIDNames)
|
|
rtmpDecodeMessageType(d, int(m.typ), &chunkSize)
|
|
})
|
|
|
|
// delete so that we create a new message{} with a new bytes.Buffer to
|
|
// not share byte slice
|
|
delete(cs.messageSteams, h.messageStreamID)
|
|
}
|
|
|
|
cs.prevHeader = h
|
|
})
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|