// Mirror of https://github.com/wader/fq.git (synced 2024-12-24 13:52:02 +03:00).
// File: fq/format/inet/flowsdecoder/flowsdecoder.go (250 lines, 6.0 KiB, Go).

package flowsdecoder
// TODO: option to not allow missing syn/ack?
import (
"bytes"
"encoding/binary"
"net"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/ip4defrag"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/reassembly"
)
// TCPEndpoint identifies one side of a TCP connection by address and port.
type TCPEndpoint struct {
	// IP is the network address of the endpoint.
	IP net.IP
	// Port is the TCP port of the endpoint.
	Port int
}
// TCPDirection holds the reassembled state for one direction of a TCP
// connection.
type TCPDirection struct {
	// Endpoint is the endpoint this direction is associated with.
	Endpoint TCPEndpoint
	// HasStart and HasEnd record whether a reassembled chunk flagged as the
	// start, respectively the end, of the stream has been seen.
	HasStart, HasEnd bool
	// Buffer accumulates the reassembled stream bytes.
	Buffer *bytes.Buffer
	// SkippedBytes is the total number of bytes reported as missing.
	SkippedBytes uint64
}
// TCPConnection is a reassembled TCP connection: one TCPDirection per side
// plus the state used to decide whether incoming segments are accepted.
type TCPConnection struct {
	// Client and Server hold the data of the two directions of the connection.
	Client, Server TCPDirection

	// tcpState is the state machine consulted by Accept.
	tcpState *reassembly.TCPSimpleFSM
	// optChecker is optional; when nil, Accept skips the TCP option check.
	optChecker *reassembly.TCPOptionCheck
	// net and transport are the flows the connection was created for.
	net, transport gopacket.Flow
}
// Accept reports whether a TCP segment should be handed to reassembly.
//
// The segment is rejected when the connection state machine does not accept
// it, or when an option checker is configured and it returns an error.
// Checksums are not verified.
func (t *TCPConnection) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
	// the state check always runs first, the option check only if it passed
	if stateOK := t.tcpState.CheckState(tcp, dir); !stateOK {
		// TODO: handle err?
		return false
	}

	// option checking is optional, nothing more to verify without a checker
	if t.optChecker == nil {
		// TODO: checksum?
		return true
	}

	// TODO: handle err?
	// TODO: checksum?
	optErr := t.optChecker.Accept(tcp, ci, dir, nextSeq, start)
	return optErr == nil
}
// ReassembledSG stores a reassembled chunk of stream data in the buffer of
// the direction it belongs to.
//
// A chunk reported with a skip count other than 0 or -1 is not stored; only
// the skip count is added to SkippedBytes. Otherwise the start/end flags are
// latched and the chunk bytes are appended to the direction's Buffer.
func (t *TCPConnection) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
	flowDir, isStart, isEnd, skipped := sg.Info()
	available, _ := sg.Lengths()

	var target *TCPDirection
	switch flowDir {
	case reassembly.TCPDirClientToServer:
		target = &t.Client
	case reassembly.TCPDirServerToClient:
		target = &t.Server
	default:
		panic("unreachable")
	}

	switch {
	case skipped == -1:
		// can't find where skip == -1 is documented but this is what gopacket reassemblydump does
		// to allow missing syn/ack
	case skipped != 0:
		// stream has missing bytes
		target.SkippedBytes += uint64(skipped)
		return
	}

	// the flags are sticky: once set they stay set
	if isStart {
		target.HasStart = true
	}
	if isEnd {
		target.HasEnd = true
	}

	target.Buffer.Write(sg.Fetch(available))
}
// ReassemblyComplete always reports false: the connection is deliberately not
// removed so that the last ACK is still allowed.
func (t *TCPConnection) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
	// do not remove the connection to allow last ACK
	const removeConnection = false
	return removeConnection
}
// IPV4Reassembled is an IPv4 datagram that was put back together from
// fragments.
type IPV4Reassembled struct {
	// SourceIP and DestinationIP are the addresses of the datagram.
	SourceIP, DestinationIP net.IP
	// Datagram is the serialized reassembled datagram.
	Datagram []byte
}
// New builds the TCPConnection for a new flow, records it in
// fd.TCPConnections and returns it as the stream to feed.
//
// The source side of the flows becomes Client and the destination side
// becomes Server. The state machine is created with
// SupportMissingEstablishment enabled, and a TCP option checker is attached
// only when fd.Options.CheckTCPOptions is set.
func (fd *Decoder) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
	// TODO: get ip layer somehow?
	// TODO: understand how gopacket handles broken/too short packets, seems like
	// we can get here when lots of things are missing, assume zero port for now
	portOf := func(raw []byte) int {
		if len(raw) != 2 {
			return 0
		}
		return int(binary.BigEndian.Uint16(raw))
	}

	// each direction gets its own copy of the address bytes and a fresh buffer
	directionFor := func(addr []byte, port int) TCPDirection {
		return TCPDirection{
			Endpoint: TCPEndpoint{
				IP:   append([]byte(nil), addr...),
				Port: port,
			},
			Buffer: &bytes.Buffer{},
		}
	}

	conn := &TCPConnection{
		Client:    directionFor(net.Src().Raw(), portOf(transport.Src().Raw())),
		Server:    directionFor(net.Dst().Raw(), portOf(transport.Dst().Raw())),
		net:       net,
		transport: transport,
		tcpState: reassembly.NewTCPSimpleFSM(reassembly.TCPSimpleFSMOptions{
			SupportMissingEstablishment: true,
		}),
	}

	if fd.Options.CheckTCPOptions {
		checker := reassembly.NewTCPOptionCheck()
		conn.optChecker = &checker
	}

	fd.TCPConnections = append(fd.TCPConnections, conn)

	return conn
}
// Decoder collects reassembled IPv4 datagrams and TCP connections from the
// packets fed to it.
type Decoder struct {
	// Options is the configuration the decoder was created with.
	Options DecoderOptions

	// TCPConnections lists every connection created so far, in creation order.
	TCPConnections []*TCPConnection
	// IPV4Reassembled lists the IPv4 datagrams reassembled from fragments.
	IPV4Reassembled []IPV4Reassembled

	// ipv4Defrag is the IPv4 defragmenter used for incoming packets.
	ipv4Defrag *ip4defrag.IPv4Defragmenter
	// tcpAssembler is the assembler TCP segments are handed to.
	tcpAssembler *reassembly.Assembler
}
// DecoderOptions configures a Decoder.
type DecoderOptions struct {
	// CheckTCPOptions attaches a TCP option checker to every new connection.
	CheckTCPOptions bool
}
// New returns a Decoder configured with options, with its IPv4 defragmenter
// and TCP assembler set up. The decoder itself is passed to the stream pool,
// so its New method is what supplies the streams.
func New(options DecoderOptions) *Decoder {
	fd := &Decoder{
		Options:    options,
		ipv4Defrag: ip4defrag.NewIPv4Defragmenter(),
	}
	fd.tcpAssembler = reassembly.NewAssembler(reassembly.NewStreamPool(fd))

	return fd
}
// EthernetFrame feeds bs to the decoder, decoding it as an Ethernet frame
// with the gopacket.Lazy decode option.
func (fd *Decoder) EthernetFrame(bs []byte) error {
	frame := gopacket.NewPacket(bs, layers.LayerTypeEthernet, gopacket.Lazy)
	return fd.packet(frame)
}
// IPv4Packet feeds bs to the decoder, decoding it as an IPv4 packet with the
// gopacket.Lazy decode option.
func (fd *Decoder) IPv4Packet(bs []byte) error {
	pkt := gopacket.NewPacket(bs, layers.LayerTypeIPv4, gopacket.Lazy)
	return fd.packet(pkt)
}
// IPv6Packet feeds bs to the decoder, decoding it as an IPv6 packet with the
// gopacket.Lazy decode option.
func (fd *Decoder) IPv6Packet(bs []byte) error {
	pkt := gopacket.NewPacket(bs, layers.LayerTypeIPv6, gopacket.Lazy)
	return fd.packet(pkt)
}
// SLLPacket feeds bs to the decoder, decoding it as a Linux SLL packet with
// the gopacket.Lazy decode option.
func (fd *Decoder) SLLPacket(bs []byte) error {
	pkt := gopacket.NewPacket(bs, layers.LayerTypeLinuxSLL, gopacket.Lazy)
	return fd.packet(pkt)
}
// SLL2Packet feeds bs to the decoder, decoding it as a Linux SLL2 packet with
// the gopacket.Lazy decode option.
func (fd *Decoder) SLL2Packet(bs []byte) error {
	pkt := gopacket.NewPacket(bs, layers.LayerTypeLinuxSLL2, gopacket.Lazy)
	return fd.packet(pkt)
}
// LoopbackFrame feeds bs to the decoder, decoding it as a loopback frame with
// the gopacket.Lazy decode option.
func (fd *Decoder) LoopbackFrame(bs []byte) error {
	frame := gopacket.NewPacket(bs, layers.LayerTypeLoopback, gopacket.Lazy)
	return fd.packet(frame)
}
// packet runs one decoded packet through IPv4 defragmentation and then hands
// any TCP segment it contains to the TCP assembler.
//
// It returns an error when defragmentation, re-serialization of a reassembled
// datagram, or decoding of a reassembled payload fails. Packets without an
// IPv4 or TCP layer are not an error; the corresponding step is skipped.
func (fd *Decoder) packet(p gopacket.Packet) error {
	// TODO: linkType

	// The comma-ok assertion fails on a missing layer as well as on one of an
	// unexpected concrete type, so neither can lead to a nil dereference.
	if ip4, ok := p.Layer(layers.LayerTypeIPv4).(*layers.IPv4); ok && ip4 != nil {
		if err := fd.defragIPv4(p, ip4); err != nil {
			return err
		}
	}

	if tcp, ok := p.Layer(layers.LayerTypeTCP).(*layers.TCP); ok && tcp != nil {
		// Without a network layer there is no flow to assemble the segment
		// under, so the segment is skipped instead of dereferencing nil.
		if nl := p.NetworkLayer(); nl != nil {
			fd.tcpAssembler.Assemble(nl.NetworkFlow(), tcp)
		}
	}

	return nil
}

