Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
zip_with_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip_with_index.hpp
3  *
4  * DIANode for a ZipWithIndex operation.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_ZIP_WITH_INDEX_HEADER
15 #define THRILL_API_ZIP_WITH_INDEX_HEADER
16 
17 #include <thrill/api/dia.hpp>
18 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/data/file.hpp>
22 
23 #include <algorithm>
24 #include <functional>
25 
26 namespace thrill {
27 namespace api {
28 
29 /*!
30  * A DIANode which calculates the array index for each item and performs a
31  * Zip-like operation without extra rebalancing of the DIA data. This DIANode
32  * supports only one parent; if more than one input must be zipped, use the
33  * general Zip() with a Generate() DIA.
34  *
    * Items arriving from the parent are buffered in a local data::File.
    * Execute() obtains this worker's first index as the exclusive prefix sum
    * of the local item counts, and PushData() then calls the zip function on
    * every buffered item together with its index.
    *
35  * \ingroup api_layer
36  */
37 template <typename ValueType, typename ZipFunction>
38 class ZipWithIndexNode final : public DOpNode<ValueType>
39 {
    //! Local debug switch; read by sLOG and by the "if (debug)" in PushData().
40  static constexpr bool debug = false;
41 
    // NOTE(review): listing line 42 is absent from this extract. The
    // using-declaration below needs a `Super` alias, presumably for
    // DOpNode<ValueType> -- confirm against the upstream header.
43  using Super::context_;
44 
    //! Plain type of the zip function's first argument; this is the item type
    //! written to and read back from the intermediate file.
45  using InputType =
46  typename common::FunctionTraits<ZipFunction>::template arg_plain<0>;
47 
48 public:
49  /*!
50  * Constructor for a ZipWithIndexNode.
    *
    * Stores the zip function, remembers whether the parent's function stack
    * is empty, and registers this node as a child of the parent with a PreOp
    * that writes every incoming item into the intermediate file.
    *
    * \param zip_function function called as zip_function(item, index)
    * \param parent DIA whose items are to be zipped with their index
51  */
52  template <typename ParentDIA>
    // NOTE(review): listing line 53 (the constructor's name line) is absent
    // from this extract; the symbol index at the end of the page gives it as
    // ZipWithIndexNode(const ZipFunction&, const ParentDIA&).
54  const ZipFunction& zip_function, const ParentDIA& parent)
55  : Super(parent.ctx(), "ZipWithIndex",
56  { parent.id() }, { parent.node() }),
57  zip_function_(zip_function),
58  parent_stack_empty_(ParentDIA::stack_empty)
59  {
60  // Hook PreOp: each item delivered by the parent is appended to file_
    // through writer_.
61  auto pre_op_fn = [this](const InputType& input) {
62  writer_.Put(input);
63  };
64 
    // Append the PreOp to the parent's function stack, fold the stack into a
    // single chain and register this node as a child using that chain.
65  auto lop_chain = parent.stack().push(pre_op_fn).fold();
66  parent.node()->AddChild(this, lop_chain);
67  }
68 
69  //! Receive a whole data::File of ValueType, but only if our stack is empty.
    //! Returns true if the file was accepted (a copy of it becomes the
    //! intermediate file), false if it was rejected.
70  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
71  if (!parent_stack_empty_) {
    // NOTE(review): listing line 72 (the head of this log statement) is
    // absent from this extract; presumably a LOGC(...) call -- confirm.
73  << "ZipWithIndex rejected File from parent due to Stack";
74  return false;
75  }
76 
77  // accept file: nothing may have been buffered yet, then take a copy
78  assert(file_.num_items() == 0);
79  file_ = file.Copy();
80  return true;
81  }
82 
    //! Called when the parent has finished delivering items: closes the
    //! writer of the intermediate file.
83  void StopPreOp(size_t /* parent_index */) final {
84  writer_.Close();
85  }
86 
    //! Determines the index of this worker's first item as the exclusive
    //! prefix sum over all workers' local item counts.
87  void Execute() final {
88  //! number of elements of this worker
89  size_t dia_local_size = file_.num_items();
90  sLOG << "dia_local_size" << dia_local_size;
91 
92  dia_local_rank_ = context_.net.ExPrefixSum(dia_local_size);
93  sLOG << "dia_local_rank_" << dia_local_rank_;
94  }
95 
    //! Reads all buffered items, calls the zip function on each item with its
    //! index (counting up from dia_local_rank_) and pushes the result to the
    //! children. The consume flag is forwarded to file_.GetReader().
96  void PushData(bool consume) final {
    // item count is taken before reading; only used for the debug statistic
97  size_t result_count = file_.num_items();
98 
99  data::File::Reader reader = file_.GetReader(consume);
100  size_t index = dia_local_rank_;
101 
102  while (reader.HasNext()) {
103  this->PushItem(
104  zip_function_(reader.template Next<InputType>(), index++));
105  }
106 
107  if (debug) {
    // NOTE(review): listing line 108 (the head of this call) is absent from
    // this extract; presumably context_.PrintCollectiveMeanStdev( -- confirm.
109  "Zip() result_count", result_count);
110  }
111  }
112 
    //! Releases the buffered items by clearing the intermediate file.
