Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
protocolparser.h
1 
17 #pragma once
18 
19 // Using old-style boost.coroutines
20 #define BOOST_COROUTINES_BIDIRECT
21 #include <boost/coroutine/all.hpp>
22 #include <cstdint>
23 #include <memory>
24 #include <queue>
25 #include <vector>
26 
27 #include "logger.h"
28 #include "protocol_consumer.h"
29 #include "resp.h"
30 #include "stream.h"
31 
32 namespace Akumuli {
33 
35 struct PDU {
36  std::shared_ptr<const Byte>
37  buffer; //< Pointer to buffer (buffer can be referenced by several PDU)
38  size_t size; //< Size of the buffer
39  size_t pos; //< Position in the buffer
40 };
41 
43  ProtocolParserError(std::string line, int pos);
44 };
45 
46 typedef boost::coroutines::coroutine<void()> Coroutine;
47 typedef typename Coroutine::caller_type Caller;
48 
49 
51 struct EStopIteration {};
52 
53 
55  mutable std::shared_ptr<Coroutine> coroutine_;
56  mutable Caller* caller_;
57  mutable std::queue<PDU> buffers_;
58  static const PDU POISON_; //< This object marks end of the stream
59  bool done_;
60  std::shared_ptr<ProtocolConsumer> consumer_;
61  Logger logger_;
62 
63  void worker(Caller& yield);
64  void set_caller(Caller& caller);
66  void yield_to_worker();
68  void yield_to_client() const;
70  void throw_if_poisoned(PDU const& top) const;
72  std::tuple<std::string, size_t> get_error_from_pdu(PDU const& pdu) const;
73 
74 public:
75  ProtocolParser(std::shared_ptr<ProtocolConsumer> consumer);
76  void start();
77  void parse_next(PDU pdu);
78 
79  // ByteStreamReader interface
80 public:
81  virtual Byte get();
82  virtual Byte pick() const;
83  virtual bool is_eof();
84  virtual int read(Byte* buffer, size_t buffer_len);
85  virtual void close();
86  virtual std::tuple<std::string, size_t> get_error_context(const char* msg) const;
87 };
88 
89 
90 } // namespace
Definition: stream.h:29
Stop iteration exception.
Definition: protocolparser.h:51
Definition: protocolparser.h:42
virtual void close()
Definition: protocolparser.cpp:218
virtual std::tuple< std::string, size_t > get_error_context(const char *msg) const
Definition: protocolparser.cpp:259
Definition: protocolparser.h:54
virtual int read(Byte *buffer, size_t buffer_len)
Definition: protocolparser.cpp:191
Definition: stream.h:44
Definition: protocolparser.h:35
virtual Byte pick() const
Definition: protocolparser.cpp:169
Definition: log_iface.h:9
virtual bool is_eof()
Definition: protocolparser.cpp:187