Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
udp_server.h
1 
17 #pragma once
18 
19 #include <atomic>
20 #include <memory>
21 
22 #include <boost/thread/barrier.hpp>
23 
24 #include "ingestion_pipeline.h"
25 #include "logger.h"
26 #include "protocolparser.h"
27 #include "server.h"
28 
29 
30 namespace Akumuli {
31 
32 
35 class UdpServer : public std::enable_shared_from_this<UdpServer>, public Server {
36  std::shared_ptr<IngestionPipeline> pipeline_;
37  boost::barrier start_barrier_; //< Barrier to start worker thread
38  boost::barrier stop_barrier_; //< Barrier to stop worker thread
39  std::atomic<int> stop_;
40  const int port_;
41  const int nworkers_;
42 
43  Logger logger_;
44 
45  static const int MSS = 2048 - 128;
46  static const int NPACKETS = 512;
47 
48  struct IOBuf {
49  // Counters
50  std::atomic<uint64_t> pps;
51  std::atomic<uint64_t> bps;
52 
53  // Packet recv structs
54  mmsghdr msgs[NPACKETS];
55  iovec iovecs[NPACKETS];
56  char bufs[NPACKETS][MSS];
57 
58  IOBuf() {
59  memset(this, 0, sizeof(IOBuf));
60  for (int i = 0; i < NPACKETS; i++) {
61  iovecs[i].iov_base = bufs[i];
62  iovecs[i].iov_len = MSS;
63  msgs[i].msg_hdr.msg_iov = &iovecs[i];
64  msgs[i].msg_hdr.msg_iovlen = 1;
65  }
66  }
67 
68  } __attribute__((aligned(
69  64))); // Otherwise struct will be aligned by sizeof(bufs) and this is crazy expensive
70 
71 public:
77  UdpServer(std::shared_ptr<IngestionPipeline> pipeline, int nworkers, int port);
78 
80  virtual void start(SignalHandler* sig, int id);
81 
82 private:
84  void stop();
85 
86  void worker(std::shared_ptr<PipelineSpout> spout);
87 };
88 
89 } // namespace
UdpServer(std::shared_ptr< IngestionPipeline > pipeline, int nworkers, int port)
Definition: udp_server.cpp:16
Server interface.
Definition: server.h:64
Definition: udp_server.h:35
Definition: signal_handler.h:12
Definition: log_iface.h:9
virtual void start(SignalHandler *sig, int id)
Start processing packets.
Definition: udp_server.cpp:28