113  void Dispose() final {
114  file_.Clear();
115  }
116 
117 private:
118  //! Zip function, called as zip_function_(item, index)
119  ZipFunction zip_function_;
120 
121  //! Whether the parent stack is empty; only then are whole Files accepted
    //! in OnPreOpFile()
122  const bool parent_stack_empty_;
123 
124  //! File for intermediate storage
125  data::File file_ { context_.GetFile(this) };
126 
127  //! Writer to intermediate file
128  data::File::Writer writer_ { file_.GetWriter() };
129 
130  //! \name Variables for Calculating Global Index
131  //! \{
132 
133  //! exclusive prefix sum over the number of items in workers
    // NOTE(review): listing line 134 (the member declaration itself) is absent
    // from this extract; the symbol index lists it as `size_t dia_local_rank_`.
135 
136  //! \}
137 };
138 
//! Zips each item of a DIA with its zero-based array index: checks the
//! signature of zip_function at compile time, constructs a ZipWithIndexNode
//! from zip_function and this DIA, and returns a DIA of the zip function's
//! result type wrapping that node.
139 template <typename ValueType, typename Stack>
140 template <typename ZipFunction>
// NOTE(review): listing line 141 (the declarator's name line) is absent from
// this extract; the symbol index at the end of the page gives the member as
// `auto ZipWithIndex(const ZipFunction &zip_function) const`.
142  const ZipFunction& zip_function) const {
143 
    // the zip function must take exactly (item, index)
144  static_assert(
145  common::FunctionTraits<ZipFunction>::arity == 2,
146  "ZipWithIndexFunction must take exactly two parameters");
147 
    // the DIA's items must be convertible to the first parameter
148  static_assert(
149  std::is_convertible<
150  ValueType,
151  typename common::FunctionTraits<ZipFunction>::template arg<0>
152  >::value,
153  "ZipWithIndexFunction has the wrong input type in DIA 0");
154 
    // the index (a size_t) must be convertible to the second parameter
155  static_assert(
156  std::is_convertible<
157  size_t,
158  typename common::FunctionTraits<ZipFunction>::template arg<1>
159  >::value,
160  "ZipWithIndexFunction must take a const unsigned long int& (aka. size_t)"
161  " as second parameter");
162 
    // item type of the resulting DIA is whatever the zip function returns
163  using ZipResult
164  = typename common::FunctionTraits<ZipFunction>::result_type;
165 
    // NOTE(review): listing line 166 is absent from this extract; presumably
    // it declares the `ZipWithIndexNode` alias (with its template arguments)
    // that the next statement uses -- confirm against the upstream header.
167 
168  auto node = tlx::make_counting<ZipWithIndexNode>(zip_function, *this);
169 
170  return DIA<ZipResult>(node);
171 }
172 
173 //! \}
174 
175 } // namespace api
176 } // namespace thrill
177 
178 #endif // !THRILL_API_ZIP_WITH_INDEX_HEADER
179 
180 /******************************************************************************/
const bool parent_stack_empty_
Whether the parent stack is empty.
net::FlowControlChannel & net
Definition: context.hpp:443
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:349
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
size_t dia_local_rank_
exclusive prefix sum over the number of items in workers
typename common::FunctionTraits< ZipFunction >::template arg_plain< 0 > InputType
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
data::File::Writer writer_
Writer to intermediate file.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
ZipWithIndexNode(const ZipFunction &zip_function, const ParentDIA &parent)
Constructor for a ZipNode.
data::File file_
File for intermediate storage.
static constexpr bool debug
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
void Close()
Explicitly close the writer.
auto ZipWithIndex(const ZipFunction &zip_function) const
Zips each item of a DIA with its zero-based array index.
A DIANode which calculates the array index for each items and performs a Zip-like operation without e...
DynBlockReader Reader
Definition: file.hpp:60
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21