[Groonga-commit] groonga/grnxx at 81fbbec [master] Add grnxx::map::Pool.

Back to archive index

susumu.yata null+****@clear*****
Tue Aug 13 14:25:53 JST 2013

susumu.yata	2013-08-13 14:25:53 +0900 (Tue, 13 Aug 2013)

  New Revision: 81fbbecfecc97b8cad3445284766a22716860c84

    Add grnxx::map::Pool.

  Added files:
  Modified files:

  Modified: lib/grnxx/map/Makefile.am (+2 -0)
--- lib/grnxx/map/Makefile.am    2013-08-07 16:59:59 +0900 (ee16a3d)
+++ lib/grnxx/map/Makefile.am    2013-08-13 14:25:53 +0900 (a841299)
@@ -11,6 +11,7 @@ libgrnxx_map_la_SOURCES =				\
 	hash_table.cpp					\
 	key_pool.cpp					\
 	patricia.cpp					\
+	pool.cpp					\
 libgrnxx_map_includedir = ${includedir}/grnxx/map
@@ -26,4 +27,5 @@ libgrnxx_map_include_HEADERS =				\
 	helper.hpp					\
 	key_pool.hpp					\
 	patricia.hpp					\
+	pool.hpp					\

  Added: lib/grnxx/map/pool.cpp (+381 -0) 100644
