open-source-search-engine/UdpServer.h
2021-05-06 01:52:55 +10:00

479 lines
18 KiB
C++

// Copyright Matt Wells Nov 2000
// TODO: use temporary bufs for each UdpSlot to avoid mallocs for reads
// TODO: eliminate m_gotMsgsToSend, m_gotAcksToSend in lieau of Loop.h methods
// . A reliable udp/datagram server
// . non-blocking, no threads
// . works on I/O interrupts by registering callbacks with the Loop class
// . great for broadcasting to many non-local IPs
// . great for communicating with thousands of random machines to avoid
// the connect/close overhead associated with TCP
// . the UdpSlot holds the details for one transaction
// . the UdpSlot is like a socket
// . we use transactionIds to link incoming replies w/ the initiating requests
// . the key of the UdpSlot is based on transactionId and ip/port of dest. host
// . when sending a request you supply a callback to be called on completion
// of that transaction
// . UdpSlot's m_errno will be set if an error or timeout occurred
// . UdpSlot's m_readBuf/m_readBufSize will hold the reply
// . you register your request handlers w/ UdpServer::registerHandler()
// . msgs are routed to handling routines based on msgType in the dgram
// . you can change the protocol by changing UdpProtocol
// . UdpProtocol just contains many virtual datagram parsing functions
// . UdpProtocol's default protocol is the Mattster Protocol(see UdpProtocol.h)
// . Dns.h overrides UdpProtocol to form DnsProtocol
// . we can send up to ACK_WINDOW_SIZE dgrams before requiring the reception
// of the first dgram we sent's ACK
// . this ACK window helps highPing/highBandwidth connections (distant hosts)
// . readPoll(), sendAckPoll(), readTimeoutPoll() can call callbacks/handlers
#ifndef _UDPSERVER_H_
#define _UDPSERVER_H_
#include <sys/time.h> // select()
#include <sys/types.h> // select()
#include <netinet/in.h> // ntohl() ntohs()
#include "Mem.h" // ntohll() getNumBitsOn()
#include "File.h" // for setting our socket non-blocking, etc.
#include "UdpSlot.h"
#include "UdpProtocol.h"
#include "Hostdb.h"
#include "Loop.h" // loop class that handles signals on our socket
//#ifdef _SMALLDGRAMS_
//#define MAX_UDP_SLOTS 1000
//#else
//#define MAX_UDP_SLOTS 5000
//#endif
// . The rules of Async Sig Safe functions
// 1. to be safe, _ass functions should only call other _ass functions.
// otherwise, that function may be re-entered by the async sig handler.
// 2. most _ass functions turn off interrupts or return if g_inSigHandler.
// otherwise, they will need to be re_entrant... i.e. while entered from the
// main process they might be interrupted and entered from the sig handler.
// 3. only _ass functions should be called from an ASYNC signal handler.
class UdpServer {
public:
friend class Dns;
UdpServer() ;
~UdpServer() ;
// free send/readBufs for all slots
void reset();
// . returns false and sets errno on error
// . we use this udp "port" for all this server's reads/writes
// . we need to save and read out last transaction Id in a file to
// maintain the msgdb properly for incremental syncing
// . niceness of 0 is highest priority, 1 is lowest
// . read/writeBufSize are the socket buf's size
// . pollTime is how often to call timePollWrapper() (in milliseconds)
// . it should be at least the minimal slot timeout
bool init ( uint16_t port ,
UdpProtocol *proto ,
int32_t niceness ,
int32_t readBufSize ,
int32_t writeBufSize ,
int32_t pollTime ,
int32_t maxSlots ,
bool isDns );
bool m_isDns;
// . sends a request
// . returns false and sets g_errno on error, true on success
// . callback will be called on reception of reply or on error
// . we will set errno before calling callback if an error occurred
// . errno may be ETIMEDOUT
// . we call destroySlot(slot) after calling the callback
// . NULLify slot->m_readBuf,slot->m_sendBuf if you don't want them
// to be freed by destroySlot(slot)
// . hostId is used to lookup Host's in g_hostdb to get/put resend time
// . if an ack isn't received after a certain time we resend the dgram
// . if endpoint calls sendErrorReply() to reply to you your callback
// will be called with errno set to what he passed to sendErrorReply
// . UdpSlot is like a udp socket
// . "msgType" may be stored in each dgram's header depending on
// if you're using UdpProtocol, DnsProtocol, ...
// . "msgType" is used to route the request to handling functions
// on the remote machine
// . backoff is how long to wait for an ACK in ms before we resend
// . we double backoff each time we wait w/o getting any ACK
// . don't wait longer than maxWait for a resend
// . if we try to resend a request dgram MORE than "maxResends" times,
// we do not resend it and we returns with g_errno set to ENOACK,
// indicating we have not gotten ANY ack for a dgram. if a host dies
// we typically have 10-20 seconds or so before marking it as dead.
// BUT with this method we should realize it is fruiteless in like
// 500 ms or so and Multicast or Proxy.cpp can re-send to another
// host. for niceness=0 requests the backoff is usually constant
// and set to about 30 ms. so if you set maxResends to 10 that is
// probably at least 300 ms of resending tries.
// . use an ip of 0 and port of 0 if you provide a hostId. use a hostid
// of -1 to indicate no hostid.
bool sendRequest ( char *msg ,
int32_t msgSize ,
unsigned char msgType ,
uint32_t ip ,
uint16_t port ,
int32_t hostId ,
UdpSlot **retSlot , // can be NULL
void *state , // callback state
void (* callback ) (void *state, UdpSlot *slot) ,
int32_t timeout = 60 , // seconds
int16_t backoff = -1 ,
int16_t maxWait = -1 , // ms
char *replyBuf = NULL ,
int32_t replyBufMaxSize = 0 ,
int32_t niceness = 1 ,
int32_t maxResends = -1 );
// . send a reply to the host specified in "slot"
// . slot is destroyed on error or completion of the send
// . the "msg" will be freed unless slot->m_sendBufAlloc is set to NULL
// . backoff is how long to wait for an ACK in ms before we resend
// . we double backoff each time we wait w/o getting any ACK
// . don't wait longer than maxWait for a resend
void sendReply_ass (char *msg ,
int32_t msgSize ,
char *alloc ,
int32_t allocSize ,
UdpSlot *slot ,
int32_t timeout = 60 , // in seconds
void *state = NULL , // callback state
void (* callback2)(void *state,UdpSlot *slot)=NULL,
int16_t backoff = -1 ,
int16_t maxWait = -1 ,
bool isCallback2Hot = false ,
bool useSameSwitch = false );
// . propagate an errno to the requesting machine
// . his callback will be called with errno set to "errnum"
void sendErrorReply ( UdpSlot *slot ,
int32_t errnum ,
int32_t timeout = 60 /*seconds*/ );
// . too many transactions going on?
// . this is just an estimate...
//int32_t getNumAvailSlots () { return MAX_UDP_SLOTS - m_topUsedSlot - 1; };
// an estimation as well
//int32_t getNumUsedSlots () { return m_topUsedSlot + 1; };
int32_t getNumUsedSlots () { return m_numUsedSlots; };
int32_t getNumUsedSlotsIncoming () { return m_numUsedSlotsIncoming; };
// . when a request/msg of type "msgType" is received we call the
// corresponding request handler on this machine
// . use this function to register a handler for a msgType
// . we do NOT destroy the slot after calling the handler
// . handler MUST call sendReply() or sendErrorReply() no matter what
// . returns true if handler registered successfully
// . returns false on error
// . if you want the handler to be called while in an async signal
// handler then set "isHandlerHot" to true
bool registerHandler ( unsigned char msgType,
void(* handler)(UdpSlot *,int32_t) ,
bool isHandlerHot = false );
// . frees the m_readBuf and m_sendBuf
// . marks the slot as available
// . called after callback called for a slot you used to send a request
// . called after sendReply()/sendErrorReply() completes or has error
void destroySlot ( UdpSlot *slot );
// . take a slot that we made from sendRequest() above and reset it
// . you request will be sent again w/ the original parameters
// void resendSlot ( UdpSlot *slot );
// these *Poll() routines must be public so wrappers can call them
// . this is called by main/Loop.cpp when m_sock is ready for reading
// . actually it calls readPollWrapper() which calls this
// . this calls registerd handlers for recvd REQUESTS based on msgType
// . this calls callbacks (in UdpSlot) for received REPLIES
// . returns false if it read nothing and had no errors
// . returns true if it read something or had an error so you can
// call it again ASAP because it has unfinished business
bool readPoll ( int64_t now );
// . this is called by main/Loop.cpp when m_sock is ready for writing
// . actually it calls sendPollWrapper()
// . it sends as much as it can from all UdpSlots until one blocks
// or until it's done
// . sends both dgrams AND ACKs
bool sendPoll_ass ( bool allowResends , int64_t now );
// called every 30ms to get tokens? not any more...
void timePoll ( );
// called by readPoll()/sendPoll()/readTimeoutPoll() to do
// reading/sending/callbacks in that order until nothing left to do
void process_ass ( int64_t now , int32_t maxNiceness = 100);
// . this is called by main/Loop.cpp every second
// . actually it calls readTimeoutPollWrapper()
// . call the callback of reception slots that have timed out
// . return true if we did something, like reset a slot for resend
// or timed a slot out so it's callback should be called
bool readTimeoutPoll ( int64_t now ) ;
// how nice as a server are we?
//int32_t getNiceness ( ) { return m_niceness; };
// m_token should point to shared memory
//bool useSharedMem() ;
//bool setupSharedMem() ;
// . sets *s_token and *s_token2 to NULL
// . resets bits and bit counts on all big-reply slots so they
// send dgrams or acks in attempt to get the newly available token
//void releaseTokens ( UdpSlot *slot = NULL ) ;
// . this will wait until all fully received requests have had their
// reply sent to them
// . in the meantime it will send back error replies to all new
// incoming requests
// . this will do a blocking close on the listening socket descriptor
// . returns false if blocked, true otherwise if shutdown immediate
// . set g_errno on error
bool shutdown ( bool urgent );
// the high priority udp server suspends/resumes the low priority one
// when it has/doesn't-have pending requests/replies
void suspend ();
void resume ();
bool isSuspended () { return m_isSuspended; };
bool needBottom () { return m_needBottom; }
UdpSlot *getUdpSlotNum ( int32_t i ) { return &m_slots[i]; };
//bool isEmpty ( int32_t i ) { return (m_keys[i].n0 == 0LL);};
//int32_t getTopUsedSlot ( ) { return m_topUsedSlot; };
// try calling makeCallback() on all slots
bool makeCallbacks_ass ( int32_t niceness );
bool makeCallbacks_ass2( int32_t niceness );
// return true if we turned them off, false if we did not
bool interruptsOff() {
if ( m_isRealTime && g_interruptsOn && ! g_inSigHandler ) {
g_loop.interruptsOff();
return true;
}
return false; };
void interruptsOn() { g_loop.interruptsOn(); };
// when a call to sendto() blocks we set this to true so Loop.cpp
// will know to manually call sendPoll_ass() rather than counting
// on receiving a fd-ready-for-writing signal for this UdpServer
bool m_needToSend;
bool m_writeRegistered;
UdpSlot *getActiveHead ( ) { return m_head2; };
// callback linked list functions (m_head3)
void addToCallbackLinkedList ( UdpSlot *slot ) ;
bool isInCallbackLinkedList ( UdpSlot *slot );
void removeFromCallbackLinkedList ( UdpSlot *slot ) ;
// cancel a transaction
void cancel ( void *state , unsigned char msgType ) ;
// replace ips and ports in outstanding slots
void replaceHost ( Host *oldHost, Host *newHost );
void printState();
// count how many of each msgType we drop, report on PageStats.cpp
//int32_t m_droppedNiceness0[128];
//int32_t m_droppedNiceness1[128];
// . we have up to 1 handler routine for each msg type
// . call these handlers for the corresponding msgType
// . msgTypes go from 0 to 64 i think (see UdpProtocol.h dgram header)
void (* m_handlers[MAX_MSG_TYPES])(UdpSlot *slot, int32_t niceness);
// changes timeout to very low on dead hosts
bool timeoutDeadHosts ( class Host *h );
// private:
// . we maintain a sequential list of transaction ids to guarantee
// uniqueness to a point
// . if server is restarted this will go back to 0 though
// . the key of a UdpSlot is based on this, the endpoint ip/port and
// whether it's a request/reply by/from us
int32_t getTransId ( ) {
int32_t tid = m_nextTransId;
m_nextTransId++;
if ( m_nextTransId >= UDP_MAX_TRANSID ) m_nextTransId = 0;
return tid;
};
// . send as many dgrams as you can from slot's m_sendBuf
// . returns false and sets errno on error, true otherwise
bool doSending_ass ( UdpSlot *slot, bool allowResends, int64_t now );
// . calls a m_handler request handler if slot->m_callback is NULL
// which means it was an incoming request
// . otherwise calls slot->m_callback because it was an outgoing
// request
bool makeCallback_ass ( UdpSlot *slot ) ;
// . picks the slot that is most caught up to it's ACKs
// . picks resends first, however
// . then we send a dgram from that slot
UdpSlot *getBestSlotToSend ( int64_t now );
// . reads a pending dgram on the udp stack
// . returns -1 on error, 0 if blocked, 1 if completed reading dgram
// . called by readPoll()
int32_t readSock_ass ( UdpSlot **slot , int64_t now );
// a debug util
void dumpdgram ( char *dgram , int32_t dgramSize );
// returns false if cannot shutdown right now due to pending traffic
//bool tryShuttingDown ( bool callCallback ) ;
// our listening/sending udp socket and port
int m_sock ;
uint16_t m_port ;
// used to prevent cpu usage by sendPoll() when nothing can be sent
bool m_gotMsgsToSend;
// for defining your own protocol on top of udp
UdpProtocol *m_proto;
// is the handler meant to be called from an async signal handler?
bool m_isHandlerHot [ MAX_MSG_TYPES ];
// . we need a transaction id for every transaction so we can match
// incoming reply msgs with their corresponding request msgs
// . TODO: should be stored to disk on shutdown and every 1024 sends
// . store a shutdown bit with it so we know if we crashed
// . on crashes add 1024 or so to the read value
// . TODO: make somewhat random cuz it's easy to spoof like it is now
int32_t m_nextTransId;
// our niceness level
//int32_t m_niceness;
// called when shutdown completes
void (*m_shutdownCallback )( void *state );
void *m_shutdownState;
bool m_isShuttingDown;
//did we have to give back control before we called all of the
bool m_needBottom;
// are we suspended so the high priority server can be more efficient?
bool m_isSuspended;
// . how many requests are we handling at this momment
// . does not include requests whose replies we are sending, only
// those whose replies have not yet been generated
// . starts counting as soon as first dgram of request is recvd
int32_t m_requestsInWaiting;
// like m_requestsInWaiting but requests which spawn other requests
int32_t m_msg07sInWaiting;
int32_t m_msg10sInWaiting;
int32_t m_msgc1sInWaiting;
//int32_t m_msgDsInWaiting;
//int32_t m_msg23sInWaiting;
int32_t m_msg25sInWaiting;
int32_t m_msg50sInWaiting;
int32_t m_msg39sInWaiting;
int32_t m_msg20sInWaiting;
int32_t m_msg2csInWaiting;
int32_t m_msg0csInWaiting;
int32_t m_msg0sInWaiting;
// do we live on interrupts?
bool m_isRealTime;
int32_t m_outstandingConverts;
// we now avoid malloc with these
//UdpSlot m_slots [ MAX_UDP_SLOTS ];
// but alloc MAX_UDP_SLOTS of these in init so we don't blow the stack
UdpSlot *m_slots ;
//key_t m_keys [ MAX_UDP_SLOTS ];
//int32_t m_topUsedSlot;
int32_t m_maxSlots;
// routines
UdpSlot *getEmptyUdpSlot_ass ( key_t k , bool incoming );
void freeUdpSlot_ass ( UdpSlot *slot );
void addKey ( key_t key , UdpSlot *ptr ) ;
// verified these are only called from within _ass routines that
// turn them interrupts off before calling this
//bool isEmpty ( UdpSlot *slot ) {
// return isEmpty ( slot - m_slots ); };
UdpSlot *getUdpSlot ( key_t k );
// . hash table for converting keys to slots
// . if m_ptrs[i] is NULL, ith bucket is empty
UdpSlot **m_ptrs;
int32_t m_numBuckets;
uint32_t m_bucketMask;
char *m_buf; // memory to hold m_ptrs
int32_t m_bufSize;
// linked list of available slots (uses UdpSlot::m_next)
UdpSlot *m_head;
// linked list of slots in use
UdpSlot *m_head2;
UdpSlot *m_tail2;
// linked list of callback candidates
UdpSlot *m_head3;
UdpSlot *m_tail3;
int32_t m_numUsedSlots;
int32_t m_numUsedSlotsIncoming;
// stats
public:
int64_t m_eth0BytesIn;
int64_t m_eth0BytesOut;
int64_t m_eth0PacketsIn;
int64_t m_eth0PacketsOut;
int64_t m_eth1BytesIn;
int64_t m_eth1BytesOut;
int64_t m_eth1PacketsIn;
int64_t m_eth1PacketsOut;
int64_t m_outsiderPacketsIn;
int64_t m_outsiderPacketsOut;
int64_t m_outsiderBytesIn;
int64_t m_outsiderBytesOut;
};
extern class UdpServer g_udpServer;
// this is the high priority udpServer, it's requests are handled first
extern class UdpServer g_udpServer2;
extern int32_t g_dropped;
extern int32_t g_corruptPackets;
extern bool g_inHandler;
#endif