Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
tcp_server.h
1 
17 #pragma once
18 
19 #include <memory>
20 
21 #include <boost/asio.hpp>
22 #include <boost/bind.hpp>
23 #include <boost/thread/barrier.hpp>
24 
25 #include "logger.h"
26 #include "protocolparser.h"
27 #include "server.h"
28 
29 namespace Akumuli {
30 
31 // //
32 // Type aliases from boost.asio //
33 // //
34 
35 typedef boost::asio::io_service IOServiceT; //< I/O service (event loop) type
36 typedef boost::asio::ip::tcp::acceptor AcceptorT; //< TCP acceptor type
37 typedef boost::asio::ip::tcp::socket SocketT; //< TCP socket type
38 typedef boost::asio::ip::tcp::endpoint EndpointT; //< TCP endpoint type
39 typedef boost::asio::strand StrandT; //< Strand type
40 typedef boost::asio::io_service::work WorkT; //< Work object type; TcpAcceptor keeps these to block io-services from completing too early
41 
45 class TcpSession : public std::enable_shared_from_this<TcpSession> { //< TCP session: bundles a socket, a strand, a pipeline spout and a protocol parser. Method definitions are not in this header.
46  // TODO: Unique session ID
47  enum {
48  BUFFER_SIZE = 0x1000, //< Buffer size (0x1000 == 4096)
49  BUFFER_SIZE_THRESHOLD = 0x0200, //< Min free buffer space (0x0200 == 512)
50  };
51  IOServiceT* io_; //< Raw pointer to an io-service; presumably the `io` constructor argument and not owned by the session - TODO confirm in tcp_server.cpp
52  SocketT socket_; //< Session socket, exposed by reference through socket()
53  StrandT strand_; //< Strand; presumably used to serialize this session's handlers - TODO confirm
54  std::shared_ptr<PipelineSpout> spout_; //< Shared pipeline spout; presumably the `spout` constructor argument - TODO confirm
55  ProtocolParser parser_; //< Protocol parser instance held by value
56  Logger logger_; //< Session logger
57 
58 public:
59  typedef std::shared_ptr<Byte> BufferT; //< Shared handle to a Byte buffer
60  TcpSession(IOServiceT* io, std::shared_ptr<PipelineSpout> spout); //< Construct a session from an io-service pointer and a pipeline spout
61 
62  SocketT& socket(); //< Returns a mutable reference to a socket (presumably socket_ - confirm)
63 
64  void start(BufferT buf, size_t buf_size, size_t pos, size_t bytes_read); //< Start the session with the given buffer state; exact meaning of pos/bytes_read is defined in tcp_server.cpp - TODO document
65 
66  PipelineErrorCb get_error_cb(); //< Returns a PipelineErrorCb, i.e. a callback from pipeline to session
67 
68  static BufferT NO_BUFFER; //< Static BufferT value; presumably a "no previous buffer" sentinel for start() - TODO confirm
69 
70 private:
78  std::tuple<BufferT, size_t, size_t> get_next_buffer(BufferT prev_buf, size_t size, size_t pos,
79  size_t bytes_read); //< Returns a (buffer, size_t, size_t) tuple computed from the previous buffer state; presumably the buffer for the next read plus its size and position - TODO confirm
80 
81  void handle_read(BufferT buffer, size_t pos, size_t buf_size, boost::system::error_code error,
82  size_t nbytes); //< Read handler. NOTE(review): takes (pos, buf_size) while start()/get_next_buffer() take (size, pos); both are size_t, so swapped arguments would compile silently
83 
84  void handle_write_error(boost::system::error_code error); //< Handler invoked with a write error code
85 
86  void drain_pipeline_spout(); //< Presumably drains spout_ - TODO confirm in tcp_server.cpp
87 };
88 
89 
93 class TcpAcceptor : public std::enable_shared_from_this<TcpAcceptor> { //< Listens on a socket (see start()/stop()) and receives accepted sessions in handle_accept()
94  IOServiceT own_io_; //< Acceptor's own io-service. NOTE(review): members are initialized in declaration order; acceptor_ presumably depends on own_io_, so keep this member first - confirm
95  AcceptorT acceptor_; //< Acceptor
96  std::vector<IOServiceT*> sessions_io_; //< List of io-services for sessions
97  std::vector<WorkT> sessions_work_; //< Work to block io-services from completing too early
98  std::shared_ptr<IngestionPipeline> pipeline_; //< Pipeline instance
99  std::atomic<int> io_index_; //< I/O service index (presumably an index into sessions_io_ - confirm)
100 
101  boost::barrier start_barrier_; //< Barrier to start worker thread
102  boost::barrier stop_barrier_; //< Barrier to stop worker thread
103 
104  Logger logger_; //< Acceptor logger
105 
106 public:
112  TcpAcceptor( // Server parameters
113  std::vector<IOServiceT*> io, int port,
114  // Storage & pipeline
115  std::shared_ptr<IngestionPipeline> pipeline); //< `io`: io-service pointers (presumably stored in sessions_io_ - confirm); `port`: port number; `pipeline`: ingestion pipeline
116 
118  void start(); //< Start listening on socket
119 
121  void stop(); //< Stop listening on socket
122 
124  void _stop(); //< Stop listening on socket (for testing)
125 
127  void _start(); //< Start implementation (this method is public only for testing purposes)
128 
130  void _run_one(); //< Run one handler (should be used only for testing)
131 
132 private:
134  void handle_accept(std::shared_ptr<TcpSession> session, boost::system::error_code err); //< Accept handler: receives the session and the accept error code
135 };
136 
137 
138 struct TcpServer : std::enable_shared_from_this<TcpServer>, Server { //< Server-interface implementation that holds a TcpAcceptor and an io-service
139  std::shared_ptr<IngestionPipeline> pline; //< Ingestion pipeline (presumably the `pipeline` constructor argument - confirm)
140  std::shared_ptr<TcpAcceptor> serv; //< Acceptor instance
141  boost::asio::io_service io; //< io-service held by value
142  std::vector<IOServiceT*> iovec; //< io-service pointers; presumably refers to `io` and is passed to TcpAcceptor - TODO confirm
143  boost::barrier barrier; //< Thread barrier; its use is defined in tcp_server.cpp
144  std::atomic<int> stopped; //< Atomic integer; presumably a "stopped" flag - TODO confirm
145  Logger logger_; //< Server logger
146 
147  TcpServer(std::shared_ptr<IngestionPipeline> pipeline, int concurrency, int port); //< Construct from a pipeline, a concurrency level and a port number
148 
150  virtual void start(SignalHandler* sig_handler, int id); //< Run IO service (presumably overrides Server::start - confirm against server.h)
151 
153  void stop(); //< Stop the server (should be called from signal handler)
154 };
155 }
void start()
Start listening on socket.
Definition: tcp_server.cpp:167
Akumuli::TcpServer
Definition: tcp_server.h:138
void _stop()
Stop listening on socket (for testing)
Definition: tcp_server.cpp:227
TcpAcceptor(std::vector< IOServiceT * > io, int port, std::shared_ptr< IngestionPipeline > pipeline)
Definition: tcp_server.cpp:146
void stop()
Stop listening on socket.
Definition: tcp_server.cpp:217
void _run_one()
Run one handler (should be used only for testing)
Definition: tcp_server.cpp:197
Server interface.
Definition: server.h:64
Definition: protocolparser.h:54
void stop()
Stop the server (should be called from signal handler)
Definition: tcp_server.cpp:287
Definition: signal_handler.h:12
Akumuli::TcpSession
Definition: tcp_server.h:45
Definition: log_iface.h:9
virtual void start(SignalHandler *sig_handler, int id)
Run IO service.
Definition: tcp_server.cpp:261
std::function< void(aku_Status, uint64_t)> PipelineErrorCb
Callback from pipeline to session.
Definition: ingestion_pipeline.h:112
void _start()
Start implementation (this method is public only for testing purposes)
Definition: tcp_server.cpp:201
Akumuli::TcpAcceptor
Definition: tcp_server.h:93