[Groonga-commit] groonga/grnxx at 9832050 [master] Update Merger and add an implementation named AndMerger. (#64, #66)

Back to archive index

susumu.yata null+****@clear*****
Mon Sep 22 16:07:22 JST 2014


susumu.yata	2014-09-22 16:07:22 +0900 (Mon, 22 Sep 2014)

  New Revision: 98320505e5ef85cf649d382e7553987670c5e61b
  https://github.com/groonga/grnxx/commit/98320505e5ef85cf649d382e7553987670c5e61b

  Message:
    Update Merger and add an implementation named AndMerger. (#64, #66)

  Added files:
    include/grnxx/merger.hpp
    lib/grnxx/merger.cpp
  Modified files:
    include/grnxx/Makefile.am
    include/grnxx/types/options.hpp
    lib/grnxx/Makefile.am
    lib/grnxx/types.cpp

  Modified: include/grnxx/Makefile.am (+1 -0)
===================================================================
--- include/grnxx/Makefile.am    2014-09-22 11:26:58 +0900 (96b0ac9)
+++ include/grnxx/Makefile.am    2014-09-22 16:07:22 +0900 (b3249c1)
@@ -8,6 +8,7 @@ pkginclude_HEADERS =	\
 	expression.hpp	\
 	index.hpp	\
 	library.hpp	\
+	merger.hpp	\
 	pipeline.hpp	\
 	sorter.hpp	\
 	table.hpp	\

  Added: include/grnxx/merger.hpp (+66 -0) 100644
===================================================================
--- /dev/null
+++ include/grnxx/merger.hpp    2014-09-22 16:07:22 +0900 (2cb8f7b)
@@ -0,0 +1,66 @@
+#ifndef GRNXX_MERGER_HPP
+#define GRNXX_MERGER_HPP
+
+#include "grnxx/types.hpp"
+
+namespace grnxx {
+
+class Merger {
+ public:
+  Merger();
+  virtual ~Merger();
+
+  // Create an object for merging record arrays.
+  //
+  // On success, returns a pointer to the merger.
+  // On failure, returns nullptr and stores error information into "*error" if
+  // "error" != nullptr.
+  static unique_ptr<Merger> create(
+      Error *error,
+      const MergerOptions &options = MergerOptions());
+
+  // Set the target record sets.
+  //
+  // Aborts merging the old record sets and starts merging the new record sets.
+  //
+  // On success, returns true.
+  // On failure, returns false and stores error information into "*error" if
+  // "error" != nullptr.
+  virtual bool reset(Error *error,
+                     Array<Record> *input_records_1,
+                     Array<Record> *input_records_2,
+                     Array<Record> *output_records) = 0;
+
+  // Progress merging.
+  //
+  // On success, returns true.
+  // On failure, returns false and stores error information into "*error" if
+  // "error" != nullptr.
+  virtual bool progress(Error *error) = 0;
+
+  // Finish merging.
+  //
+  // Assumes that all the records are ready.
+  // Leaves only the result records if offset and limit are specified.
+  //
+  // On success, returns true.
+  // On failure, returns false and stores error information into "*error" if
+  // "error" != nullptr.
+  virtual bool finish(Error *error) = 0;
+
+  // Merge records.
+  //
+  // Calls reset() and finish() to merge records.
+  //
+  // On success, returns true.
+  // On failure, returns false and stores error information into "*error" if
+  // "error" != nullptr.
+  virtual bool merge(Error *error,
+                     Array<Record> *input_records_1,
+                     Array<Record> *input_records_2,
+                     Array<Record> *output_records) = 0;
+};
+
+}  // namespace grnxx
+
+#endif  // GRNXX_MERGER_HPP

  Modified: include/grnxx/types/options.hpp (+7 -0)
===================================================================
--- include/grnxx/types/options.hpp    2014-09-22 11:26:58 +0900 (e8d8db5)
+++ include/grnxx/types/options.hpp    2014-09-22 16:07:22 +0900 (34a8192)
@@ -63,6 +63,13 @@ struct MergerOptions {
   // How to merge scores.
   MergerOperatorType operator_type;
 
+  // The first "offset" records are skipped (default: 0).
+  Int offset;
+
+  // At most "limit" records are returned
+  // (default: numeric_limits<Int>::max()).
+  Int limit;
+
   MergerOptions();
 };
 

  Modified: lib/grnxx/Makefile.am (+1 -0)
===================================================================
--- lib/grnxx/Makefile.am    2014-09-22 11:26:58 +0900 (8d7bf41)
+++ lib/grnxx/Makefile.am    2014-09-22 16:07:22 +0900 (6d72070)
@@ -17,6 +17,7 @@ libgrnxx_la_SOURCES =			\
 	expression.cpp			\
 	index.cpp			\
 	library.cpp			\
+	merger.cpp			\
 	name.cpp			\
 	pipeline.cpp			\
 	sorter.cpp			\

  Added: lib/grnxx/merger.cpp (+190 -0) 100644
===================================================================
--- /dev/null
+++ lib/grnxx/merger.cpp    2014-09-22 16:07:22 +0900 (b28b318)
@@ -0,0 +1,190 @@
+#include "grnxx/merger.hpp"
+
+#include <unordered_map>
+
+namespace grnxx {
+
+// -- AndMerger --
+// Merger that keeps only the records whose row IDs occur in both inputs.
+class AndMerger : public Merger {
+ public:
+  ~AndMerger() {}
+  // Builds an AndMerger from "options"; returns nullptr if allocation fails.
+  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
+  // Remembers the arrays to merge; they are not owned by the merger.
+  bool reset(Error *error,
+             Array<Record> *input_records_1,
+             Array<Record> *input_records_2,
+             Array<Record> *output_records) override;
+  // Does nothing yet: incremental merging is not supported.
+  bool progress(Error *error) override;
+  // Performs the whole merge and then applies offset/limit to the output.
+  bool finish(Error *error) override;
+  // Convenience wrapper: reset() followed by finish().
+  bool merge(Error *error,
+             Array<Record> *input_records_1,
+             Array<Record> *input_records_2,
+             Array<Record> *output_records) override;
+
+ private:
+  Array<Record> *input_records_1_;
+  Array<Record> *input_records_2_;
+  Array<Record> *output_records_;
+  MergerOperatorType operator_type_;
+  Int offset_;
+  Int limit_;
+  // Private: instances are only created through create().
+  AndMerger(MergerOperatorType operator_type, Int offset, Int limit)
+      : Merger(),
+        input_records_1_(nullptr),
+        input_records_2_(nullptr),
+        output_records_(nullptr),
+        operator_type_(operator_type),
+        offset_(offset),
+        limit_(limit) {}
+};
+
+// Allocates an AndMerger configured with the operator, offset and limit of
+// "options". If allocation fails, sets a NO_MEMORY error and returns nullptr.
+unique_ptr<Merger> AndMerger::create(Error *error,
+                                     const MergerOptions &options) {
+  unique_ptr<Merger> result(new (nothrow) AndMerger(
+      options.operator_type, options.offset, options.limit));
+  if (result) {
+    return result;
+  }
+  GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
+  return nullptr;
+}
+
+bool AndMerger::reset(Error *,
+                      Array<Record> *input_records_1,
+                      Array<Record> *input_records_2,
+                      Array<Record> *output_records) {
+  input_records_1_ = input_records_1;
+  input_records_2_ = input_records_2;
+  output_records_ = output_records;
+  return true;
+}
+
+bool AndMerger::progress(Error *) {
+  // TODO: No incremental merging yet, so this does nothing and succeeds.
+  return true;
+}
+
+// Intersects the two inputs: for every record of the larger input whose row
+// ID also occurs in the smaller one, appends a record to the output whose
+// score combines both input scores according to "operator_type_". Afterwards
+// drops the first "offset_" output records and keeps at most "limit_" more.
+//
+// On success, returns true.
+// On failure, returns false and stores error information into "*error" if
+// "error" != nullptr.
+bool AndMerger::finish(Error *error) {
+  // Create a hash table from the smaller input.
+  Array<Record> *filter_records;
+  Array<Record> *stream_records;
+  if (input_records_1_->size() < input_records_2_->size()) {
+    filter_records = input_records_1_;
+    stream_records = input_records_2_;
+  } else {
+    filter_records = input_records_2_;
+    stream_records = input_records_1_;
+  }
+  std::unordered_map<Int, Float> filter;
+  try {
+    for (Int i = 0; i < filter_records->size(); ++i) {
+      filter[filter_records->get_row_id(i)] = filter_records->get_score(i);
+    }
+  } catch (...) {
+    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
+    return false;
+  }
+
+  // Filter the stream (the other input) with the hash table.
+  const bool stream_is_1 = stream_records == input_records_1_;
+  for (Int i = 0; i < stream_records->size(); ++i) {
+    const auto it = filter.find(stream_records->get_row_id(i));
+    if (it == filter.end()) {
+      continue;
+    }
+    // Restore the caller's operand order, which the size-based swap hides.
+    const Float stream_score = stream_records->get_score(i);
+    const Float score_1 = stream_is_1 ? stream_score : it->second;
+    const Float score_2 = stream_is_1 ? it->second : stream_score;
+    Record record;
+    record.row_id = it->first;
+    switch (operator_type_) {
+      case PLUS_MERGER_OPERATOR:
+        record.score = score_1 + score_2;
+        break;
+      case MINUS_MERGER_OPERATOR:
+        record.score = score_1 - score_2;
+        break;
+      case MULTIPLICATION_MERGER_OPERATOR:
+        record.score = score_1 * score_2;
+        break;
+      case LHS_MERGER_OPERATOR:
+        record.score = score_1;
+        break;
+      case RHS_MERGER_OPERATOR:
+        record.score = score_2;
+        break;
+      case ZERO_MERGER_OPERATOR:
+        record.score = 0.0;
+        break;
+    }
+    if (!output_records_->push_back(error, record)) {
+      return false;
+    }
+  }
+
+  // Remove out-of-range records.
+  const Int num_records = output_records_->size();
+  if (offset_ > 0) {
+    // Never skip more records than exist, or the new size would go negative.
+    const Int num_skipped = (offset_ < num_records) ? offset_ : num_records;
+    for (Int i = num_skipped; i < num_records; ++i) {
+      output_records_->set(i - num_skipped, output_records_->get(i));
+    }
+    output_records_->resize(nullptr, num_records - num_skipped);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(nullptr, limit_);
+  }
+  return true;
+}
+
+// Runs a whole merge in one call: reset() with the given arrays, then
+// finish(). finish() is not attempted if reset() fails.
+bool AndMerger::merge(Error *error,
+                      Array<Record> *input_records_1,
+                      Array<Record> *input_records_2,
+                      Array<Record> *output_records) {
+  return reset(error, input_records_1, input_records_2, output_records) &&
+         finish(error);
+}
+
+// -- Merger --
+
+Merger::Merger() {}
+// Declared virtual in the class, so deleting through a Merger pointer is safe.
+Merger::~Merger() {}
+
+// Creates the merger selected by "options.type" (only AND_MERGER so far).
+unique_ptr<Merger> Merger::create(Error *error, const MergerOptions &options) {
+  switch (options.type) {
+    case AND_MERGER:
+      return AndMerger::create(error, options);
+    case OR_MERGER:
+    case XOR_MERGER:
+    case MINUS_MERGER:
+    case LHS_MERGER:
+    case RHS_MERGER:
+      break;  // Reported after the switch, as are out-of-range values.
+  }
+  GRNXX_ERROR_SET(error, NOT_SUPPORTED_YET, "Not supported yet");
+  return nullptr;
+}
+
+}  // namespace grnxx

  Modified: lib/grnxx/types.cpp (+3 -1)
===================================================================
--- lib/grnxx/types.cpp    2014-09-22 11:26:58 +0900 (d617bd9)
+++ lib/grnxx/types.cpp    2014-09-22 16:07:22 +0900 (636a24e)
@@ -62,7 +62,9 @@ SorterOptions::SorterOptions()
 
 MergerOptions::MergerOptions()
     : type(AND_MERGER),
-      operator_type(PLUS_MERGER_OPERATOR) {}
+      operator_type(PLUS_MERGER_OPERATOR),
+      offset(0),
+      limit(numeric_limits<Int>::max()) {}
 
 PipelineOptions::PipelineOptions() {}
 
-------------- next part --------------
HTMLの添付ファイルを保管しました...
Download 



More information about the Groonga-commit mailing list
Back to archive index