susumu.yata
null+****@clear*****
Mon Sep 22 17:46:57 JST 2014
susumu.yata 2014-09-22 17:46:57 +0900 (Mon, 22 Sep 2014) New Revision: e4277ea4ce28ca6dcab55279e885a911dc60cbd6 https://github.com/groonga/grnxx/commit/e4277ea4ce28ca6dcab55279e885a911dc60cbd6 Message: Add OrMerger. (#66) Modified files: lib/grnxx/merger.cpp Modified: lib/grnxx/merger.cpp (+185 -1) =================================================================== --- lib/grnxx/merger.cpp 2014-09-22 17:29:31 +0900 (e33cbcb) +++ lib/grnxx/merger.cpp 2014-09-22 17:46:57 +0900 (bcd51ec) @@ -146,6 +146,188 @@ bool AndMerger::finish(Error *error) { return true; } +// -- OrMerger -- + +class OrMerger : public Merger { + public: + ~OrMerger() {} + + static unique_ptr<Merger> create(Error *error, const MergerOptions &options); + + bool reset(Error *error, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records); + + bool finish(Error *error); + + private: + Array<Record> *input_records_1_; + Array<Record> *input_records_2_; + Array<Record> *output_records_; + MergerOperatorType operator_type_; + Int offset_; + Int limit_; + + OrMerger(MergerOperatorType operator_type, Int offset, Int limit) + : Merger(), + input_records_1_(nullptr), + input_records_2_(nullptr), + output_records_(nullptr), + operator_type_(operator_type), + offset_(offset), + limit_(limit) {} +}; + +unique_ptr<Merger> OrMerger::create(Error *error, + const MergerOptions &options) { + unique_ptr<Merger> merger( + new (nothrow) OrMerger(options.operator_type, + options.offset, + options.limit)); + if (!merger) { + GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); + return nullptr; + } + return merger; +} + +bool OrMerger::reset(Error *, + Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) { + input_records_1_ = input_records_1; + input_records_2_ = input_records_2; + output_records_ = output_records; + return true; +} + +bool OrMerger::finish(Error *error) { + // Create a hash table from the smaller input. + Array<Record> *filter_records; + Array<Record> *stream_records; + if (input_records_1_->size() < input_records_2_->size()) { + filter_records = input_records_1_; + stream_records = input_records_2_; + } else { + filter_records = input_records_2_; + stream_records = input_records_1_; + } + std::unordered_map<Int, Float> filter; + for (Int i = 0; i < filter_records->size(); ++i) try { + filter[filter_records->get_row_id(i)] = filter_records->get_score(i); + } catch (...) { + GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); + return false; + } + + // Filter the stream (the larger input) with the hash table. + const MergerOperatorType operator_type = operator_type_; + const bool stream_is_1 = stream_records == input_records_1_; + for (Int i = 0; i < stream_records->size(); ++i) { + auto it = filter.find(stream_records->get_row_id(i)); + if (it == filter.end()) { + Record record; + record.row_id = stream_records->get_row_id(i); + switch (operator_type) { + case PLUS_MERGER_OPERATOR: { + record.score = stream_records->get_score(i); + break; + } + case MINUS_MERGER_OPERATOR: { + record.score = stream_records->get_score(i); + if (!stream_is_1) { + record.score = -record.score; + } + break; + } + case MULTIPLICATION_MERGER_OPERATOR: { + // TODO: I'm not sure if stream_records->get_score(i) should be used? + record.score = 0.0; + break; + } + case LHS_MERGER_OPERATOR: { + if (stream_is_1) { + record.score = stream_records->get_score(i); + } else { + record.score = 0.0; + } + break; + } + case RHS_MERGER_OPERATOR: { + if (stream_is_1) { + record.score = 0.0; + } else { + record.score = stream_records->get_score(i); + } + break; + } + case ZERO_MERGER_OPERATOR: { + record.score = 0.0; + break; + } + } + if (!output_records_->push_back(error, record)) { + return false; + } + } else { + switch (operator_type) { + case PLUS_MERGER_OPERATOR: { + it->second += stream_records->get_score(i); + break; + } + case MINUS_MERGER_OPERATOR: { + if (stream_is_1) { + it->second = stream_records->get_score(i) - it->second; + } else { + it->second -= stream_records->get_score(i); + } + } + case MULTIPLICATION_MERGER_OPERATOR: { + it->second *= stream_records->get_score(i); + break; + } + case LHS_MERGER_OPERATOR: { + if (stream_is_1) { + it->second = stream_records->get_score(i); + } + break; + } + case RHS_MERGER_OPERATOR: { + if (!stream_is_1) { + it->second = stream_records->get_score(i); + } + break; + } + case ZERO_MERGER_OPERATOR: { + it->second = 0.0; + break; + } + } + } + } + + for (auto it : filter) { + if (!output_records_->push_back(error, Record(it.first, it.second))) { + return false; + } + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (Int i = offset_; i < output_records_->size(); ++i) { + output_records_->set(i - offset_, output_records_->get(i)); + } + output_records_->resize(nullptr, output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(nullptr, limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); + return true; +} + // -- Merger -- Merger::Merger() {} @@ -157,7 +339,9 @@ unique_ptr<Merger> Merger::create(Error *error, const MergerOptions &options) { case AND_MERGER: { return AndMerger::create(error, options); } - case OR_MERGER: + case OR_MERGER: { + return OrMerger::create(error, options); + } case XOR_MERGER: case MINUS_MERGER: case LHS_MERGER: -------------- next part -------------- HTML����������������������������... Download