--- /dev/null
+++ lib/grnxx/map/pool.cpp    2013-08-13 14:25:53 +0900 (65b190e)
@@ -0,0 +1,381 @@
+  Copyright (C) 2012-2013  Brazil, Inc.
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  Lesser General Public License for more details.
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+#include "grnxx/map/pool.hpp"
+#include "grnxx/exception.hpp"
+#include "grnxx/geo_point.hpp"
+#include "grnxx/intrinsic.hpp"
+#include "grnxx/lock.hpp"
+#include "grnxx/logger.hpp"
+#include "grnxx/map.hpp"
+#include "grnxx/storage.hpp"
+namespace grnxx {
+namespace map {
+namespace {
+// Capacity, in keys, of the first page that expand_pool() creates: one unit.
+constexpr uint64_t MIN_SIZE = POOL_UNIT_SIZE;
+// Smallest number of entries that expand_pool() gives a page table (1,024).
+constexpr uint64_t MIN_TABLE_SIZE = uint64_t(1) << 10;
+// Sentinel unit ID (all bits set) that marks the end of the free-unit list.
+constexpr uint64_t INVALID_UNIT_ID = ~uint64_t(0);
+}  // namespace
+    : max_key_id(-1),
+      num_keys(0),
+      size(0),
+      latest_available_unit_id(INVALID_UNIT_ID),
+      page_storage_node_id(STORAGE_INVALID_NODE_ID),
+      mutex() {}
+template <typename T>
+    : storage_(nullptr),
+      storage_node_id_(STORAGE_INVALID_NODE_ID),
+      header_(nullptr),
+      pages_(),
+      table_(nullptr),
+      size_(0),
+      table_size_(0),
+      queue_() {}
+// Nothing to release by hand: the members clean up after themselves.
+template <typename T>
+Pool<T>::~Pool() = default;
+// Allocates a Pool object, initializes it through create_pool() and returns
+// it; the caller owns the returned pointer.
+// Throws LogicError if storage is null and MemoryError if the object cannot
+// be allocated. Exceptions from create_pool() propagate after the object has
+// been freed.
+template <typename T>
+Pool<T> *Pool<T>::create(Storage *storage, uint32_t storage_node_id) {
+  if (storage == nullptr) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    throw LogicError();
+  }
+  Pool * const raw_pool = new (std::nothrow) Pool;
+  if (raw_pool == nullptr) {
+    GRNXX_ERROR() << "new grnxx::map::Pool failed";
+    throw MemoryError();
+  }
+  // The guard deletes the object if create_pool() throws.
+  std::unique_ptr<Pool> guard(raw_pool);
+  guard->create_pool(storage, storage_node_id);
+  return guard.release();
+// Allocates a Pool object, attaches it to an existing pool through
+// open_pool() and returns it; the caller owns the returned pointer.
+// Throws LogicError if storage is null and MemoryError if the object cannot
+// be allocated. Exceptions from open_pool() propagate after the object has
+// been freed.
+template <typename T>
+Pool<T> *Pool<T>::open(Storage *storage, uint32_t storage_node_id) {
+  if (storage == nullptr) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    throw LogicError();
+  }
+  Pool * const raw_pool = new (std::nothrow) Pool;
+  if (raw_pool == nullptr) {
+    GRNXX_ERROR() << "new grnxx::map::Pool failed";
+    throw MemoryError();
+  }
+  // The guard deletes the object if open_pool() throws.
+  std::unique_ptr<Pool> guard(raw_pool);
+  guard->open_pool(storage, storage_node_id);
+  return guard.release();
+// Unlinks the storage node of a pool.
+// The pool is opened first, so a null storage or a node that open() rejects
+// is reported before anything is unlinked.
+template <typename T>
+void Pool<T>::unlink(Storage *storage, uint32_t storage_node_id) {
+  // Held only as a check; it is destroyed when the function returns.
+  std::unique_ptr<Pool> opened_pool(Pool::open(storage, storage_node_id));
+  storage->unlink_node(storage_node_id);
+// Marks the slot of key_id as free.
+// Throws LogicError if the slot is not in use. Errors raised by get_page()
+// (for example, for an out-of-range key_id) propagate unchanged.
+template <typename T>
+void Pool<T>::unset(int64_t key_id) {
+  void * const page = get_page(key_id);
+  // header_->latest_available_unit_id holds pool-wide unit IDs: add() turns
+  // it back into a page with get_page(unit_id * UNIT_SIZE). It must therefore
+  // be derived from the full key ID, not from the page-local remainder.
+  const uint64_t global_unit_id = key_id / UNIT_SIZE;
+  const uint64_t local_key_id = key_id % PAGE_SIZE;
+  const uint64_t local_unit_id = local_key_id / UNIT_SIZE;
+  const uint64_t unit_bit = 1ULL << (local_key_id % UNIT_SIZE);
+  // Units are laid out backwards in front of the page's keys.
+  Unit * const unit = static_cast<Unit *>(page) - local_unit_id - 1;
+  if (~unit->bits & unit_bit) {
+    // Report the ID the caller passed, not the page-local one.
+    GRNXX_ERROR() << "not found: key_id = " << key_id;
+    throw LogicError();
+  }
+  if (unit->bits == ~uint64_t(0)) {
+    // A full unit is not on the free list; push it now that a slot opens up.
+    unit->next_available_unit_id = header_->latest_available_unit_id;
+    header_->latest_available_unit_id = global_unit_id;
+  }
+  unit->bits &= ~unit_bit;
+  --header_->num_keys;
+// Overwrites the key stored at key_id with dest_key.
+// Throws LogicError if the slot is not in use. Errors raised by get_page()
+// propagate unchanged.
+template <typename T>
+void Pool<T>::reset(int64_t key_id, KeyArg dest_key) {
+  void * const page = get_page(key_id);
+  // Keep key_id intact so that the error message shows the caller's ID
+  // instead of the offset within the page.
+  const uint64_t local_key_id = key_id % PAGE_SIZE;
+  // Units are laid out backwards in front of the page's keys.
+  const Unit * const unit =
+      static_cast<const Unit *>(page) - (local_key_id / UNIT_SIZE) - 1;
+  if (~unit->bits & (1ULL << (local_key_id % UNIT_SIZE))) {
+    GRNXX_ERROR() << "not found: key_id = " << key_id;
+    throw LogicError();
+  }
+  static_cast<T *>(page)[local_key_id] = dest_key;
+template <typename T>
+int64_t Pool<T>::add(KeyArg key) {  // Stores key in a free slot and returns the ID of that slot; throws LogicError when the ID would exceed MAX_KEY_ID.
+  if (header_->latest_available_unit_id == INVALID_UNIT_ID) {  // The free-unit list is empty.
+    const int64_t next_key_id = header_->max_key_id + 1;  // Append right after the largest ID handed out so far.
+    if (next_key_id > MAX_KEY_ID) {
+      GRNXX_ERROR() << "pool is full: next_key_id = " << next_key_id
+                    << ", max_key_id = " << MAX_KEY_ID;
+      throw LogicError();
+    }
+    void * const page = reserve_page(next_key_id);  // May expand the pool and create the page.
+    const uint64_t key_id = next_key_id % PAGE_SIZE;  // Offset of the key within its page.
+    Unit * const unit = static_cast<Unit *>(page) - (key_id / UNIT_SIZE) - 1;  // Units sit backwards in front of the keys.
+    unit->bits = 1;  // Only slot 0 is marked used. NOTE(review): this relies on next_key_id being the first key of its unit - confirm.
+    unit->next_available_unit_id = INVALID_UNIT_ID;  // The new unit becomes the only entry of the free list.
+    header_->latest_available_unit_id = next_key_id / UNIT_SIZE;  // Stored as a pool-wide unit ID.
+    static_cast<T *>(page)[key_id] = key;
+    header_->max_key_id = next_key_id;
+    ++header_->num_keys;
+    return next_key_id;
+  } else {
+    const uint64_t unit_id = header_->latest_available_unit_id;  // Reuse a slot in the unit at the head of the free list.
+    void * const page = get_page(unit_id * UNIT_SIZE);  // Page that holds the first key of that unit.
+    Unit * const unit =
+        static_cast<Unit *>(page) - (unit_id % (PAGE_SIZE / UNIT_SIZE)) - 1;  // Reduce the pool-wide unit ID to a page-local one.
+    const uint8_t unit_bit_id = bit_scan_forward(~unit->bits);  // Presumably the index of the lowest clear bit, i.e. a free slot - confirm in grnxx/intrinsic.hpp.
+    const int64_t next_key_id = (unit_id * UNIT_SIZE) + unit_bit_id;
+    if (next_key_id > MAX_KEY_ID) {
+      GRNXX_ERROR() << "pool is full: next_key_id = " << next_key_id
+                    << ", max_key_id = " << MAX_KEY_ID;
+      throw LogicError();
+    }
+    unit->bits |= 1ULL << unit_bit_id;
+    if (unit->bits == ~uint64_t(0)) {  // The unit is now full: pop it from the free list.
+      header_->latest_available_unit_id = unit->next_available_unit_id;
+    }
+    static_cast<T *>(page)[next_key_id % PAGE_SIZE] = key;
+    if (next_key_id > header_->max_key_id) {  // A reused slot can lie beyond the previous maximum.
+      header_->max_key_id = next_key_id;
+    }
+    ++header_->num_keys;
+    return next_key_id;
+  }
+// Defragmentation hook; the pool currently performs no work here.
+template <typename T>
+void Pool<T>::defrag() {
+  // Intentionally empty.
+// Creates the storage node that holds the pool header and writes a freshly
+// constructed Header into its body. If that initialization throws, the node
+// is unlinked again and the exception is rethrown.
+template <typename T>
+void Pool<T>::create_pool(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  StorageNode header_node =
+      storage->create_node(storage_node_id, sizeof(Header));
+  storage_node_id_ = header_node.id();
+  try {
+    header_ = static_cast<Header *>(header_node.body());
+    *header_ = Header();
+  } catch (...) {
+    // Do not leave a half-initialized node behind.
+    storage->unlink_node(storage_node_id_);
+    throw;
+  }
+// Attaches this object to an existing pool: opens the given storage node and
+// uses its body as the pool header. The page cache is left untouched here.
+template <typename T>
+void Pool<T>::open_pool(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  StorageNode header_node = storage->open_node(storage_node_id);
+  storage_node_id_ = header_node.id();
+  header_ = static_cast<Header *>(header_node.body());
+template <typename T>
+void *Pool<T>::open_page(int64_t key_id) {  // Slow path of get_page(): refreshes the local cache and returns the page that holds key_id.
+  if (static_cast<uint64_t>(key_id) >= header_->size) {  // The unsigned cast makes negative IDs fail this check too.
+    GRNXX_ERROR() << "invalid argument: key_id = " << key_id
+                  << ", size = " << header_->size;
+    throw LogicError();
+  }
+  Lock lock(&header_->mutex);  // Held for the rest of the function.
+  refresh_pool();  // Bring size_, pages_ and table_ in line with the header.
+  const uint64_t page_id = key_id / PAGE_SIZE;
+  if (!pages_[page_id]) {
+    // Open an existing full-size page.
+    // Note that a small-size page is opened in refresh_pool().
+    if (table_[page_id] == STORAGE_INVALID_NODE_ID) {  // The table has no node for this page yet.
+      GRNXX_ERROR() << "not found: page_id = " << page_id;
+      throw LogicError();
+    }
+    StorageNode page_node = storage_->open_node(table_[page_id]);
+    pages_[page_id] =
+        static_cast<Unit *>(page_node.body()) + (PAGE_SIZE / UNIT_SIZE);  // Cache the address just past the units, where the keys start.
+  }
+  return pages_[page_id];
+// Returns the page that holds key_id, expanding the pool and creating or
+// opening the page as needed. The returned pointer addresses the first key
+// of the page; the units of the page sit directly in front of it.
+template <typename T>
+void *Pool<T>::reserve_page(int64_t key_id) {
+  if (static_cast<uint64_t>(key_id) >= header_->size) {
+    // expand_pool() refreshes the local cache itself.
+    expand_pool();
+  } else if (size_ != header_->size) {
+    // The local cache is stale, e.g. right after open_pool(), which leaves
+    // pages_ and table_ empty. Refresh it before pages_ is indexed below;
+    // otherwise an empty or undersized cache would be dereferenced.
+    Lock lock(&header_->mutex);
+    refresh_pool();
+  }
+  const uint64_t page_id = key_id / PAGE_SIZE;
+  if (!pages_[page_id]) {
+    Lock lock(&header_->mutex);
+    // Check again under the lock before creating or opening the page.
+    if (!pages_[page_id]) {
+      void *page;
+      if (table_[page_id] == STORAGE_INVALID_NODE_ID) {
+        // Create a full-size page.
+        // Note that a small-size page is created in expand_pool().
+        const uint64_t page_node_size =
+            (sizeof(Unit) * (PAGE_SIZE / UNIT_SIZE)) + (sizeof(T) * PAGE_SIZE);
+        StorageNode page_node =
+            storage_->create_node(storage_node_id_, page_node_size);
+        table_[page_id] = page_node.id();
+        page = page_node.body();
+      } else {
+        // Open an existing full-size page.
+        // Note that a small-size page is opened in refresh_pool().
+        StorageNode page_node = storage_->open_node(table_[page_id]);
+        page = page_node.body();
+      }
+      // Cache the address just past the units, where the keys start.
+      pages_[page_id] = static_cast<Unit *>(page) + (PAGE_SIZE / UNIT_SIZE);
+    }
+  }
+  return pages_[page_id];
+template <typename T>
+void Pool<T>::expand_pool() {  // Grows the pool: a larger single page while it fits in PAGE_SIZE keys, a (larger) page table afterwards.
+  Lock lock(&header_->mutex);  // Held for the whole expansion.
+  refresh_pool();  // Start from the size currently recorded in the header.
+  uint64_t new_size = (size_ == 0) ? MIN_SIZE : (size_ * 2);  // First expansion uses MIN_SIZE; later ones double.
+  if (new_size <= PAGE_SIZE) {
+    // Create a small-size page.
+    const uint64_t page_node_size =
+        (sizeof(Unit) * (new_size / UNIT_SIZE)) + (sizeof(T) * new_size);  // Units first, then the keys.
+    StorageNode page_node =
+        storage_->create_node(storage_node_id_, page_node_size);
+    if (size_ != 0) {
+      // Copy data from the current page and unlink it.
+      std::memcpy(static_cast<Unit *>(page_node.body()) + (size_ / UNIT_SIZE),  // Destination: skip the slots of the newly added units.
+                  static_cast<Unit *>(pages_[0]) - (size_ / UNIT_SIZE),  // Source: start of the old node body (pages_[0] points past its units).
+                  page_node_size / 2);  // new_size is twice size_, so half of the new node is the whole old node.
+      try {
+        storage_->unlink_node(header_->page_storage_node_id);
+      } catch (...) {
+        storage_->unlink_node(page_node.id());  // Drop the new page so that the header keeps pointing at the old one.
+        throw;
+      }
+    }
+    header_->page_storage_node_id = page_node.id();
+  } else {
+    // Create a table.
+    uint64_t new_table_size = new_size / PAGE_SIZE;
+    if (new_table_size < MIN_TABLE_SIZE) {  // The first table gets at least MIN_TABLE_SIZE entries.
+      new_size = PAGE_SIZE * MIN_TABLE_SIZE;
+      new_table_size = MIN_TABLE_SIZE;
+    }
+    StorageNode table_node = storage_->create_node(
+        storage_node_id_, sizeof(uint32_t) * new_table_size);
+    uint32_t * const new_table = static_cast<uint32_t *>(table_node.body());
+    uint64_t i;
+    if (table_size_ == 0) {  // Switching from the single page to a table.
+      new_table[0] = header_->page_storage_node_id;  // The single page becomes page 0; read before the union member is overwritten below.
+      i = 1;
+    } else {
+      for (i = 0; i < table_size_; ++i) {  // Carry over the entries of the current table.
+        new_table[i] = table_[i];
+      }
+    }
+    for ( ; i < new_table_size; ++i) {  // The remaining pages do not exist yet.
+      new_table[i] = STORAGE_INVALID_NODE_ID;
+    }
+    header_->table_storage_node_id = table_node.id();  // NOTE(review): the previous table node is not unlinked here - confirm that this is intended.
+  }
+  header_->size = new_size;  // Publish the new capacity.
+  refresh_pool();  // Rebuild the local cache for the new size.
+template <typename T>
+void Pool<T>::refresh_pool() {  // Brings size_, pages_, table_ and table_size_ in line with header_->size; callers in this file hold header_->mutex.
+  if (size_ == header_->size) {
+    // Nothing to do.
+    return;
+  }
+  if (header_->size <= PAGE_SIZE) {  // Single-page mode.
+    // Reopen a page because it is old.
+    StorageNode page_node =
+        storage_->open_node(header_->page_storage_node_id);
+    if (!pages_) {  // No cache array yet: allocate one with a single entry.
+      std::unique_ptr<void *[]> new_pages(new (std::nothrow) void *[1]);
+      if (!new_pages) {
+        GRNXX_ERROR() << "new void *[] failed: size = " << 1;
+        throw MemoryError();
+      }
+      new_pages[0] =
+          static_cast<Unit *>(page_node.body()) + (header_->size / UNIT_SIZE);  // Address just past the units, where the keys start.
+      pages_.swap(new_pages);
+    } else {
+      pages_[0] =
+          static_cast<Unit *>(page_node.body()) + (header_->size / UNIT_SIZE);  // Same address computation for the reopened page.
+    }
+  } else {  // Table mode.
+    // Reopen a table because it is old.
+    StorageNode table_node =
+        storage_->open_node(header_->table_storage_node_id);
+    uint32_t * const new_table = static_cast<uint32_t *>(table_node.body());
+    const uint64_t new_table_size = header_->size / PAGE_SIZE;
+    std::unique_ptr<void *[]> new_pages(
+        new (std::nothrow) void *[new_table_size]);
+    if (!new_pages) {
+      GRNXX_ERROR() << "new void *[] failed: size = " << new_table_size;
+      throw MemoryError();
+    }
+    // Initialize a new cache table.
+    uint64_t i = 0;
+    for ( ; i < table_size_; ++i) {  // Keep the addresses cached so far; nothing is copied while table_size_ is 0.
+      new_pages[i] = pages_[i];
+    }
+    for ( ; i < new_table_size; ++i) {  // The other pages are opened on demand.
+      new_pages[i] = nullptr;
+    }
+    pages_.swap(new_pages);  // From here on new_pages holds the old cache array.
+    // Keep an old cache table because another thread may read it.
+    if (new_pages) {
+      try {
+        // TODO: Time must be added.
+        queue_.push(std::move(new_pages));
+      } catch (...) {
+        GRNXX_ERROR() << "std::queue::push() failed";
+        // TODO: Wrap an error.
+        throw;
+      }
+    }
+    table_ = new_table;
+    table_size_ = new_table_size;
+  }
+  size_ = header_->size;  // Record the size that the cache now matches.
+template class Pool<int8_t>;  // Explicit instantiations: the member functions above are defined in this file, so every supported key type is instantiated here.
+template class Pool<int16_t>;
+template class Pool<int32_t>;
+template class Pool<int64_t>;
+template class Pool<uint8_t>;
+template class Pool<uint16_t>;
+template class Pool<uint32_t>;
+template class Pool<uint64_t>;
+template class Pool<double>;
+template class Pool<GeoPoint>;
+}  // namespace map
+}  // namespace grnxx

  Added: lib/grnxx/map/pool.hpp (+170 -0) 100644
--- /dev/null
+++ lib/grnxx/map/pool.hpp    2013-08-13 14:25:53 +0900 (7e4821a)
@@ -0,0 +1,170 @@
+  Copyright (C) 2012-2013  Brazil, Inc.
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  Lesser General Public License for more details.
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+#include "grnxx/features.hpp"
+#include <memory>
+#include <queue>
+#include "grnxx/bytes.hpp"
+#include "grnxx/mutex.hpp"
+#include "grnxx/traits.hpp"
+#include "grnxx/types.hpp"
+// FIXME: for debug.
+#include "grnxx/logger.hpp"
+namespace grnxx {
+class Storage;
+namespace map {
+constexpr int64_t POOL_MIN_KEY_ID = 0;  // Smallest key ID.
+constexpr int64_t POOL_MAX_KEY_ID = (1ULL << 40) - 1;  // Largest key ID that Pool::add() hands out (2^40 - 1).
+constexpr uint64_t POOL_UNIT_SIZE = 64;  // Keys per unit: one bit of PoolUnit::bits for each key.
+constexpr uint64_t POOL_PAGE_SIZE = 1ULL << 16;  // Keys per full-size page (65,536).
+struct PoolHeader {  // Pool-wide state; Pool keeps it in the body of the pool's own storage node.
+  int64_t max_key_id;  // Largest key ID handed out so far.
+  uint64_t num_keys;  // Number of keys currently stored.
+  uint64_t size;  // Capacity of the pool in keys; grown by Pool::expand_pool().
+  uint64_t latest_available_unit_id;  // Head of the list of units that have a free slot.
+  union {  // The two members share storage; which one applies depends on size.
+    uint32_t page_storage_node_id;  // Node of the single page while size <= POOL_PAGE_SIZE.
+    uint32_t table_storage_node_id;  // Node of the page table once size > POOL_PAGE_SIZE.
+  };
+  Mutex mutex;  // Locked by Pool while it expands the pool or opens and creates pages.
+  PoolHeader();
+struct PoolUnit {  // Bookkeeping for POOL_UNIT_SIZE consecutive keys.
+  uint64_t bits;  // Bit i is set while the i-th key of the unit is in use.
+  uint64_t next_available_unit_id;  // Next unit on the list of units with a free slot.
+template <typename T>
+class Pool {  // Storage-backed pool of keys of type T, addressed by integer key IDs.
+  using Header = PoolHeader;
+  using Unit   = PoolUnit;
+  static constexpr int64_t MIN_KEY_ID = POOL_MIN_KEY_ID;
+  static constexpr int64_t MAX_KEY_ID = POOL_MAX_KEY_ID;
+  static constexpr uint64_t UNIT_SIZE = POOL_UNIT_SIZE;
+  static constexpr uint64_t PAGE_SIZE = POOL_PAGE_SIZE;
+ public:
+  using Key = typename Traits<T>::Type;
+  using KeyArg = typename Traits<T>::ArgumentType;
+  Pool();
+  ~Pool();
+  static Pool *create(Storage *storage, uint32_t storage_node_id);  // Creates a new pool; the caller owns the result.
+  static Pool *open(Storage *storage, uint32_t storage_node_id);  // Opens an existing pool; the caller owns the result.
+  static void unlink(Storage *storage, uint32_t storage_node_id);  // Unlinks the storage node of a pool.
+  uint32_t storage_node_id() const {  // ID of the storage node that holds the pool header.
+    return storage_node_id_;
+  }
+  static constexpr int64_t min_key_id() {
+    return MIN_KEY_ID;
+  }
+  int64_t max_key_id() const {  // Largest key ID handed out so far, as recorded in the header.
+    return header_->max_key_id;
+  }
+  uint64_t num_keys() const {  // Number of keys currently stored, as recorded in the header.
+    return header_->num_keys;
+  }
+  bool get(int64_t key_id, Key *key) {  // Copies the key at key_id into *key; returns false if the slot is not in use.
+    const void * const page = get_page(key_id);
+    key_id %= PAGE_SIZE;  // From here on key_id is the offset within the page.
+    const Unit * const unit =
+        static_cast<const Unit *>(page) - (key_id / UNIT_SIZE) - 1;  // Units sit backwards in front of the keys.
+    if (~unit->bits & (1ULL << (key_id % UNIT_SIZE))) {
+      // Not found.
+      return false;
+    }
+    *key = static_cast<const T *>(page)[key_id];
+    return true;
+  }
+  Key get_key(int64_t key_id) {  // Returns the slot content without checking whether the slot is in use.
+    const void * const page = get_page(key_id);
+    return static_cast<const T *>(page)[key_id % PAGE_SIZE];
+  }
+  bool get_bit(int64_t key_id) {  // Returns whether the slot at key_id is in use.
+    const void * const page = get_page(key_id);
+    key_id %= PAGE_SIZE;  // From here on key_id is the offset within the page.
+    const Unit * const unit =
+        static_cast<const Unit *>(page) - (key_id / UNIT_SIZE) - 1;
+    return unit->bits & (1ULL << (key_id % UNIT_SIZE));
+  }
+  void unset(int64_t key_id);  // Frees the slot at key_id.
+  void reset(int64_t key_id, KeyArg dest_key);  // Replaces the key stored at key_id.
+  int64_t add(KeyArg key);  // Stores key and returns its new key ID.
+  void defrag();
+ private:
+  Storage *storage_;
+  uint32_t storage_node_id_;
+  Header *header_;  // Points into the body of the pool's storage node.
+  std::unique_ptr<void *[]> pages_;  // Local cache of page addresses, indexed by page ID; each entry points at the first key of a page.
+  uint32_t *table_;  // Storage node IDs of the pages, indexed by page ID (table mode only).
+  uint64_t size_;  // The header size that the local cache was built for.
+  uint64_t table_size_;  // Number of entries in table_ and pages_ (table mode only).
+  // TODO: Time must be added.
+  std::queue<std::unique_ptr<void *[]>> queue_;  // Retired cache arrays, kept alive because another thread may still read them.
+  void create_pool(Storage *storage, uint32_t storage_node_id);
+  void open_pool(Storage *storage, uint32_t storage_node_id);
+  void *get_page(int64_t key_id) {  // Fast path: returns the cached page of key_id; falls back to open_page() otherwise.
+    if (size_ != header_->size) {  // The local cache is stale.
+      return open_page(key_id);
+    }
+    void * const page = pages_[key_id / PAGE_SIZE];  // NOTE(review): key_id is not range-checked on this path; an ID outside the pool indexes past the cache - confirm that callers guarantee validity.
+    if (!page) {  // The page has not been opened by this object yet.
+      return open_page(key_id);
+    }
+    return page;
+  }
+  void *open_page(int64_t key_id);
+  void *reserve_page(int64_t key_id);
+  void expand_pool();
+  void refresh_pool();
+}  // namespace map
+}  // namespace grnxx
+#endif  // GRNXX_MAP_POOL_HPP
-------------- next part --------------

More information about the Groonga-commit mailing list
Back to archive index