Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
sequencer.h
1 
23 #pragma once
24 #include "cursor.h"
25 #include "page.h"
26 #include "queryprocessor_framework.h"
27 
28 #include <algorithm>
29 #include <memory>
30 #include <mutex>
31 #include <tuple>
32 #include <vector>
33 
34 namespace Akumuli {
35 
36 namespace V2 {
37 
38  struct PatienceSort { //< Holds a set of sorted runs that grows one (timestamp, value) sample at a time via add(); presumably a patience-sort implementation, per the name - confirm in the .cpp
39  struct SortedRun { //< One run, stored as two separate vectors
40  std::vector<aku_Timestamp> timestamps; //< Timestamps of the run
41  std::vector<double> values; //< Values of the run; presumably index-aligned with `timestamps` - TODO confirm against add()
42  };
43  typedef std::shared_ptr<SortedRun> PSortedRun; //< Shared-ownership handle to a SortedRun
44 
45  PSortedRun key_; //< NOTE(review): role is not visible in this header - see the definition of add()
46  std::vector<PSortedRun> runs_; //< Runs currently held by this object
47 
48  PatienceSort(); //< Default constructor (defined out of line)
49 
50  void add(aku_Timestamp ts, double value); //< Adds one (timestamp, value) sample; the placement logic is defined out of line
51  };
52 
53 }
54 
56 
57  enum ValueType { //< Payload kind tag. NOTE(review): only a `double` payload member is declared in this listing - confirm BLOB is still in use
58  BLOB,
59  DOUBLE,
60  };
61 
62  // Data members
63  aku_Timestamp key_ts_; // Key value (time)
64  aku_ParamId key_id_; // Key value (id)
65  double value; // Numeric payload
66 
68 
69  TimeSeriesValue(aku_Timestamp ts, aku_ParamId id, double value); //< Constructs from a timestamp, a parameter id and a numeric value
70 
71  aku_Timestamp get_timestamp() const; //< Returns an aku_Timestamp; presumably key_ts_ - confirm in sequencer.cpp
72 
73  aku_ParamId get_paramid() const; //< Returns an aku_ParamId; presumably key_id_ - confirm in sequencer.cpp
74 
75  aku_Sample to_result() const; //< Returns this value as an aku_Sample (conversion defined out of line)
76 
77  void add_to_header(UncompressedChunk* chunk_header) const; //< Takes the target chunk header by pointer; does not modify this value (const)
78 
79  friend bool operator<(TimeSeriesValue const& lhs, TimeSeriesValue const& rhs); //< Default ordering. NOTE(review): key order is not visible here - compare with chunk_order_LT
80 
82  friend bool chunk_order_LT(TimeSeriesValue const& lhs, TimeSeriesValue const& rhs); //< Chunk-order less-than (id goes first, then timestamp)
83 };
84 
85 std::ostream& operator<<(std::ostream& str, TimeSeriesValue const& val); //< Stream-output overload for TimeSeriesValue; returns a std::ostream& (defined out of line)
86 
87 
94 struct Sequencer { //< Akumuli can accept unordered time-series; values are kept in sorted runs (runs_) that are later moved to ready_ and merged
95  typedef std::vector<TimeSeriesValue> SortedRun; //< One run of values
96  typedef std::shared_ptr<SortedRun> PSortedRun; //< Shared-ownership handle to a run
97  typedef std::mutex Mutex;
98  typedef std::unique_lock<Mutex> Lock;
99 
100  static const int RUN_LOCK_MAX_BACKOFF = 0x100;
101  static const int RUN_LOCK_BUSY_COUNT = 0xFFF;
102  static const int RUN_LOCK_FLAGS_MASK = 0x0FF; //< Equals RUN_LOCK_FLAGS_SIZE - 1; presumably masks an index into run_locks_ - TODO confirm
103  static const int RUN_LOCK_FLAGS_SIZE = 0x100;
104 
105  // TODO: space usage should be limited
106 
107  std::vector<PSortedRun> runs_; //< Active sorted runs
108  std::vector<PSortedRun> ready_; //< Ready to merge
109  PSortedRun key_; //< NOTE(review): role is not visible in this header - see sequencer.cpp
110  const aku_Duration window_size_; //< Fixed at construction (const)
111  aku_Timestamp top_timestamp_; //< Largest timestamp ever seen
112  aku_Timestamp checkpoint_; //< Last checkpoint timestamp
113  mutable std::atomic_int
114  sequence_number_; //< Indicates whether a merge operation is in progress; while it is,
115  //< search will return inaccurate results.
116  //< If sequence_number_ is odd, a merge is in progress; if it is
117  //< even, there is no merge and search will work correctly.
118  mutable Mutex runs_resize_lock_; //< presumably guards resizing of runs_, per the name - TODO confirm
119  mutable std::vector<RWLock> run_locks_;
120  const size_t c_threshold_; //< Compression threshold
121 
122  Sequencer(aku_FineTuneParams const& config); //< Constructs from fine-tune parameters (defined out of line)
123 
128  std::tuple<aku_Status, int> add(TimeSeriesValue const& value); //< Timestamp of the sample can be out of order. Returns a (status, int) tuple
129 
131  void merge(Caller& caller, InternalCursor* cur); //< Simple merge and sync without compression (deprecated)
132 
137  aku_Status merge_and_compress(PageHeader* target, bool enforce_write = false); //< enforce_write defaults to false; NOTE(review): its exact effect is defined in sequencer.cpp
138 
140  aku_Status close(PageHeader* target); //< Close cache for writing, merge everything to page header
141 
146  int reset(); //< NOTE(review): meaning of the returned int is not visible in this header
147 
158  void search(std::shared_ptr<QP::IQueryProcessor> query, int sequence_number) const; //< presumably `sequence_number` is checked against sequence_number_ - TODO confirm
159 
160  std::tuple<aku_Timestamp, int> get_window() const; //< Returns a (timestamp, int) tuple
161 
162 private:
164  aku_Timestamp get_checkpoint_(aku_Timestamp ts) const; //< Maps a timestamp to a checkpoint value (defined out of line)
165 
167  aku_Timestamp get_timestamp_(aku_Timestamp cp) const; //< Maps a checkpoint value back to a timestamp (defined out of line)
168 
169  // move sorted runs to ready_ collection
170  int make_checkpoint_(aku_Timestamp new_checkpoint);
171 
175  std::tuple<aku_Status, int> check_timestamp_(aku_Timestamp ts); //< Returns a (status, int) tuple, the same shape as add()
176 
177  void filter(PSortedRun run, std::shared_ptr<QP::IQueryProcessor> query,
178  std::vector<PSortedRun>* results) const; //< `results` is passed by pointer; presumably an output list - TODO confirm
179 };
180 }
Definition: sequencer.h:55
std::tuple< aku_Status, int > add(TimeSeriesValue const &value)
Timestamp of the sample can be out of order.
Definition: sequencer.cpp:233
aku_Status close(PageHeader *target)
Close cache for writing, merge everything to page header.
Definition: sequencer.cpp:286
void search(std::shared_ptr< QP::IQueryProcessor > query, int sequence_number) const
Definition: sequencer.cpp:543
Definition: page.h:86
void merge(Caller &caller, InternalCursor *cur)
Simple merge and sync without compression. (deprecated)
Definition: sequencer.cpp:432
Definition: internal_cursor.h:41
friend bool chunk_order_LT(TimeSeriesValue const &lhs, TimeSeriesValue const &rhs)
Chunk order less-than operator (id goes first, then timestamp)
Definition: sequencer.cpp:126
Akumuli can accept unordered time-series (this is the case when clocks of the different time-series s...
Definition: sequencer.h:94
Definition: compression.h:39
Definition: sequencer.h:39
int reset()
Definition: sequencer.cpp:304
aku_Status merge_and_compress(PageHeader *target, bool enforce_write=false)
Definition: sequencer.cpp:460
Definition: sequencer.h:38