Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
spacesaver.h
1 #pragma once
2 
3 #include <memory>
4 #include <unordered_map>
5 
6 #include "../queryprocessor_framework.h"
7 
8 namespace Akumuli {
9 namespace QP {
10 
11 template <bool weighted> struct SpaceSaver : Node {
12  std::shared_ptr<Node> next_;
13 
14  struct Item {
15  double count;
16  double error;
17  };
18 
19  std::unordered_map<aku_ParamId, Item> counters_;
21  double N;
22  size_t M;
23  double P;
24 
31  SpaceSaver(double error, double portion, std::shared_ptr<Node> next)
32  : next_(next)
33  , N(0)
34  , M(ceil(1.0 / error))
35  , P(portion) // between 0 and 1
36  {
37  assert(P >= 0.0);
38  assert(P <= 1.0);
39  }
40 
41  SpaceSaver(boost::property_tree::ptree const& ptree, std::shared_ptr<Node> next)
42  : next_(next) {
43  double error = ptree.get<double>("error");
44  double portion = ptree.get<double>("portion");
45  if (error == 0.0) {
46  QueryParserError error("`error` can't be 0.");
47  BOOST_THROW_EXCEPTION(error);
48  }
49  M = ceil(1.0 / error);
50  P = portion;
51  if (P < 0.0) {
52  QueryParserError error("`portion` can't be negative");
53  BOOST_THROW_EXCEPTION(error);
54  }
55  if (P > 1.0) {
56  QueryParserError error("`portion` can't be greater then 1.");
57  BOOST_THROW_EXCEPTION(error);
58  }
59  }
60 
61  bool count() {
62  std::vector<aku_Sample> samples;
63  auto support = N * P;
64  for (auto it : counters_) {
65  auto estimate = it.second.count - it.second.error;
66  if (support < estimate) {
67  aku_Sample s;
68  s.paramid = it.first;
69  s.payload.type = aku_PData::PARAMID_BIT | aku_PData::FLOAT_BIT;
70  s.payload.float64 = it.second.count;
71  s.payload.size = sizeof(aku_Sample);
72  samples.push_back(s);
73  }
74  }
75  std::sort(samples.begin(), samples.end(), [](const aku_Sample& lhs, const aku_Sample& rhs) {
76  return lhs.payload.float64 > rhs.payload.float64;
77  });
78  for (const auto& s : samples) {
79  if (!next_->put(s)) {
80  return false;
81  }
82  }
83  counters_.clear();
84  return true;
85  }
86 
87  virtual void complete() {
88  count();
89  next_->complete();
90  }
91 
92  virtual bool put(const aku_Sample& sample) {
93  if (sample.payload.type > aku_PData::MARGIN) {
94  return count();
95  }
96  if (weighted) {
97  if ((sample.payload.type & aku_PData::FLOAT_BIT) == 0) {
98  return true;
99  }
100  }
101  auto id = sample.paramid;
102  auto weight = weighted ? sample.payload.float64 : 1.0;
103  auto it = counters_.find(id);
104  if (it == counters_.end()) {
105  // new element
106  double count = weight;
107  double error = 0;
108  if (counters_.size() == M) {
109  // remove element with smallest count
110  size_t min = std::numeric_limits<size_t>::max();
111  auto min_iter = it;
112  for (auto i = counters_.begin(); i != counters_.end(); i++) {
113  if (i->second.count < min) {
114  min_iter = i;
115  min = i->second.count;
116  }
117  }
118  counters_.erase(min_iter);
119  count += min;
120  error = min;
121  }
122  counters_[id] = { count, error };
123  } else {
124  // increment
125  it->second.count += weight;
126  }
127  N += weight;
128  return true;
129  }
130 
131  virtual void set_error(aku_Status status) { next_->set_error(status); }
132 
133  virtual int get_requirements() const { return EMPTY | TERMINAL; }
134 };
135 }
136 } // namespace
virtual int get_requirements() const
Definition: spacesaver.h:133
SpaceSaver(double error, double portion, std::shared_ptr< Node > next)
Definition: spacesaver.h:31
double N
Running sum of the weights of all counted samples (the table capacity is M).
Definition: spacesaver.h:21
Definition: spacesaver.h:11
virtual void complete()
Complete adding values.
Definition: spacesaver.h:87
Definition: queryprocessor_framework.h:28
Exception triggered by query parser.
Definition: queryprocessor_framework.h:14
Definition: spacesaver.h:14
virtual bool put(const aku_Sample &sample)
Definition: spacesaver.h:92