Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
paa.h
1 #pragma once
2 
3 #include <memory>
4 #include <unordered_map>
5 #include <vector>
6 
7 #include "../queryprocessor_framework.h"
8 
9 namespace Akumuli {
10 namespace QP {
11 
12 
14 template <class State> struct PAA : Node {
15  std::shared_ptr<Node> next_;
16  std::unordered_map<aku_ParamId, State> counters_;
17 
18  PAA(std::shared_ptr<Node> next)
19  : next_(next) {}
20 
21  bool average_samples(aku_Sample const& margin) {
22  std::vector<aku_ParamId> ids;
23  for (auto& pair : counters_) {
24  ids.push_back(pair.first);
25  }
26  if (margin.payload.type == aku_PData::LO_MARGIN) {
27  // Moving in backward direction
28  std::sort(ids.begin(), ids.end(), std::greater<aku_ParamId>());
29  } else {
30  // Moving forward
31  std::sort(ids.begin(), ids.end(), std::less<aku_ParamId>());
32  }
33  for (auto id : ids) {
34  State& state = counters_[id];
35  if (state.ready()) {
36  aku_Sample sample;
37  sample.paramid = id;
38  sample.payload.float64 = state.value();
39  sample.payload.type = AKU_PAYLOAD_FLOAT;
40  sample.payload.size = sizeof(aku_Sample);
41  sample.timestamp = margin.timestamp;
42  state.reset();
43  if (!next_->put(sample)) {
44  return false;
45  }
46  }
47  }
48  if (!next_->put(margin)) {
49  return false;
50  }
51  return true;
52  }
53 
54  virtual void complete() { next_->complete(); }
55 
56  virtual bool put(const aku_Sample& sample) {
57  if (sample.payload.type > aku_PData::MARGIN) {
58  if (!average_samples(sample)) {
59  return false;
60  }
61  } else {
62  auto& state = counters_[sample.paramid];
63  state.add(sample);
64  }
65  return true;
66  }
67 
68  virtual void set_error(aku_Status status) { next_->set_error(status); }
69 
70  virtual int get_requirements() const { return GROUP_BY_REQUIRED; }
71 };
72 
73 
74 struct MeanCounter {
75  double acc = 0;
76  size_t num = 0;
77 
78  void reset();
79 
80  double value() const;
81 
82  bool ready() const;
83 
84  void add(aku_Sample const& value);
85 };
86 
87 struct MeanPAA : PAA<MeanCounter> {
88 
89  MeanPAA(std::shared_ptr<Node> next);
90 
91  MeanPAA(boost::property_tree::ptree const&, std::shared_ptr<Node> next);
92 };
93 
94 struct MedianCounter {
95  mutable std::vector<double> acc;
96 
97  void reset();
98 
99  double value() const;
100 
101  bool ready() const;
102 
103  void add(aku_Sample const& value);
104 };
105 
106 struct MedianPAA : PAA<MedianCounter> {
107 
108  MedianPAA(std::shared_ptr<Node> next);
109 
110  MedianPAA(boost::property_tree::ptree const&, std::shared_ptr<Node> next);
111 };
112 
113 template <class SelectFn> struct ValueSelector {
114  double acc;
115  size_t num;
116 
117  void reset() {
118  acc = 0;
119  num = 0;
120  }
121 
122  double value() const { return acc; }
123 
124  bool ready() const { return num != 0; }
125 
126  void add(aku_Sample const& value) {
127  if (!num) {
128  acc = value.payload.float64;
129  } else {
130  SelectFn fn;
131  acc = fn(acc, value.payload.float64);
132  }
133  num++;
134  }
135 };
136 
137 template <class SelectFn> struct GenericPAA : PAA<ValueSelector<SelectFn>> {
138  GenericPAA(std::shared_ptr<Node> next)
139  : PAA<ValueSelector<SelectFn>>(next) {}
140 
141  GenericPAA(boost::property_tree::ptree const&, std::shared_ptr<Node> next)
142  : PAA<ValueSelector<SelectFn>>(next) {}
143 };
144 }
145 } // namespace
Definition: paa.h:74
virtual int get_requirements() const
Definition: paa.h:70
Definition: paa.h:137
Definition: paa.h:106
virtual void complete()
Complete adding values.
Definition: paa.h:54
Definition: queryprocessor_framework.h:28
virtual bool put(const aku_Sample &sample)
Definition: paa.h:56
Definition: paa.h:87
Definition: paa.h:94
Definition: paa.h:113
Generic piecewise aggregate approximation.
Definition: paa.h:14