Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
zip_with_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip_with_index.hpp
3  *
4  * DIANode for a ZipWithIndex operation.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_ZIP_WITH_INDEX_HEADER
15 #define THRILL_API_ZIP_WITH_INDEX_HEADER
16 
17 #include <thrill/api/dia.hpp>
18 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/data/file.hpp>
22 
23 #include <algorithm>
24 #include <functional>
25 
26 namespace thrill {
27 namespace api {
28 
29 /*!
30  * A DIANode which calculates the array index for each item and performs a
31  * Zip-like operation without extra rebalancing of the DIA data. This DIANode
32  * supports only one parent; if more than one input must be zipped, use the
33  * general Zip() with a Generate() DIA.
34  * Items from the parent are buffered in a data::File; Execute() determines this worker's first index and PushData() applies the zip function to each (item, index) pair.
35  * \ingroup api_layer
36  */
37 template <typename ValueType, typename ZipFunction>
38 class ZipWithIndexNode final : public DOpNode<ValueType>
39 {
40  static constexpr bool debug = false; // when true, the sLOG output and the statistics print in PushData() are enabled
41  // NOTE(review): listing line 42 is missing from this extract; presumably it declares the Super alias used below -- confirm against the original header
43  using Super::context_;
44  // type of the items received from the parent, taken from the zip function's first argument via FunctionTraits::arg_plain<0>
45  using InputType =
46  typename common::FunctionTraits<ZipFunction>::template arg_plain<0>;
47 
48 public:
49  /*!
50  * Constructor for a ZipWithIndexNode: hooks a PreOp into the parent's function stack which writes every incoming item to writer_, and adds this node as a child of the parent node.
51  */
52  template <typename ParentDIA> // NOTE(review): listing line 53 (constructor name and opening parenthesis) is missing from this extract
54  const ZipFunction& zip_function, const ParentDIA& parent)
55  : Super(parent.ctx(), "ZipWithIndex",
56  { parent.id() }, { parent.node() }),
57  zip_function_(zip_function),
58  parent_stack_empty_(ParentDIA::stack_empty) {
59  // Hook PreOp(s): each item delivered by the parent is appended to the intermediate file
60  auto pre_op_fn = [this](const InputType& input) {
61  writer_.Put(input);
62  };
63  // register this node with the parent, using the parent's stack extended by the PreOp
64  auto lop_chain = parent.stack().push(pre_op_fn).fold();
65  parent.node()->AddChild(this, lop_chain);
66  }
67 
68  //! Receive a whole data::File from the parent, but only if the parent's stack is empty. Returns true if the File was accepted (copied into file_), false if it was rejected.
69  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
70  if (!parent_stack_empty_) { // NOTE(review): listing line 71 (start of the log statement continued by the stream operator below) is missing from this extract
72  << "ZipWithIndex rejected File from parent due to Stack";
73  return false;
74  }
75 
76  // accept file: the assert checks that nothing has been stored in file_ before it is replaced by a copy
77  assert(file_.num_items() == 0);
78  file_ = file.Copy();
79  return true;
80  }
81  //! Closes the writer to the intermediate file.
82  void StopPreOp(size_t /* parent_index */) final {
83  writer_.Close();
84  }
85  //! Determines dia_local_rank_ as the exclusive prefix sum of the local item counts over the workers.
86  void Execute() final {
87  //! number of elements of this worker
88  size_t dia_local_size = file_.num_items();
89  sLOG << "dia_local_size" << dia_local_size;
90 
91  dia_local_rank_ = context_.net.ExPrefixSum(dia_local_size);
92  sLOG << "dia_local_rank_" << dia_local_rank_;
93  }
94  //! Reads the stored items, applies the zip function to each item and its index (starting at dia_local_rank_) and pushes the results via PushItem(). The consume flag is forwarded to the File reader.
95  void PushData(bool consume) final {
96  size_t result_count = file_.num_items();
97 
98  data::File::Reader reader = file_.GetReader(consume);
99  size_t index = dia_local_rank_;
100  // index is incremented once per item, so the items of this worker are numbered consecutively
101  while (reader.HasNext()) {
102  this->PushItem(
103  zip_function_(reader.template Next<InputType>(), index++));
104  }
105 
106  if (debug) { // NOTE(review): listing line 107 (the call that receives the two arguments below) is missing from this extract
108  "Zip() result_count", result_count);
109  }
110  }
111  //! Clears the intermediate file.
112  void Dispose() final {
113  file_.Clear();
114  }
115 
116 private:
117  //! Zip function
118  ZipFunction zip_function_;
119 
120  //! Whether the parent stack is empty
121  const bool parent_stack_empty_;
122 
123  //! File for intermediate storage
124  data::File file_ { context_.GetFile(this) };
125 
126  //! Writer to intermediate file
127  data::File::Writer writer_ { file_.GetWriter() };
128 
129  //! \name Variables for Calculating Global Index
130  //! \{
131 
132  //! exclusive prefix sum over the number of items in workers
134  // NOTE(review): listing line 133 is missing from this extract; presumably the declaration of dia_local_rank_ documented above -- confirm
135  //! \}
136 };
137 //! Zips each item of the DIA with its zero-based array index: checks ZipFunction at compile time, creates a ZipWithIndexNode with this DIA as parent, and returns a DIA of the zip function's result type.
138 template <typename ValueType, typename Stack>
139 template <typename ZipFunction> // NOTE(review): listing line 140 (start of the function declarator) is missing from this extract
141  const ZipFunction& zip_function) const {
142  // the zip function is called with two arguments: the item and its index
143  static_assert(
144  common::FunctionTraits<ZipFunction>::arity == 2,
145  "ZipWithIndexFunction must take exactly two parameters");
146  // the DIA's ValueType must be convertible to the first parameter type
147  static_assert(
148  std::is_convertible<
149  ValueType,
150  typename common::FunctionTraits<ZipFunction>::template arg<0>
151  >::value,
152  "ZipWithIndexFunction has the wrong input type in DIA 0");
153  // a size_t index must be convertible to the second parameter type
154  static_assert(
155  std::is_convertible<
156  size_t,
157  typename common::FunctionTraits<ZipFunction>::template arg<1>
158  >::value,
159  "ZipWithIndexFunction must take a const unsigned long int& (aka. size_t)"
160  " as second parameter");
161  // the returned DIA holds whatever the zip function returns
162  using ZipResult
163  = typename common::FunctionTraits<ZipFunction>::result_type;
164 
166  // NOTE(review): listing line 165 is missing from this extract; presumably it defines the ZipWithIndexNode alias with concrete template arguments used below -- confirm
167  auto node = tlx::make_counting<ZipWithIndexNode>(zip_function, *this);
168 
169  return DIA<ZipResult>(node);
170 }
171 
172 //! \}
173 
174 } // namespace api
175 } // namespace thrill
176 
177 #endif // !THRILL_API_ZIP_WITH_INDEX_HEADER
178 
179 /******************************************************************************/
const bool parent_stack_empty_
Whether the parent stack is empty.
net::FlowControlChannel & net
Definition: context.hpp:446
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:352
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
size_t dia_local_rank_
exclusive prefix sum over the number of items in workers
typename common::FunctionTraits< ZipFunction >::template arg_plain< 0 > InputType
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
data::File::Writer writer_
Writer to intermediate file.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
ZipWithIndexNode(const ZipFunction &zip_function, const ParentDIA &parent)
Constructor for a ZipWithIndexNode.
data::File file_
File for intermediate storage.
static constexpr bool debug
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
void Close()
Explicitly close the writer.
auto ZipWithIndex(const ZipFunction &zip_function) const
Zips each item of a DIA with its zero-based array index.
A DIANode which calculates the array index for each item and performs a Zip-like operation without e...
DynBlockReader Reader
Definition: file.hpp:60
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21