susumu.yata
null+****@clear*****
Tue Aug 13 14:25:53 JST 2013
susumu.yata 2013-08-13 14:25:53 +0900 (Tue, 13 Aug 2013) New Revision: 81fbbecfecc97b8cad3445284766a22716860c84 https://github.com/groonga/grnxx/commit/81fbbecfecc97b8cad3445284766a22716860c84 Message: Add grnxx::map::Pool. Added files: lib/grnxx/map/pool.cpp lib/grnxx/map/pool.hpp Modified files: lib/grnxx/map/Makefile.am Modified: lib/grnxx/map/Makefile.am (+2 -0) =================================================================== --- lib/grnxx/map/Makefile.am 2013-08-07 16:59:59 +0900 (ee16a3d) +++ lib/grnxx/map/Makefile.am 2013-08-13 14:25:53 +0900 (a841299) @@ -11,6 +11,7 @@ libgrnxx_map_la_SOURCES = \ hash_table.cpp \ key_pool.cpp \ patricia.cpp \ + pool.cpp \ scanner_impl.cpp libgrnxx_map_includedir = ${includedir}/grnxx/map @@ -26,4 +27,5 @@ libgrnxx_map_include_HEADERS = \ helper.hpp \ key_pool.hpp \ patricia.hpp \ + pool.hpp \ scanner_impl.hpp Added: lib/grnxx/map/pool.cpp (+381 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/map/pool.cpp 2013-08-13 14:25:53 +0900 (65b190e) @@ -0,0 +1,381 @@ +/* + Copyright (C) 2012-2013 Brazil, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ +#include "grnxx/map/pool.hpp" + +#include "grnxx/exception.hpp" +#include "grnxx/geo_point.hpp" +#include "grnxx/intrinsic.hpp" +#include "grnxx/lock.hpp" +#include "grnxx/logger.hpp" +#include "grnxx/map.hpp" +#include "grnxx/storage.hpp" + +namespace grnxx { +namespace map { +namespace { + +constexpr uint64_t MIN_SIZE = POOL_UNIT_SIZE; +constexpr uint64_t MIN_TABLE_SIZE = 1 << 10; + +constexpr uint64_t INVALID_UNIT_ID = ~0ULL; + +} // namespace + +PoolHeader::PoolHeader() + : max_key_id(-1), + num_keys(0), + size(0), + latest_available_unit_id(INVALID_UNIT_ID), + page_storage_node_id(STORAGE_INVALID_NODE_ID), + mutex() {} + +template <typename T> +Pool<T>::Pool() + : storage_(nullptr), + storage_node_id_(STORAGE_INVALID_NODE_ID), + header_(nullptr), + pages_(), + table_(nullptr), + size_(0), + table_size_(0), + queue_() {} + +template <typename T> +Pool<T>::~Pool() {} + +template <typename T> +Pool<T> *Pool<T>::create(Storage *storage, uint32_t storage_node_id) { + if (!storage) { + GRNXX_ERROR() << "invalid argument: storage == nullptr"; + throw LogicError(); + } + std::unique_ptr<Pool> pool(new (std::nothrow) Pool); + if (!pool) { + GRNXX_ERROR() << "new grnxx::map::Pool failed"; + throw MemoryError(); + } + pool->create_pool(storage, storage_node_id); + return pool.release(); +} + +template <typename T> +Pool<T> *Pool<T>::open(Storage *storage, uint32_t storage_node_id) { + if (!storage) { + GRNXX_ERROR() << "invalid argument: storage == nullptr"; + throw LogicError(); + } + std::unique_ptr<Pool> pool(new (std::nothrow) Pool); + if (!pool) { + GRNXX_ERROR() << "new grnxx::map::Pool failed"; + throw MemoryError(); + } + pool->open_pool(storage, storage_node_id); + return pool.release(); +} + +template <typename T> +void Pool<T>::unlink(Storage *storage, uint32_t storage_node_id) { + std::unique_ptr<Pool> pool(Pool::open(storage, storage_node_id)); + storage->unlink_node(storage_node_id); +} + +template <typename T> +void Pool<T>::unset(int64_t key_id) { + void * const page = get_page(key_id); + key_id %= PAGE_SIZE; + const uint64_t unit_id = key_id / UNIT_SIZE; + const uint64_t unit_bit = 1ULL << (key_id % UNIT_SIZE); + Unit * const unit = static_cast<Unit *>(page) - unit_id - 1; + if (~unit->bits & unit_bit) { + GRNXX_ERROR() << "not found: key_id = " << key_id; + throw LogicError(); + } + if (unit->bits == ~uint64_t(0)) { + unit->next_available_unit_id = header_->latest_available_unit_id; + header_->latest_available_unit_id = unit_id; + } + unit->bits &= ~unit_bit; + --header_->num_keys; +} + +template <typename T> +void Pool<T>::reset(int64_t key_id, KeyArg dest_key) { + void * const page = get_page(key_id); + key_id %= PAGE_SIZE; + const Unit * const unit = + static_cast<const Unit *>(page) - (key_id / UNIT_SIZE) - 1; + if (~unit->bits & (1ULL << (key_id % UNIT_SIZE))) { + GRNXX_ERROR() << "not found: key_id = " << key_id; + throw LogicError(); + } + static_cast<T *>(page)[key_id] = dest_key; +} + +template <typename T> +int64_t Pool<T>::add(KeyArg key) { + if (header_->latest_available_unit_id == INVALID_UNIT_ID) { + const int64_t next_key_id = header_->max_key_id + 1; + if (next_key_id > MAX_KEY_ID) { + GRNXX_ERROR() << "pool is full: next_key_id = " << next_key_id + << ", max_key_id = " << MAX_KEY_ID; + throw LogicError(); + } + void * const page = reserve_page(next_key_id); + const uint64_t key_id = next_key_id % PAGE_SIZE; + Unit * const unit = static_cast<Unit *>(page) - (key_id / UNIT_SIZE) - 1; + unit->bits = 1; + unit->next_available_unit_id = INVALID_UNIT_ID; + header_->latest_available_unit_id = next_key_id / UNIT_SIZE; + static_cast<T *>(page)[key_id] = key; + header_->max_key_id = next_key_id; + ++header_->num_keys; + return next_key_id; + } else { + const uint64_t unit_id = header_->latest_available_unit_id; + void * const page = get_page(unit_id * UNIT_SIZE); + Unit * const unit = + static_cast<Unit *>(page) - (unit_id % (PAGE_SIZE / UNIT_SIZE)) - 1; + const uint8_t unit_bit_id = bit_scan_forward(~unit->bits); + const int64_t next_key_id = (unit_id * UNIT_SIZE) + unit_bit_id; + if (next_key_id > MAX_KEY_ID) { + GRNXX_ERROR() << "pool is full: next_key_id = " << next_key_id + << ", max_key_id = " << MAX_KEY_ID; + throw LogicError(); + } + unit->bits |= 1ULL << unit_bit_id; + if (unit->bits == ~uint64_t(0)) { + header_->latest_available_unit_id = unit->next_available_unit_id; + } + static_cast<T *>(page)[next_key_id % PAGE_SIZE] = key; + if (next_key_id > header_->max_key_id) { + header_->max_key_id = next_key_id; + } + ++header_->num_keys; + return next_key_id; + } +} + +template <typename T> +void Pool<T>::defrag() { + // Nothing to do. +} + +template <typename T> +void Pool<T>::create_pool(Storage *storage, uint32_t storage_node_id) { + storage_ = storage; + StorageNode storage_node = + storage->create_node(storage_node_id, sizeof(Header)); + storage_node_id_ = storage_node.id(); + try { + header_ = static_cast<Header *>(storage_node.body()); + *header_ = Header(); + } catch (...) { + storage->unlink_node(storage_node_id_); + throw; + } +} + +template <typename T> +void Pool<T>::open_pool(Storage *storage, uint32_t storage_node_id) { + storage_ = storage; + StorageNode storage_node = storage->open_node(storage_node_id); + storage_node_id_ = storage_node.id(); + header_ = static_cast<Header *>(storage_node.body()); +} + +template <typename T> +void *Pool<T>::open_page(int64_t key_id) { + if (static_cast<uint64_t>(key_id) >= header_->size) { + GRNXX_ERROR() << "invalid argument: key_id = " << key_id + << ", size = " << header_->size; + throw LogicError(); + } + Lock lock(&header_->mutex); + refresh_pool(); + const uint64_t page_id = key_id / PAGE_SIZE; + if (!pages_[page_id]) { + // Open an existing full-size page. + // Note that a small-size page is opened in refresh_pool(). + if (table_[page_id] == STORAGE_INVALID_NODE_ID) { + GRNXX_ERROR() << "not found: page_id = " << page_id; + throw LogicError(); + } + StorageNode page_node = storage_->open_node(table_[page_id]); + pages_[page_id] = + static_cast<Unit *>(page_node.body()) + (PAGE_SIZE / UNIT_SIZE); + } + return pages_[page_id]; +} + +template <typename T> +void *Pool<T>::reserve_page(int64_t key_id) { + if (static_cast<uint64_t>(key_id) >= header_->size) { + expand_pool(); + } + const uint64_t page_id = key_id / PAGE_SIZE; + if (!pages_[page_id]) { + Lock lock(&header_->mutex); + if (!pages_[page_id]) { + void *page; + if (table_[page_id] == STORAGE_INVALID_NODE_ID) { + // Create a full-size page. + // Note that a small-size page is created in expand_pool(). + const uint64_t page_node_size = + (sizeof(Unit) * (PAGE_SIZE / UNIT_SIZE)) + (sizeof(T) * PAGE_SIZE); + StorageNode page_node = + storage_->create_node(storage_node_id_, page_node_size); + table_[page_id] = page_node.id(); + page = page_node.body(); + } else { + // Open an existing full-size page. + // Note that a small-size page is opened in refresh_pool(). + StorageNode page_node = storage_->open_node(table_[page_id]); + page = page_node.body(); + } + pages_[page_id] = static_cast<Unit *>(page) + (PAGE_SIZE / UNIT_SIZE); + } + } + return pages_[page_id]; +} + +template <typename T> +void Pool<T>::expand_pool() { + Lock lock(&header_->mutex); + refresh_pool(); + uint64_t new_size = (size_ == 0) ? MIN_SIZE : (size_ * 2); + if (new_size <= PAGE_SIZE) { + // Create a small-size page. + const uint64_t page_node_size = + (sizeof(Unit) * (new_size / UNIT_SIZE)) + (sizeof(T) * new_size); + StorageNode page_node = + storage_->create_node(storage_node_id_, page_node_size); + if (size_ != 0) { + // Copy data from the current page and unlink it. + std::memcpy(static_cast<Unit *>(page_node.body()) + (size_ / UNIT_SIZE), + static_cast<Unit *>(pages_[0]) - (size_ / UNIT_SIZE), + page_node_size / 2); + try { + storage_->unlink_node(header_->page_storage_node_id); + } catch (...) { + storage_->unlink_node(page_node.id()); + throw; + } + } + header_->page_storage_node_id = page_node.id(); + } else { + // Create a table. + uint64_t new_table_size = new_size / PAGE_SIZE; + if (new_table_size < MIN_TABLE_SIZE) { + new_size = PAGE_SIZE * MIN_TABLE_SIZE; + new_table_size = MIN_TABLE_SIZE; + } + StorageNode table_node = storage_->create_node( + storage_node_id_, sizeof(uint32_t) * new_table_size); + uint32_t * const new_table = static_cast<uint32_t *>(table_node.body()); + uint64_t i; + if (table_size_ == 0) { + new_table[0] = header_->page_storage_node_id; + i = 1; + } else { + for (i = 0; i < table_size_; ++i) { + new_table[i] = table_[i]; + } + } + for ( ; i < new_table_size; ++i) { + new_table[i] = STORAGE_INVALID_NODE_ID; + } + header_->table_storage_node_id = table_node.id(); + } + header_->size = new_size; + refresh_pool(); +} + +template <typename T> +void Pool<T>::refresh_pool() { + if (size_ == header_->size) { + // Nothing to do. + return; + } + if (header_->size <= PAGE_SIZE) { + // Reopen a page because it is old. + StorageNode page_node = + storage_->open_node(header_->page_storage_node_id); + if (!pages_) { + std::unique_ptr<void *[]> new_pages(new (std::nothrow) void *[1]); + if (!new_pages) { + GRNXX_ERROR() << "new void *[] failed: size = " << 1; + throw MemoryError(); + } + new_pages[0] = + static_cast<Unit *>(page_node.body()) + (header_->size / UNIT_SIZE); + pages_.swap(new_pages); + } else { + pages_[0] = + static_cast<Unit *>(page_node.body()) + (header_->size / UNIT_SIZE); + } + } else { + // Reopen a table because it is old. + StorageNode table_node = + storage_->open_node(header_->table_storage_node_id); + uint32_t * const new_table = static_cast<uint32_t *>(table_node.body()); + const uint64_t new_table_size = header_->size / PAGE_SIZE; + std::unique_ptr<void *[]> new_pages( + new (std::nothrow) void *[new_table_size]); + if (!new_pages) { + GRNXX_ERROR() << "new void *[] failed: size = " << new_table_size; + throw MemoryError(); + } + // Initialize a new cache table. + uint64_t i = 0; + for ( ; i < table_size_; ++i) { + new_pages[i] = pages_[i]; + } + for ( ; i < new_table_size; ++i) { + new_pages[i] = nullptr; + } + pages_.swap(new_pages); + // Keep an old cache table because another thread may read it. + if (new_pages) { + try { + // TODO: Time must be added. + queue_.push(std::move(new_pages)); + } catch (...) { + GRNXX_ERROR() << "std::queue::push() failed"; + // TODO: Wrap an error. + throw; + } + } + table_ = new_table; + table_size_ = new_table_size; + } + size_ = header_->size; +} + +template class Pool<int8_t>; +template class Pool<int16_t>; +template class Pool<int32_t>; +template class Pool<int64_t>; +template class Pool<uint8_t>; +template class Pool<uint16_t>; +template class Pool<uint32_t>; +template class Pool<uint64_t>; +template class Pool<double>; +template class Pool<GeoPoint>; + +} // namespace map +} // namespace grnxx Added: lib/grnxx/map/pool.hpp (+170 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/map/pool.hpp 2013-08-13 14:25:53 +0900 (7e4821a) @@ -0,0 +1,170 @@ +/* + Copyright (C) 2012-2013 Brazil, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ +#ifndef GRNXX_MAP_POOL_HPP +#define GRNXX_MAP_POOL_HPP + +#include "grnxx/features.hpp" + +#include <memory> +#include <queue> + +#include "grnxx/bytes.hpp" +#include "grnxx/mutex.hpp" +#include "grnxx/traits.hpp" +#include "grnxx/types.hpp" + +// FIXME: for debug. +#include "grnxx/logger.hpp" + +namespace grnxx { + +class Storage; + +namespace map { + +constexpr int64_t POOL_MIN_KEY_ID = 0; +constexpr int64_t POOL_MAX_KEY_ID = (1ULL << 40) - 1; + +constexpr uint64_t POOL_UNIT_SIZE = 64; +constexpr uint64_t POOL_PAGE_SIZE = 1ULL << 16; + +struct PoolHeader { + int64_t max_key_id; + uint64_t num_keys; + uint64_t size; + uint64_t latest_available_unit_id; + union { + uint32_t page_storage_node_id; + uint32_t table_storage_node_id; + }; + Mutex mutex; + + PoolHeader(); +}; + +struct PoolUnit { + uint64_t bits; + uint64_t next_available_unit_id; +}; + +template <typename T> +class Pool { + using Header = PoolHeader; + using Unit = PoolUnit; + + static constexpr int64_t MIN_KEY_ID = POOL_MIN_KEY_ID; + static constexpr int64_t MAX_KEY_ID = POOL_MAX_KEY_ID; + + static constexpr uint64_t UNIT_SIZE = POOL_UNIT_SIZE; + static constexpr uint64_t PAGE_SIZE = POOL_PAGE_SIZE; + + public: + using Key = typename Traits<T>::Type; + using KeyArg = typename Traits<T>::ArgumentType; + + Pool(); + ~Pool(); + + static Pool *create(Storage *storage, uint32_t storage_node_id); + static Pool *open(Storage *storage, uint32_t storage_node_id); + + static void unlink(Storage *storage, uint32_t storage_node_id); + + uint32_t storage_node_id() const { + return storage_node_id_; + } + + static constexpr int64_t min_key_id() { + return MIN_KEY_ID; + } + int64_t max_key_id() const { + return header_->max_key_id; + } + uint64_t num_keys() const { + return header_->num_keys; + } + + bool get(int64_t key_id, Key *key) { + const void * const page = get_page(key_id); + key_id %= PAGE_SIZE; + const Unit * const unit = + static_cast<const Unit *>(page) - (key_id / UNIT_SIZE) - 1; + if (~unit->bits & (1ULL << (key_id % UNIT_SIZE))) { + // Not found. + return false; + } + *key = static_cast<const T *>(page)[key_id]; + return true; + } + + Key get_key(int64_t key_id) { + const void * const page = get_page(key_id); + return static_cast<const T *>(page)[key_id % PAGE_SIZE]; + } + + bool get_bit(int64_t key_id) { + const void * const page = get_page(key_id); + key_id %= PAGE_SIZE; + const Unit * const unit = + static_cast<const Unit *>(page) - (key_id / UNIT_SIZE) - 1; + return unit->bits & (1ULL << (key_id % UNIT_SIZE)); + } + + void unset(int64_t key_id); + void reset(int64_t key_id, KeyArg dest_key); + + int64_t add(KeyArg key); + + void defrag(); + + private: + Storage *storage_; + uint32_t storage_node_id_; + Header *header_; + std::unique_ptr<void *[]> pages_; + uint32_t *table_; + uint64_t size_; + uint64_t table_size_; + // TODO: Time must be added. + std::queue<std::unique_ptr<void *[]>> queue_; + + void create_pool(Storage *storage, uint32_t storage_node_id); + void open_pool(Storage *storage, uint32_t storage_node_id); + + void *get_page(int64_t key_id) { + if (size_ != header_->size) { + return open_page(key_id); + } + void * const page = pages_[key_id / PAGE_SIZE]; + if (!page) { + return open_page(key_id); + } + return page; + } + void *open_page(int64_t key_id); + + void *reserve_page(int64_t key_id); + + void expand_pool(); + void refresh_pool(); +}; + +} // namespace map +} // namespace grnxx + +#endif // GRNXX_MAP_POOL_HPP -------------- next part -------------- HTML����������������������������... Download