Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
ingestion_pipeline.h
1 
17 #pragma once
18 #include <atomic>
19 #include <functional>
20 #include <memory>
21 #include <string>
22 
23 #include <boost/lockfree/queue.hpp>
24 #include <boost/thread/barrier.hpp>
25 
26 #include "logger.h"
27 #include "protocol_consumer.h"
28 // akumuli-storage API
29 #include "akumuli.h"
30 #include "akumuli_config.h"
31 
32 namespace Akumuli {
33 
35 struct DbCursor { //< Abstraction layer above aku_Cursor. NOTE(review): no virtual destructor is declared here (DbConnection declares one) - confirm cursors are never deleted through a bare DbCursor*
37  virtual size_t read(void* dest, size_t dest_size) = 0; //< Read data from cursor (presumably returns the amount written to `dest` - confirm units)
38 
40  virtual int is_done() = 0; //< Check if cursor is done reading (returns int; presumably non-zero means done - confirm)
41 
43  virtual bool is_error(aku_Status* out_error_code_or_null) = 0; //< Check for error condition; the parameter name indicates the out-pointer may be null
44 
46  virtual void close() = 0; //< Close cursor
47 };
48 
50 struct DbConnection { //< Abstraction layer above aku_Database
51 
52  virtual ~DbConnection() {} //< Virtual, so implementations can be destroyed through a DbConnection pointer
53 
54  virtual void close() = 0; //< NOTE(review): undocumented in the original; presumably closes the connection - confirm
55 
57  virtual aku_Status write(const aku_Sample& sample) = 0; //< Write value to DB
58 
60  virtual std::shared_ptr<DbCursor> search(std::string query) = 0; //< Execute search query; result is exposed as a shared DbCursor
61 
63  virtual int param_id_to_series(aku_ParamId id, char* buffer, size_t buffer_size) = 0; //< Convert paramid to series name (meaning of the int result is not visible here)
64 
65  virtual aku_Status series_to_param_id(const char* name, size_t size, aku_Sample* sample) = 0; //< Presumably converts series name to param id, as the same-named PipelineSpout method is documented - confirm
66 
67  virtual std::string get_all_stats() = 0; //< NOTE(review): undocumented in the original; returns a std::string whose format is not visible here
68 };
69 
70 
73 public: // NOTE(review): the class header (listing line 72) is absent from this listing; the page describes the class defined there as "Object of this class writes everything to the database."
74  enum Durability { //< Durability setting accepted by the constructor (exact meaning of each value is not visible here)
75  MaxDurability = 1,
76  RelaxedDurability = 2,
77  MaxThroughput = 4,
78  };
79 
80 private:
81  std::string dbpath_; //< NOTE(review): presumably the `path` given to the constructor - confirm in the .cpp
82  aku_Database* db_; //< Raw aku_Database pointer; ownership and lifetime are not visible in this header
83 
84 public:
85  AkumuliConnection(const char* path, bool hugetlb, Durability durability,
86  uint32_t compression_threshold, uint64_t window_width, uint64_t cache_size); //< Parameter units and semantics are not visible here - see the .cpp
87 
88  virtual void close(); //< Same signature as DbConnection::close()
89 
90  virtual aku_Status write(const aku_Sample& sample); //< Write value to DB
91 
92  virtual std::shared_ptr<DbCursor> search(std::string query); //< Execute search query
93 
94  virtual int param_id_to_series(aku_ParamId id, char* buffer, size_t buffer_size); //< Convert paramid to series name
95 
96  virtual aku_Status series_to_param_id(const char* name, size_t size, aku_Sample* sample); //< Same signature as DbConnection::series_to_param_id()
97 
98  virtual std::string get_all_stats(); //< Same signature as DbConnection::get_all_stats()
99 };
100 
101 using boost::lockfree::queue;
102 using boost::lockfree::capacity;
103 
104 
105 enum BackoffPolicy { //< Back-pressure policy selector (held as backoff_ by PipelineSpout and IngestionPipeline)
106  AKU_THROTTLE, //< Default value of the IngestionPipeline constructor's `bp` argument
107  AKU_LINEAR_BACKOFF, //< NOTE(review): behavior of each policy is implemented elsewhere and not visible in this header
108 };
109 
110 
112 typedef std::function<void(aku_Status, uint64_t)> PipelineErrorCb; //< Callback from pipeline to session; receives an aku_Status and a uint64_t (meaning of the integer is not visible here)
113 
114 
125 
126  // Constants (NOTE(review): the class header, listing line 124, is absent from this listing)
127  enum {
129  POOL_SIZE = 0x200, //< PVal pool size
130  QCAP = 0x10, //< NOTE(review): presumably a queue capacity; not used anywhere in the visible header - confirm
131  };
132 
133  // Typedefs
134  typedef struct { char emptybits[64]; } Padding; //< 64-byte filler type, used for pad0/pad1 below
135  typedef std::atomic<uint64_t> SpoutCounter; //< Shared counter
136  typedef struct {
137  aku_Sample sample; //< Sample value carried by this element
138  SpoutCounter* cnt; //< Pointer to spout's shared counter
139  PipelineErrorCb* on_error; //< On error callback
140  } TVal; //< Queue element: a sample plus pointers to a shared counter and an error callback
141  typedef std::shared_ptr<TVal> PVal; //< Shared pointer to TVal
142  typedef queue<TVal*> Queue; //< boost::lockfree::queue of raw TVal pointers
143  typedef std::shared_ptr<Queue> PQueue; //< Shared pointer to Queue
144  typedef std::shared_ptr<DbConnection> PDatabase; //< Database "connection"
145 
146  // Data
147  SpoutCounter created_; //< Created elements counter
148  Padding pad0; //< Filler between created_ and deleted_ (presumably to keep the two counters apart in memory - confirm intent)
149  SpoutCounter deleted_; //< Deleted elements counter
150  std::vector<PVal> pool_; //< TVal pool. NOTE(review): <vector> is not among the includes visible in this listing
151  Padding pad1; //< Filler after pool_
152  PQueue queue_; //< Queue
153  const BackoffPolicy backoff_; //< Back-pressure policy
154  Logger logger_; //< Logger instance
155  PipelineErrorCb on_error_; //< Session callback
156  PDatabase db_; //< Database connection
157 
158  // C-tor
159  PipelineSpout(std::shared_ptr<Queue> q, BackoffPolicy bp, std::shared_ptr<DbConnection> con);
160  ~PipelineSpout();
161 
162  void set_error_cb(PipelineErrorCb cb); //< Presumably stores `cb` as the session callback (on_error_) - confirm in the .cpp
163 
164  // ProtocolConsumer
165  virtual void write(const aku_Sample& sample);
166  virtual void add_bulk_string(const Byte* buffer, size_t n);
167 
168  // Utility (NOTE(review): listing lines 169-170 are absent here; the page's member index lists get_index_of_empty_slot() - confirm against the real header)
171 
175  void get_error(std::ostream& ostr); //< NOTE(review): presumably writes error information to `ostr`; its doc comment was stripped from this listing
176 
177  aku_Status series_to_param_id(const char* str, size_t strlen, aku_Sample* sample); //< Convert series name to param id
178 
180  bool is_empty() const; //< NOTE(review): doc comment stripped from this listing; what "empty" refers to is not visible here
181 };
182 
183 class IngestionPipeline : public std::enable_shared_from_this<IngestionPipeline> { //< Holds the DB connection and a collection of queues; make_spout() returns PipelineSpout instances
184  enum {
185  N_QUEUES = 8, //< NOTE(review): presumably the number of entries in queues_ - confirm in the .cpp
186  };
187  typedef boost::barrier Barr; //< Shorthand for boost::barrier
188  std::shared_ptr<DbConnection> con_; //< DB connection
189  std::vector<PipelineSpout::PQueue> queues_; //< Queues collection
190  std::atomic<int> ixmake_; //< Index for the make_spout method
191  Barr stopbar_; //< Stopping barrier
192  Barr startbar_; //< Starting barrier (the original comment repeated "Stopping barrier" - presumably a copy-paste slip)
193  static PipelineSpout::TVal* POISON; //< Poisoned object to stop worker thread
194  static int TIMEOUT; //< Close timeout (units are not visible in this header)
195  const BackoffPolicy backoff_; //< Back-pressure policy
196  Logger logger_; //< Logger instance
197 public:
200  IngestionPipeline(std::shared_ptr<DbConnection> con, BackoffPolicy bp = AKU_THROTTLE); //< Takes the DB connection; `bp` defaults to AKU_THROTTLE
201 
204  void start(); //< NOTE(review): doc comment stripped from this listing; presumably starts the worker thread(s) - confirm
205 
207  std::shared_ptr<PipelineSpout> make_spout(); //< Returns a shared PipelineSpout (how ixmake_ and queues_ are used to build it is in the .cpp)
208 
209  void stop(); //< NOTE(review): undocumented; presumably stops the pipeline using POISON/stopbar_/TIMEOUT - confirm
210 };
211 
212 } // namespace Akumuli
virtual bool is_error(aku_Status *out_error_code_or_null)=0
Check for error condition.
PVal pool size.
Definition: ingestion_pipeline.h:129
bool is_empty() const
Definition: ingestion_pipeline.cpp:167
virtual std::shared_ptr< DbCursor > search(std::string query)
Execute search query.
Definition: ingestion_pipeline.cpp:78
Definition: ingestion_pipeline.h:136
Definition: ingestion_pipeline.h:183
virtual int param_id_to_series(aku_ParamId id, char *buffer, size_t buffer_size)
Convert paramid to series name.
Definition: ingestion_pipeline.cpp:83
Definition: protocol_consumer.h:27
aku_Status series_to_param_id(const char *str, size_t strlen, aku_Sample *sample)
Convert series name to param id.
Definition: ingestion_pipeline.cpp:148
IngestionPipeline(std::shared_ptr< DbConnection > con, BackoffPolicy bp=AKU_THROTTLE)
Definition: ingestion_pipeline.cpp:173
virtual int param_id_to_series(aku_ParamId id, char *buffer, size_t buffer_size)=0
Convert paramid to series name.
int get_index_of_empty_slot()
Reserve index for the next TVal in the pool or negative value on error.
Definition: ingestion_pipeline.cpp:157
void start()
Definition: ingestion_pipeline.cpp:187
Definition: ingestion_pipeline.h:124
Definition: ingestion_pipeline.h:134
Abstraction layer above aku_Database.
Definition: ingestion_pipeline.h:50
void get_error(std::ostream &ostr)
virtual size_t read(void *dest, size_t dest_size)=0
Read data from cursor.
virtual int is_done()=0
Check if cursor is done reading.
virtual aku_Status write(const aku_Sample &sample)
Write value to DB.
Definition: ingestion_pipeline.cpp:74
virtual void close()=0
Close cursor.
virtual std::shared_ptr< DbCursor > search(std::string query)=0
Execute search query.
Definition: log_iface.h:9
Object of this class writes everything to the database.
Definition: ingestion_pipeline.h:72
std::shared_ptr< PipelineSpout > make_spout()
Definition: ingestion_pipeline.cpp:257
std::function< void(aku_Status, uint64_t)> PipelineErrorCb
Callback from pipeline to session.
Definition: ingestion_pipeline.h:112
virtual aku_Status write(const aku_Sample &sample)=0
Write value to DB.
Abstraction layer above aku_Cursor.
Definition: ingestion_pipeline.h:35