// defragIPv4 passes ip4 to the defragmenter and, when that yields a
// reassembled datagram, records it in fd.IPV4Reassembled and decodes its
// payload into p so that later layer lookups on p see the reassembled layers.
func (fd *Decoder) defragIPv4(p gopacket.Packet, ip4 *layers.IPv4) error {
	// Remember the length before defragmentation; a changed length is what
	// marks the result as a reassembled datagram below.
	fragmentLength := ip4.Length

	datagram, err := fd.ipv4Defrag.DefragIPv4(ip4)
	if err != nil {
		return err
	}
	// TODO: correct way to detect finished reassemble?
	if datagram == nil || datagram.Length == fragmentLength {
		// nothing was reassembled by this packet
		return nil
	}

	// TODO: better way to reconstruct package?
	sb := gopacket.NewSerializeBuffer()
	payload, err := sb.PrependBytes(len(datagram.Payload))
	if err != nil {
		return err
	}
	copy(payload, datagram.Payload)
	if err := datagram.SerializeTo(sb, gopacket.SerializeOptions{
		FixLengths:       true,
		ComputeChecksums: true,
	}); err != nil {
		return err
	}

	fd.IPV4Reassembled = append(fd.IPV4Reassembled, IPV4Reassembled{
		SourceIP:      ip4.SrcIP,
		DestinationIP: ip4.DstIP,
		Datagram:      sb.Bytes(),
	})

	// Decoding into p requires the builder side of the packet; a packet that
	// does not provide it is treated as a broken invariant.
	pb, ok := p.(gopacket.PacketBuilder)
	if !ok {
		panic("not a PacketBuilder")
	}

	return datagram.NextLayerType().Decode(datagram.Payload, pb)
}
// Flush calls FlushAll on the TCP assembler; the value FlushAll returns is
// not used.
func (fd *Decoder) Flush() {
	_ = fd.tcpAssembler.FlushAll()
}