susumu.yata
null+****@clear*****
Mon Sep 22 16:07:22 JST 2014
susumu.yata 2014-09-22 16:07:22 +0900 (Mon, 22 Sep 2014) New Revision: 98320505e5ef85cf649d382e7553987670c5e61b https://github.com/groonga/grnxx/commit/98320505e5ef85cf649d382e7553987670c5e61b Message: Update Merger and add an implementation named AndMerger. (#64, #66) Added files: include/grnxx/merger.hpp lib/grnxx/merger.cpp Modified files: include/grnxx/Makefile.am include/grnxx/types/options.hpp lib/grnxx/Makefile.am lib/grnxx/types.cpp Modified: include/grnxx/Makefile.am (+1 -0) =================================================================== --- include/grnxx/Makefile.am 2014-09-22 11:26:58 +0900 (96b0ac9) +++ include/grnxx/Makefile.am 2014-09-22 16:07:22 +0900 (b3249c1) @@ -8,6 +8,7 @@ pkginclude_HEADERS = \ expression.hpp \ index.hpp \ library.hpp \ + merger.hpp \ pipeline.hpp \ sorter.hpp \ table.hpp \ Added: include/grnxx/merger.hpp (+66 -0) 100644 =================================================================== --- /dev/null +++ include/grnxx/merger.hpp 2014-09-22 16:07:22 +0900 (2cb8f7b) @@ -0,0 +1,66 @@ +#ifndef GRNXX_MERGER_HPP +#define GRNXX_MERGER_HPP + +#include "grnxx/types.hpp" + +namespace grnxx { + +class Merger { + public: + Merger(); + virtual ~Merger(); + + // Create an object for merging record arrays. + // + // On success, returns a pointer to the merger. + // On failure, returns nullptr and stores error information into "*error" if + // "error" != nullptr. + static unique_ptr<Merger> create( + Error *error, + const MergerOptions &options = MergerOptions()); + + // Set the target record sets. + // + // Aborts merging the old record sets and starts merging the new record sets. + // + // On success, returns true. + // On failure, returns false and stores error information into "*error" if + // "error" != nullptr. + virtual bool reset(Error *error, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) = 0; + + // Progress merging. + // + // On success, returns true. 
+ // On failure, returns false and stores error information into "*error" if + // "error" != nullptr. + virtual bool progress(Error *error) = 0; + + // Finish merging. + // + // Assumes that all the records are ready. + // Leaves only the result records if offset and limit are specified. + // + // On success, returns true. + // On failure, returns false and stores error information into "*error" if + // "error" != nullptr. + virtual bool finish(Error *error) = 0; + + // Merge records. + // + // Calls reset() and finish() to merge records. + // + // On success, returns true. + // On failure, returns false and stores error information into "*error" if + // "error" != nullptr. + virtual bool merge(Error *error, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) = 0; +}; + +} // namespace grnxx + +#endif // GRNXX_MERGER_HPP Modified: include/grnxx/types/options.hpp (+7 -0) =================================================================== --- include/grnxx/types/options.hpp 2014-09-22 11:26:58 +0900 (e8d8db5) +++ include/grnxx/types/options.hpp 2014-09-22 16:07:22 +0900 (34a8192) @@ -63,6 +63,13 @@ struct MergerOptions { // How to merge scores. MergerOperatorType operator_type; + // The first "offset" records are skipped (default: 0). + Int offset; + + // At most "limit" records are returned + // (default: numeric_limits<Int>::max()). 
+ Int limit; + MergerOptions(); }; Modified: lib/grnxx/Makefile.am (+1 -0) =================================================================== --- lib/grnxx/Makefile.am 2014-09-22 11:26:58 +0900 (8d7bf41) +++ lib/grnxx/Makefile.am 2014-09-22 16:07:22 +0900 (6d72070) @@ -17,6 +17,7 @@ libgrnxx_la_SOURCES = \ expression.cpp \ index.cpp \ library.cpp \ + merger.cpp \ name.cpp \ pipeline.cpp \ sorter.cpp \ Added: lib/grnxx/merger.cpp (+190 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/merger.cpp 2014-09-22 16:07:22 +0900 (b28b318) @@ -0,0 +1,190 @@ +#include "grnxx/merger.hpp" + +#include <unordered_map> + +namespace grnxx { + +// -- AndMerger -- + +class AndMerger : public Merger { + public: + ~AndMerger() {} + + static unique_ptr<Merger> create(Error *error, const MergerOptions &options); + + bool reset(Error *error, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records); + + bool progress(Error *error); + + bool finish(Error *error); + + bool merge(Error *error, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records); + + private: + Array<Record> *input_records_1_; + Array<Record> *input_records_2_; + Array<Record> *output_records_; + MergerOperatorType operator_type_; + Int offset_; + Int limit_; + + AndMerger(MergerOperatorType operator_type, Int offset, Int limit) + : Merger(), + input_records_1_(nullptr), + input_records_2_(nullptr), + output_records_(nullptr), + operator_type_(operator_type), + offset_(offset), + limit_(limit) {} +}; + +unique_ptr<Merger> AndMerger::create(Error *error, + const MergerOptions &options) { + unique_ptr<Merger> merger( + new (nothrow) AndMerger(options.operator_type, + options.offset, + options.limit)); + if (!merger) { + GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); + return nullptr; + } + return merger; +} + +bool AndMerger::reset(Error *, + Array<Record> 
*input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) { + input_records_1_ = input_records_1; + input_records_2_ = input_records_2; + output_records_ = output_records; + return true; +} + +bool AndMerger::progress(Error *) { + // TODO: Incremental merging is not supported yet. + return true; +} + +bool AndMerger::finish(Error *error) { + // Create a hash table from the smaller input. + Array<Record> *filter_records; + Array<Record> *stream_records; + if (input_records_1_->size() < input_records_2_->size()) { + filter_records = input_records_1_; + stream_records = input_records_2_; + } else { + filter_records = input_records_2_; + stream_records = input_records_1_; + } + std::unordered_map<Int, Float> filter; + for (Int i = 0; i < filter_records->size(); ++i) try { + filter[filter_records->get_row_id(i)] = filter_records->get_score(i); + } catch (...) { + GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); + return false; + } + + // Filter the stream (the larger input) with the hash table. 
+ const MergerOperatorType operator_type = operator_type_; + const bool stream_is_1 = stream_records == input_records_1_; + for (Int i = 0; i < stream_records->size(); ++i) { + auto it = filter.find(stream_records->get_row_id(i)); + if (it != filter.end()) { + Record record; + record.row_id = it->first; + switch (operator_type) { + case PLUS_MERGER_OPERATOR: { + record.score = stream_records->get_score(i) + it->second; + break; + } + case MINUS_MERGER_OPERATOR: { + if (stream_is_1) { + record.score = stream_records->get_score(i) - it->second; + } else { + record.score = it->second - stream_records->get_score(i); + } + } + case MULTIPLICATION_MERGER_OPERATOR: { + record.score = stream_records->get_score(i) * it->second; + break; + } + case LHS_MERGER_OPERATOR: { + if (stream_is_1) { + record.score = stream_records->get_score(i); + } else { + record.score = it->second; + } + break; + } + case RHS_MERGER_OPERATOR: { + if (stream_is_1) { + record.score = it->second; + } else { + record.score = stream_records->get_score(i); + } + break; + } + case ZERO_MERGER_OPERATOR: { + record.score = 0.0; + break; + } + } + if (!output_records_->push_back(error, record)) { + return false; + } + } + } + + // Remove out-of-range records. 
+ if (offset_ > 0) { + for (Int i = offset_; i < output_records_->size(); ++i) { + output_records_->set(i - offset_, output_records_->get(i)); + } + output_records_->resize(nullptr, output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(nullptr, limit_); + } + return true; +} + +bool AndMerger::merge(Error *error, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) { + if (!reset(error, input_records_1, input_records_2, output_records)) { + return false; + } + return finish(error); +} + +// -- Merger -- + +Merger::Merger() {} + +Merger::~Merger() {} + +unique_ptr<Merger> Merger::create(Error *error, const MergerOptions &options) { + switch (options.type) { + case AND_MERGER: { + return AndMerger::create(error, options); + } + case OR_MERGER: + case XOR_MERGER: + case MINUS_MERGER: + case LHS_MERGER: + case RHS_MERGER: { + GRNXX_ERROR_SET(error, NOT_SUPPORTED_YET, "Not supported yet"); + return nullptr; + } + } +} + +} // namespace grnxx Modified: lib/grnxx/types.cpp (+3 -1) =================================================================== --- lib/grnxx/types.cpp 2014-09-22 11:26:58 +0900 (d617bd9) +++ lib/grnxx/types.cpp 2014-09-22 16:07:22 +0900 (636a24e) @@ -62,7 +62,9 @@ SorterOptions::SorterOptions() MergerOptions::MergerOptions() : type(AND_MERGER), - operator_type(PLUS_MERGER_OPERATOR) {} + operator_type(PLUS_MERGER_OPERATOR), + offset(0), + limit(numeric_limits<Int>::max()) {} PipelineOptions::PipelineOptions() {} -------------- next part -------------- HTMLの添付ファイルを保管しました... Download