mirror of
https://github.com/wader/fq.git
synced 2024-11-27 14:14:58 +03:00
266 lines
6.4 KiB
Go
266 lines
6.4 KiB
Go
package flowsdecoder
|
|
|
|
// TODO: option to not allow missing syn/ack?
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"net"
|
|
|
|
"github.com/gopacket/gopacket"
|
|
"github.com/gopacket/gopacket/ip4defrag"
|
|
"github.com/gopacket/gopacket/layers"
|
|
"github.com/gopacket/gopacket/reassembly"
|
|
)
|
|
|
|
type TCPEndpoint struct {
|
|
IP net.IP
|
|
Port int
|
|
}
|
|
|
|
type TCPDirection struct {
|
|
Endpoint TCPEndpoint
|
|
HasStart bool
|
|
HasEnd bool
|
|
Buffer *bytes.Buffer
|
|
SkippedBytes uint64
|
|
}
|
|
|
|
type TCPConnection struct {
|
|
Client *TCPDirection
|
|
Server *TCPDirection
|
|
tcpState *reassembly.TCPSimpleFSM
|
|
optChecker *reassembly.TCPOptionCheck
|
|
net gopacket.Flow
|
|
transport gopacket.Flow
|
|
}
|
|
|
|
func (t *TCPConnection) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
|
|
// has ok state?
|
|
if !t.tcpState.CheckState(tcp, dir) {
|
|
// TODO: handle err?
|
|
return false
|
|
}
|
|
if t.optChecker != nil {
|
|
// has ok options?
|
|
if err := t.optChecker.Accept(tcp, ci, dir, nextSeq, start); err != nil {
|
|
// TODO: handle err?
|
|
return false
|
|
}
|
|
}
|
|
// TODO: checksum?
|
|
|
|
// accept
|
|
return true
|
|
}
|
|
|
|
func (t *TCPConnection) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
|
|
dir, start, end, skip := sg.Info()
|
|
length, _ := sg.Lengths()
|
|
|
|
var d *TCPDirection
|
|
switch dir {
|
|
case reassembly.TCPDirClientToServer:
|
|
d = t.Client
|
|
case reassembly.TCPDirServerToClient:
|
|
d = t.Server
|
|
default:
|
|
panic("unreachable")
|
|
}
|
|
|
|
if skip == -1 {
|
|
// can't find where skip == -1 is documented but this is what gopacket reassemblydump does
|
|
// to allow missing syn/ack
|
|
} else if skip != 0 {
|
|
// stream has missing bytes
|
|
d.SkippedBytes += uint64(skip)
|
|
return
|
|
}
|
|
|
|
d.HasStart = d.HasStart || start
|
|
d.HasEnd = d.HasEnd || end
|
|
|
|
data := sg.Fetch(length)
|
|
|
|
d.Buffer.Write(data)
|
|
}
|
|
|
|
func (t *TCPConnection) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
|
|
// do not remove the connection to allow last ACK
|
|
return false
|
|
}
|
|
|
|
type IPV4Reassembled struct {
|
|
SourceIP net.IP
|
|
DestinationIP net.IP
|
|
Datagram []byte
|
|
}
|
|
|
|
func (fd *Decoder) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
|
fsmOptions := reassembly.TCPSimpleFSMOptions{
|
|
SupportMissingEstablishment: true,
|
|
}
|
|
|
|
// TODO: get ip layer somehow?
|
|
// TODO: understand how gopacket handles broken/too short packets, seems like
|
|
// we can get here when lots of things are missing, assume zero port for now
|
|
var clientPort int
|
|
if len(transport.Src().Raw()) == 2 {
|
|
clientPort = int(binary.BigEndian.Uint16(transport.Src().Raw()))
|
|
}
|
|
var serverPort int
|
|
if len(transport.Dst().Raw()) == 2 {
|
|
serverPort = int(binary.BigEndian.Uint16(transport.Dst().Raw()))
|
|
}
|
|
|
|
stream := &TCPConnection{
|
|
Client: &TCPDirection{
|
|
Endpoint: TCPEndpoint{
|
|
IP: append([]byte(nil), net.Src().Raw()...),
|
|
Port: clientPort,
|
|
},
|
|
Buffer: &bytes.Buffer{},
|
|
},
|
|
Server: &TCPDirection{
|
|
Endpoint: TCPEndpoint{
|
|
IP: append([]byte(nil), net.Dst().Raw()...),
|
|
Port: serverPort,
|
|
},
|
|
Buffer: &bytes.Buffer{},
|
|
},
|
|
|
|
net: net,
|
|
transport: transport,
|
|
tcpState: reassembly.NewTCPSimpleFSM(fsmOptions),
|
|
}
|
|
|
|
if fd.Options.CheckTCPOptions {
|
|
c := reassembly.NewTCPOptionCheck()
|
|
stream.optChecker = &c
|
|
}
|
|
|
|
fd.TCPConnections = append(fd.TCPConnections, stream)
|
|
|
|
return stream
|
|
}
|
|
|
|
type Decoder struct {
|
|
Options DecoderOptions
|
|
|
|
TCPConnections []*TCPConnection
|
|
IPV4Reassembled []IPV4Reassembled
|
|
|
|
ipv4Defrag *ip4defrag.IPv4Defragmenter
|
|
tcpAssembler *reassembly.Assembler
|
|
}
|
|
|
|
type DecoderOptions struct {
|
|
CheckTCPOptions bool
|
|
}
|
|
|
|
func New(options DecoderOptions) *Decoder {
|
|
flowDecoder := &Decoder{
|
|
Options: options,
|
|
}
|
|
streamPool := reassembly.NewStreamPool(flowDecoder)
|
|
tcpAssembler := reassembly.NewAssembler(streamPool)
|
|
flowDecoder.tcpAssembler = tcpAssembler
|
|
flowDecoder.ipv4Defrag = ip4defrag.NewIPv4Defragmenter()
|
|
|
|
return flowDecoder
|
|
}
|
|
|
|
func (fd *Decoder) EthernetFrame(bs []byte) error {
|
|
return fd.packet(gopacket.NewPacket(bs, layers.LayerTypeEthernet, gopacket.Lazy))
|
|
}
|
|
|
|
func (fd *Decoder) IPv4Packet(bs []byte) error {
|
|
return fd.packet(gopacket.NewPacket(bs, layers.LayerTypeIPv4, gopacket.Lazy))
|
|
}
|
|
|
|
func (fd *Decoder) IPv6Packet(bs []byte) error {
|
|
return fd.packet(gopacket.NewPacket(bs, layers.LayerTypeIPv6, gopacket.Lazy))
|
|
}
|
|
|
|
func (fd *Decoder) SLLPacket(bs []byte) error {
|
|
return fd.packet(gopacket.NewPacket(bs, layers.LayerTypeLinuxSLL, gopacket.Lazy))
|
|
}
|
|
|
|
func (fd *Decoder) SLL2Packet(bs []byte) error {
|
|
return fd.packet(gopacket.NewPacket(bs, layers.LayerTypeLinuxSLL2, gopacket.Lazy))
|
|
}
|
|
|
|
func (fd *Decoder) LoopbackFrame(bs []byte) error {
|
|
return fd.packet(gopacket.NewPacket(bs, layers.LayerTypeLoopback, gopacket.Lazy))
|
|
}
|
|
|
|
// LinkTypeRAW IPv4 or Ipv6
|
|
func (fd *Decoder) RAWIPFrame(bs []byte) error {
|
|
version := bs[0] >> 4
|
|
switch version {
|
|
case 4:
|
|
return fd.IPv4Packet(bs)
|
|
case 6:
|
|
return fd.IPv6Packet(bs)
|
|
}
|
|
return fmt.Errorf("invalid ip version %v", version)
|
|
}
|
|
|
|
func (fd *Decoder) packet(p gopacket.Packet) error {
|
|
// TODO: linkType
|
|
ip4Layer := p.Layer(layers.LayerTypeIPv4)
|
|
if ip4Layer != nil {
|
|
ip4, _ := ip4Layer.(*layers.IPv4)
|
|
l := ip4.Length
|
|
newIPv4, err := fd.ipv4Defrag.DefragIPv4(ip4)
|
|
if err != nil {
|
|
return err
|
|
} else if newIPv4 != nil {
|
|
// TODO: correct way to detect finished reassemble?
|
|
if newIPv4.Length != l {
|
|
// TODO: better way to reconstruct package?
|
|
sb := gopacket.NewSerializeBuffer()
|
|
b, _ := sb.PrependBytes(len(newIPv4.Payload))
|
|
copy(b, newIPv4.Payload)
|
|
if err := newIPv4.SerializeTo(sb, gopacket.SerializeOptions{
|
|
FixLengths: true,
|
|
ComputeChecksums: true,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
fd.IPV4Reassembled = append(fd.IPV4Reassembled, IPV4Reassembled{
|
|
SourceIP: ip4.SrcIP,
|
|
DestinationIP: ip4.DstIP,
|
|
Datagram: sb.Bytes(),
|
|
})
|
|
|
|
// i think this replaces p with the newly defragmented ip packet and is
|
|
// used below when reassembling tcp streams
|
|
// see gopacket reassemblydump example
|
|
pb, ok := p.(gopacket.PacketBuilder)
|
|
if !ok {
|
|
panic("not a PacketBuilder")
|
|
}
|
|
nextDecoder := newIPv4.NextLayerType()
|
|
if err := nextDecoder.Decode(newIPv4.Payload, pb); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
tcp := p.Layer(layers.LayerTypeTCP)
|
|
if tcp != nil {
|
|
tcp, _ := tcp.(*layers.TCP)
|
|
fd.tcpAssembler.Assemble(p.NetworkLayer().NetworkFlow(), tcp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (fd *Decoder) Flush() {
|
|
fd.tcpAssembler.FlushAll()
|
|
}
|