Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
compression.h
1 
23 #pragma once
24 
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <stdexcept>
#include <tuple>
#include <vector>

#include "akumuli.h"
#include "util.h"
34 
35 namespace Akumuli {
36 
37 typedef std::vector<unsigned char> ByteVector;
38 
45  std::vector<aku_Timestamp> timestamps;
46  std::vector<aku_ParamId> paramids;
47  std::vector<double> values;
48 };
49 
50 struct ChunkWriter {
51 
52  virtual ~ChunkWriter() = default;
53 
57  virtual aku_MemRange allocate() = 0;
58 
60  virtual aku_Status commit(size_t bytes_written) = 0;
61 };
62 
//! Base 128 encoded integer.
//! Each output byte carries 7 payload bits (least significant group first);
//! the high bit (0x80) is set on every byte except the last one.
template <class TVal> class Base128Int {
    TVal value_;
    typedef unsigned char byte_t;
    typedef byte_t* byte_ptr;

public:
    Base128Int(TVal val)
        : value_(val) {}

    Base128Int()
        : value_() {}

    /** Decode a value from the byte range [begin, end).
      * @param begin first byte of the encoded value
      * @param end   one past the last readable byte
      * @return pointer to the byte following the decoded value on success;
      *         `begin` if the range is empty, if it ends before a terminating
      *         byte is seen, or if the encoding has more 7-bit groups than
      *         TVal can hold. The stored value is left untouched on failure.
      */
    const unsigned char* get(const unsigned char* begin, const unsigned char* end) {
        // Same runtime guard as put(): an empty/inverted range is a failure,
        // not something only a debug-build assert should catch.
        if (begin >= end) {
            return begin;
        }

        const size_t nbits = sizeof(TVal) * 8;
        auto acc = TVal();
        size_t shift = 0;
        const unsigned char* p = begin;

        while (true) {
            if (p == end) {
                // ran out of input before the terminating byte
                return begin;
            }
            if (shift >= nbits) {
                // Too many continuation bytes: shifting by >= the type width
                // is undefined behaviour, so reject the malformed input.
                return begin;
            }
            auto i = static_cast<byte_t>(*p & 0x7F);
            acc |= TVal(i) << shift;
            if ((*p++ & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        value_ = acc;
        return p;
    }

    /** Encode the stored value into the byte range [begin, end).
      * @param begin first writable byte
      * @param end   one past the last writable byte
      * @return pointer to the byte following the encoded value on success;
      *         `begin` if the range is empty or too small for the encoding
      *         (bytes in the range may already have been overwritten).
      */
    unsigned char* put(unsigned char* begin, const unsigned char* end) const {
        if (begin >= end) {
            return begin;
        }

        TVal value = value_;
        unsigned char* p = begin;

        while (true) {
            if (p == end) {
                // not enough room for the whole value
                return begin;
            }
            *p = value & 0x7F;
            value >>= 7;
            if (value != 0) {
                // more groups follow - mark with the continuation bit
                *p++ |= 0x80;
            } else {
                p++;
                break;
            }
        }
        return p;
    }

    //! Convert to the underlying integer type.
    operator TVal() const { return value_; }
};
131 
134  // underlying memory region
135  const unsigned char* begin_;
136  const unsigned char* end_;
137  unsigned char* pos_;
138 
139  Base128StreamWriter(unsigned char* begin, const unsigned char* end)
140  : begin_(begin)
141  , end_(end)
142  , pos_(begin) {}
143 
145  : begin_(other.begin_)
146  , end_(other.end_)
147  , pos_(other.pos_) {}
148 
151  template <class TVal>
152  bool tput(TVal const* iter, size_t n) {
153  auto oldpos = pos_;
154  for (size_t i = 0; i < n; i++) {
155  if (!put(iter[i])) {
156  // restore old pos_ value
157  pos_ = oldpos;
158  return false;
159  }
160  }
161  return commit(); // no-op
162  }
163 
166  template <class TVal>
167  bool put(TVal value) {
168  Base128Int<TVal> val(value);
169  unsigned char* p = val.put(pos_, end_);
170  if (pos_ == p) {
171  return false;
172  }
173  pos_ = p;
174  return true;
175  }
176 
177  template<class TVal>
178  bool put_raw(TVal value) {
179  if ((end_ - pos_) < (int)sizeof(TVal)) {
180  return false;
181  }
182  *reinterpret_cast<TVal*>(pos_) = value;
183  pos_ += sizeof(value);
184  return true;
185  }
186 
188  bool commit() { return true; }
189 
190  size_t size() const { return pos_ - begin_; }
191 
192  size_t space_left() const { return end_ - pos_; }
193 
198  template <class T>
199  T* allocate() {
200  size_t sz = sizeof(T);
201  if (space_left() < sz) {
202  return nullptr;
203  }
204  T* result = reinterpret_cast<T*>(pos_);
205  pos_ += sz;
206  return result;
207  }
208 };
209 
212  const unsigned char* pos_;
213  const unsigned char* end_;
214 
215  Base128StreamReader(const unsigned char* begin, const unsigned char* end)
216  : pos_(begin)
217  , end_(end)
218  {}
219 
220  template <class TVal> TVal next() {
221  Base128Int<TVal> value;
222  auto p = value.get(pos_, end_);
223  if (p == pos_) {
224  AKU_PANIC("can't read value, out of bounds");
225  }
226  pos_ = p;
227  return static_cast<TVal>(value);
228  }
229 
231  template <class TVal> TVal read_raw() {
232  size_t sz = sizeof(TVal);
233  if (space_left() < sz) {
234  AKU_PANIC("can't read value, out of bounds");
235  }
236  auto val = *reinterpret_cast<const TVal*>(pos_);
237  pos_ += sz;
238  return val;
239  }
240 
241  size_t space_left() const { return end_ - pos_; }
242 
243  const unsigned char* pos() const { return pos_; }
244 };
245 
246 template <class Stream, class TVal>
248  Stream stream_;
249 
251  : stream_(stream)
252  {}
253 
254  bool tput(TVal const* iter, size_t n) {
255  TVal outbuf[n];
256  for (size_t i = 0; i < n; i++) {
257  auto value = iter[i];
258  const int shift_width = sizeof(TVal) * 8 - 1;
259  auto res = (value << 1) ^ (value >> shift_width);
260  outbuf[i] = res;
261  }
262  return stream_.tput(outbuf, n);
263  }
264 
265  bool put(TVal value) {
266  // TVal should be signed
267  const int shift_width = sizeof(TVal) * 8 - 1;
268  auto res = (value << 1) ^ (value >> shift_width);
269  return stream_.put(res);
270  }
271 
272  size_t size() const { return stream_.size(); }
273 
274  bool commit() { return stream_.commit(); }
275 };
276 
277 template <class Stream, class TVal> struct ZigZagStreamReader {
278  Stream stream_;
279 
281  : stream_(stream) {}
282 
283  TVal next() {
284  auto n = stream_.next();
285  return (n >> 1) ^ (-(n & 1));
286  }
287 
288  const unsigned char* pos() const { return stream_.pos(); }
289 };
290 
291 template <class Stream, typename TVal>
293  Stream stream_;
294  TVal prev_;
295 
297  : stream_(stream)
298  , prev_() {}
299 
300  bool tput(TVal const* iter, size_t n) {
301  TVal outbuf[n];
302  for (size_t i = 0; i < n; i++) {
303  auto value = iter[i];
304  auto result = static_cast<TVal>(value) - prev_;
305  outbuf[i] = result;
306  prev_ = value;
307  }
308  return stream_.tput(outbuf, n);
309  }
310 
311  bool put(TVal value) {
312  auto result = stream_.put(static_cast<TVal>(value) - prev_);
313  prev_ = value;
314  return result;
315  }
316 
317  size_t size() const { return stream_.size(); }
318 
319  bool commit() { return stream_.commit(); }
320 };
321 
322 
323 template <class Stream, typename TVal>
325  Stream stream_;
326  TVal prev_;
327 
329  : stream_(stream)
330  , prev_() {}
331 
332  TVal next() {
333  TVal delta = stream_.next();
334  TVal value = prev_ + delta;
335  prev_ = value;
336  return value;
337  }
338 
339  const unsigned char* pos() const { return stream_.pos(); }
340 };
341 
342 
343 template <size_t Step, typename TVal>
345  Base128StreamWriter& stream_;
346  TVal prev_;
347  int put_calls_;
348 
350  : stream_(stream)
351  , prev_()
352  , put_calls_(0)
353  {}
354 
355  bool tput(TVal const* iter, size_t n) {
356  assert(n == Step);
357  TVal outbuf[n];
358  for (size_t i = 0; i < n; i++) {
359  auto value = iter[i];
360  auto result = value - prev_;
361  outbuf[i] = result;
362  prev_ = value;
363  }
364  TVal min = outbuf[0];
365  for (size_t i = 1; i < n; i++) {
366  min = std::min(outbuf[i], min);
367  }
368  for (size_t i = 0; i < n; i++) {
369  outbuf[i] -= min;
370  }
371  // encode min value
372  if (!stream_.put(min)) {
373  return false;
374  }
375  return stream_.tput(outbuf, n);
376  }
377 
378  bool put(TVal value) {
379  bool success = false;
380  if (put_calls_ == 0) {
381  success = stream_.put(0);
382  if (!success) {
383  return false;
384  }
385  }
386  put_calls_++;
387  success = stream_.put(value - prev_);
388  prev_ = value;
389  return success;
390  }
391 
392  size_t size() const { return stream_.size(); }
393 
394  bool commit() { return stream_.commit(); }
395 };
396 
397 template <size_t Step, typename TVal>
399  Base128StreamReader& stream_;
400  TVal prev_;
401  TVal min_;
402  int counter_;
403 
405  : stream_(stream)
406  , prev_()
407  , min_()
408  , counter_()
409  {}
410 
411  TVal next() {
412  if (counter_ % Step == 0) {
413  // read min
414  min_ = stream_.next<TVal>();
415  }
416  counter_++;
417  TVal delta = stream_.next<TVal>();
418  TVal value = prev_ + delta + min_;
419  prev_ = value;
420  return value;
421  }
422 
423  const unsigned char* pos() const { return stream_.pos(); }
424 };
425 
426 template <typename TVal>
428  Base128StreamWriter& stream_;
429  TVal prev_;
430  TVal reps_;
431  size_t start_size_;
432 
434  : stream_(stream)
435  , prev_()
436  , reps_()
437  , start_size_(stream.size()) {}
438 
439  bool tput(TVal const* iter, size_t n) {
440  size_t outpos = 0;
441  TVal outbuf[n*2];
442  for (size_t i = 0; i < n; i++) {
443  auto value = iter[i];
444  if (value != prev_) {
445  if (reps_) {
446  // commit changes
447  outbuf[outpos++] = reps_;
448  outbuf[outpos++] = prev_;
449  }
450  prev_ = value;
451  reps_ = TVal();
452  }
453  reps_++;
454  }
455  // commit RLE if needed
456  if (outpos < n*2) {
457  outbuf[outpos++] = reps_;
458  outbuf[outpos++] = prev_;
459  }
460  prev_ = TVal();
461  reps_ = TVal();
462  // continue
463  return stream_.tput(outbuf, outpos);
464  }
465 
466  bool put(TVal value) {
467  //
468  if (value != prev_) {
469  if (reps_) {
470  // commit changes
471  if (!stream_.put(reps_)) {
472  return false;
473  }
474  if (!stream_.put(prev_)) {
475  return false;
476  }
477  }
478  prev_ = value;
479  reps_ = TVal();
480  }
481  reps_++;
482  return true;
483  }
484 
485  size_t size() const { return stream_.size() - start_size_; }
486 
487  bool commit() {
488  return stream_.put(reps_) && stream_.put(prev_) && stream_.commit();
489  }
490 };
491 
492 template <typename TVal>
494  Base128StreamReader& stream_;
495  TVal prev_;
496  TVal reps_;
497 
499  : stream_(stream)
500  , prev_()
501  , reps_() {}
502 
503  TVal next() {
504  if (reps_ == 0) {
505  reps_ = stream_.next<TVal>();
506  prev_ = stream_.next<TVal>();
507  }
508  reps_--;
509  return prev_;
510  }
511 
512  const unsigned char* pos() const { return stream_.pos(); }
513 };
514 
515 struct FcmPredictor {
516  std::vector<uint64_t> table;
517  uint64_t last_hash;
518  const uint64_t MASK_;
519 
520  FcmPredictor(size_t table_size);
521 
522  uint64_t predict_next() const;
523 
524  void update(uint64_t value);
525 };
526 
528  std::vector<uint64_t> table;
529  uint64_t last_hash;
530  uint64_t last_value;
531  const uint64_t MASK_;
532 
534  DfcmPredictor(int table_size);
535 
536  uint64_t predict_next() const;
537 
538  void update(uint64_t value);
539 };
540 
541 typedef FcmPredictor PredictorT;
542 
544  Base128StreamWriter& stream_;
545  PredictorT predictor_;
546  uint64_t prev_diff_;
547  unsigned char prev_flag_;
548  int nelements_;
549 
551 
552  bool tput(double const* values, size_t n);
553 
554  bool put(double value);
555 
556  size_t size() const;
557 
558  bool commit();
559 };
560 
562  Base128StreamReader& stream_;
563  PredictorT predictor_;
564  int flags_;
565  int iter_;
566 
568 
569  double next();
570 
571  const unsigned char* pos() const;
572 };
573 
574 
//! SeriesSlice represents consecutive data points from one series.
576 struct SeriesSlice {
 //! Series id.
578  aku_ParamId id;
 //! Pointer to the array of timestamps.
580  aku_Timestamp* ts;
 //! Pointer to the array of values.
582  double* value;
 //! Array size.
584  size_t size;
 //! Current position.
586  size_t offset;
587 };
588 
589 // Old deprecated functions
591 
598  static aku_Status encode_chunk(uint32_t* n_elements, aku_Timestamp* ts_begin,
599  aku_Timestamp* ts_end, ChunkWriter* writer,
600  const UncompressedChunk& data);
601 
613  static aku_Status decode_chunk(UncompressedChunk* header, const unsigned char* pbegin,
614  const unsigned char* pend, uint32_t nelements);
615 
621  static size_t compress_doubles(const std::vector<double>& input, Base128StreamWriter& wstream);
622 
629  static void decompress_doubles(Base128StreamReader& rstream, size_t numvalues,
630  std::vector<double>* output);
631 
636  static bool convert_from_chunk_order(const UncompressedChunk& header, UncompressedChunk* out);
637 
642  static bool convert_from_time_order(const UncompressedChunk& header, UncompressedChunk* out);
643 };
644 
645 
646 // Length -> RLE -> Base128
648 
649 // Base128 -> RLE -> Length
651 
652 // int64_t -> Delta -> ZigZag -> RLE -> Base128
656 
657 // Base128 -> RLE -> ZigZag -> Delta -> int64_t
661 
662 // uint64_t -> Delta -> RLE -> Base128
664 // Base128 -> RLE -> Delta -> uint64_t
666 
667 
668 namespace StorageEngine {
669 
671  enum {
672  CHUNK_SIZE = 16,
673  CHUNK_MASK = 15,
674  HEADER_SIZE = 14, // 2 (version) + 2 (nchunks) + 2 (tail size) + 8 (series id)
675  };
676  Base128StreamWriter stream_;
677  DeltaRLEWriter ts_stream_;
678  FcmStreamWriter val_stream_;
679  int write_index_;
680  aku_Timestamp ts_writebuf_[CHUNK_SIZE];
681  double val_writebuf_[CHUNK_SIZE];
682  uint16_t* nchunks_;
683  uint16_t* ntail_;
684 
686  DataBlockWriter();
687 
693  DataBlockWriter(aku_ParamId id, uint8_t* buf, int size);
694 
700  aku_Status put(aku_Timestamp ts, double value);
701 
702  size_t commit();
703 
704 private:
706  bool room_for_chunk() const;
707 };
708 
710  enum {
711  CHUNK_SIZE = 16,
712  CHUNK_MASK = 15,
713  };
714  const uint8_t* begin_;
715  Base128StreamReader stream_;
716  DeltaRLEReader ts_stream_;
717  FcmStreamReader val_stream_;
718  aku_Timestamp read_buffer_[CHUNK_SIZE];
719  uint32_t read_index_;
720 
721  DataBlockReader(uint8_t const* buf, size_t bufsize);
722 
723  std::tuple<aku_Status, aku_Timestamp, double> next();
724 
725  size_t nelements() const;
726 
727  aku_ParamId get_id() const;
728 
729  uint16_t version() const;
730 };
731 
732 } // namespace V2
733 }
Definition: compression.h:590
DataBlockWriter()
Empty c-tor. Constructs unwritable object.
Definition: compression.cpp:506
size_t offset
Current position.
Definition: compression.h:586
virtual aku_MemRange allocate()=0
TVal read_raw()
Read uncompressed value from stream.
Definition: compression.h:231
bool commit()
Commit stream.
Definition: compression.h:188
DfcmPredictor(int table_size)
C-tor. table_size should be a power of two.
Definition: compression.cpp:28
double val_writebuf_[CHUNK_SIZE]
Write buffer for values.
Definition: compression.h:681
Definition: compression.h:670
Definition: compression.h:561
Definition: compression.h:398
Base 128 encoded integer.
Definition: compression.h:64
aku_Timestamp * ts
Pointer to the array of timestamps.
Definition: compression.h:580
Definition: compression.h:493
Definition: compression.h:50
Definition: compression.h:344
Definition: compression.h:292
bool put(TVal value)
Definition: compression.h:167
Definition: compression.h:324
static aku_Status encode_chunk(uint32_t *n_elements, aku_Timestamp *ts_begin, aku_Timestamp *ts_end, ChunkWriter *writer, const UncompressedChunk &data)
Definition: compression.cpp:371
Base128 encoder.
Definition: compression.h:133
Definition: compression.h:39
aku_Status put(aku_Timestamp ts, double value)
Definition: compression.cpp:535
Definition: compression.h:277
Definition: compression.h:247
static aku_Status decode_chunk(UncompressedChunk *header, const unsigned char *pbegin, const unsigned char *pend, uint32_t nelements)
Decode part of the ChunkHeader structure depending on stage and steps values. First goes list of time...
Definition: compression.cpp:426
Definition: compression.h:515
size_t size
Array size.
Definition: compression.h:584
T * allocate()
Definition: compression.h:199
bool tput(TVal const *iter, size_t n)
Definition: compression.h:152
static void decompress_doubles(Base128StreamReader &rstream, size_t numvalues, std::vector< double > *output)
Definition: compression.cpp:306
Base128 decoder.
Definition: compression.h:211
static bool convert_from_chunk_order(const UncompressedChunk &header, UncompressedChunk *out)
Definition: compression.cpp:486
Definition: compression.h:527
virtual aku_Status commit(size_t bytes_written)=0
Commit changes.
double * value
Pointer to the array of values.
Definition: compression.h:582
aku_ParamId id
Series id.
Definition: compression.h:578
Definition: compression.h:543
const unsigned char * get(const unsigned char *begin, const unsigned char *end)
Definition: compression.h:79
unsigned char * put(unsigned char *begin, const unsigned char *end) const
Definition: compression.h:104
std::vector< aku_Timestamp > timestamps
Definition: compression.h:45
uint16_t * nchunks_
Write buffer for values.
Definition: compression.h:682
SeriesSlice represents consecutive data points from one series.
Definition: compression.h:576
static size_t compress_doubles(const std::vector< double > &input, Base128StreamWriter &wstream)
Definition: compression.cpp:209
Definition: compression.h:709
Definition: compression.h:427
static bool convert_from_time_order(const UncompressedChunk &header, UncompressedChunk *out)
Definition: compression.cpp:495