Akumuli
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
storage.h
1 
23 #pragma once
24 #include <atomic>
25 #include <cstddef>
26 #include <list>
27 #include <map>
28 #include <memory>
29 #include <mutex>
30 #include <queue>
31 #include <thread>
32 #include <vector>
33 
34 // APR headers
35 #include <apr.h>
36 #include <apr_mmap.h>
37 
38 #include "akumuli_def.h"
39 #include "cursor.h"
40 #include "metadatastorage.h"
41 #include "page.h"
42 #include "sequencer.h"
43 #include "seriesparser.h"
44 #include "util.h"
45 
46 #include <boost/thread.hpp>
47 
48 namespace Akumuli {
49 
54 struct Volume : std::enable_shared_from_this<Volume> {
55  MemoryMappedFile mmap_;
56  PageHeader* page_;
57  aku_Duration window_;
58  size_t max_cache_size_;
59  std::unique_ptr<Sequencer> cache_;
60  std::string file_path_;
61  aku_FineTuneParams config_;
62  aku_logger_cb_t logger_;
63  std::atomic_bool
64  is_temporary_; //< True if this is temporary volume and underlying file should be deleted
65 
67  Volume(const char* file_path, aku_FineTuneParams conf, aku_logger_cb_t logger);
68 
69  ~Volume();
70 
72  PageHeader* get_page() const;
73 
75  std::shared_ptr<Volume> safe_realloc();
76 
78  void open();
79 
81  void close();
82 
84  void flush();
85 
87  void make_readonly();
88 
90  void make_writable();
91 };
92 
95 struct Storage {
96  typedef std::mutex LockType;
97  typedef std::shared_ptr<Volume> PVolume;
98  typedef std::shared_ptr<MetadataStorage> PMetadataStorage;
99  typedef std::shared_ptr<SeriesMatcher> PSeriesMatcher;
100  typedef std::shared_ptr<ChunkCache> PCache;
101 
102  // Active volume state
103  aku_FineTuneParams config_;
104  PVolume active_volume_;
105  PageHeader* active_page_;
106  std::atomic<int> active_volume_index_;
107  aku_Duration ttl_; //< Late write limit
108  aku_Status open_error_code_; //< Open op-n error code
109  std::vector<PVolume> volumes_; //< List of all volumes
110  PMetadataStorage metadata_; //< Metadata storage
111  PSeriesMatcher matcher_; //< Series matcher
112 
113  LockType mutex_; //< Storage lock (used by worker thread)
114 
115  apr_time_t creation_time_; //< Cached metadata
116  aku_logger_cb_t logger_;
117  Rand rand_;
118  PCache cache_;
119 
121  mutable boost::thread_specific_ptr<SeriesMatcher> local_matcher_;
122 
126  Storage(const char* path, aku_FineTuneParams const& conf);
127 
132  void set_thread_local_matcher(SeriesMatcher* spool) const;
133 
135  void select_active_page();
136 
138  void prepopulate_cache(int64_t max_cache_size);
139 
140  void log_message(const char* message) const;
141 
142  void log_error(const char* message) const;
143 
144  void log_message(const char* message, uint64_t value) const;
145 
146  // Writing
147 
149  void close();
150 
154  void advance_volume_(int ix);
155 
157  aku_Status write_double(aku_ParamId param, aku_Timestamp ts, double value);
158 
159  aku_Status _write_impl(TimeSeriesValue value, aku_MemRange data);
160 
167  aku_Status series_to_param_id(const char* begin, const char* end, uint64_t* value);
168 
171  int param_id_to_series(aku_ParamId id, char* buffer, size_t buffer_size) const;
172 
173  // Reading
174 
176  void search(Caller& caller, InternalCursor* cur, const char* query) const;
177 
178  // Static interface
179 
186  static apr_status_t new_storage(const char* file_name, const char* metadata_path,
187  const char* volumes_path, int num_pages, aku_logger_cb_t logger,
188  uint64_t page_size = 0);
189 
195  static apr_status_t remove_storage(const char* file_name, aku_logger_cb_t logger);
196 
197  // Stats
198  void get_stats(aku_StorageStats* rcv_stats);
199 
200  aku_Status get_open_error() const;
201 
202  void debug_print() const;
203 };
204 }
void select_active_page()
Select page that was active last time.
Definition: storage.cpp:258
aku_Status write_double(aku_ParamId param, aku_Timestamp ts, double value)
Write double.
Definition: storage.cpp:543
Definition: sequencer.h:55
void close()
Flush all data and close volume for write until reallocation.
Definition: storage.cpp:123
void search(Caller &caller, InternalCursor *cur, const char *query) const
Search storage using cursor.
Definition: storage.cpp:401
static apr_status_t new_storage(const char *file_name, const char *metadata_path, const char *volumes_path, int num_pages, aku_logger_cb_t logger, uint64_t page_size=0)
Definition: storage.cpp:776
aku_Status series_to_param_id(const char *begin, const char *end, uint64_t *value)
Definition: storage.cpp:549
Volume(const char *file_path, aku_FineTuneParams conf, aku_logger_cb_t logger)
Create new volume stored in file.
Definition: storage.cpp:48
Definition: page.h:86
Definition: internal_cursor.h:41
Definition: util.h:154
boost::thread_specific_ptr< SeriesMatcher > local_matcher_
Local (per query) string pool.
Definition: storage.h:121
void make_readonly()
Make volume read-only.
Definition: storage.cpp:74
Definition: storage.h:54
static apr_status_t remove_storage(const char *file_name, aku_logger_cb_t logger)
Definition: storage.cpp:857
void set_thread_local_matcher(SeriesMatcher *spool) const
Definition: storage.cpp:567
Storage(const char *path, aku_FineTuneParams const &conf)
Definition: storage.cpp:191
std::shared_ptr< Volume > safe_realloc()
Reallocate space safely.
Definition: storage.cpp:86
void advance_volume_(int ix)
Definition: storage.cpp:302
void make_writable()
Make volume writeable.
Definition: storage.cpp:80
int param_id_to_series(aku_ParamId id, char *buffer, size_t buffer_size) const
Definition: storage.cpp:571
void open()
Open page for writing.
Definition: storage.cpp:118
void close()
Close db (this call should be performed by writer thread)
Definition: storage.cpp:241
Definition: storage.h:95
PageHeader * get_page() const
Get pointer to page.
Definition: storage.cpp:70
Definition: seriesparser.h:38
void flush()
Flush page.
Definition: storage.cpp:128
Definition: util.h:72
void prepopulate_cache(int64_t max_cache_size)
Prepopulate cache.
Definition: storage.cpp:282