Thrill  0.1
reduce_by_hash_post_phase.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_by_hash_post_phase.hpp
3  *
4  * Hash table with support for reduce.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_CORE_REDUCE_BY_HASH_POST_PHASE_HEADER
17 #define THRILL_CORE_REDUCE_BY_HASH_POST_PHASE_HEADER
18 
19 #include <thrill/api/context.hpp>
20 #include <thrill/common/logger.hpp>
25 #include <thrill/data/file.hpp>
26 
27 #include <algorithm>
28 #include <cassert>
29 #include <cmath>
30 #include <functional>
31 #include <string>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace core {
37 
38 template <typename TableItem, typename Key, typename Value,
39  typename KeyExtractor, typename ReduceFunction, typename Emitter,
40  const bool VolatileKey,
41  typename ReduceConfig_ = DefaultReduceConfig,
42  typename IndexFunction = ReduceByHash<Key>,
43  typename KeyEqualFunction = std::equal_to<Key> >
45 {
46  static constexpr bool debug = false;
47 
48 public:
49  using ReduceConfig = ReduceConfig_;
51  TableItem, Value, Emitter, VolatileKey>;
52 
53  using Table = typename ReduceTableSelect<
54  ReduceConfig::table_impl_,
55  TableItem, Key, Value,
56  KeyExtractor, ReduceFunction, PhaseEmitter,
57  VolatileKey, ReduceConfig,
58  IndexFunction, KeyEqualFunction>::type;
59 
60  /*!
61  * A data structure which takes an arbitrary value and extracts a key using
62  * a key extractor function from that value. Afterwards, the value is hashed
63  * based on the key into some slot.
64  */
66  Context& ctx, size_t dia_id,
67  const KeyExtractor& key_extractor,
68  const ReduceFunction& reduce_function,
69  const Emitter& emit,
70  const ReduceConfig& config = ReduceConfig(),
71  const IndexFunction& index_function = IndexFunction(),
72  const KeyEqualFunction& key_equal_function = KeyEqualFunction())
73  : config_(config),
74  emitter_(emit),
75  table_(ctx, dia_id,
76  key_extractor, reduce_function, emitter_,
77  /* num_partitions */ 32, /* TODO(tb): parameterize */
78  config, /* immediate_flush */ false,
79  index_function, key_equal_function) { }
80 
81  //! non-copyable: delete copy-constructor
83  //! non-copyable: delete assignment operator
85 
86  void Initialize(size_t limit_memory_bytes) {
87  table_.Initialize(limit_memory_bytes);
88  }
89 
90  bool Insert(const TableItem& kv) {
91  return table_.Insert(kv);
92  }
93 
94  //! Flushes all items in the whole table.
95  template <bool DoCache>
96  void Flush(bool consume, data::File::Writer* writer = nullptr) {
97  LOG << "Flushing items";
98 
99  // list of remaining files, containing only partially reduced item pairs
100  // or items
101  std::vector<data::File> remaining_files;
102 
103  // read primary hash table, since ReduceByHash delivers items in any
104  // order, we can just emit items from fully reduced partitions.
105 
106  {
107  std::vector<data::File>& files = table_.partition_files();
108 
109  for (size_t id = 0; id < files.size(); ++id)
110  {
111  // get the actual reader from the file
112  data::File& file = files[id];
113 
114  // if items have been spilled, store for a second reduce
115  if (file.num_items() > 0) {
116  table_.SpillPartition(id);
117 
118  LOG << "partition " << id << " contains "
119  << file.num_items() << " partially reduced items";
120 
121  remaining_files.emplace_back(std::move(file));
122  }
123  else {
124  LOG << "partition " << id << " contains "
125  << table_.items_per_partition(id)
126  << " fully reduced items";
127 
128  table_.FlushPartitionEmit(
129  id, consume, /* grow */ false,
130  [this, writer](
131  const size_t& partition_id, const TableItem& p) {
132  if (DoCache) writer->Put(p);
133  emitter_.Emit(partition_id, p);
134  });
135  }
136  }
137  }
138 
139  if (remaining_files.size() == 0) {
140  LOG << "Flushed items directly.";
141  return;
142  }
143 
144  table_.Dispose();
145 
146  assert(consume && "Items were spilled hence Flushing must consume");
147 
148  // if partially reduce files remain, create new hash tables to process
149  // them iteratively.
150 
151  size_t iteration = 1;
152 
153  while (remaining_files.size())
154  {
155  sLOG << "ReducePostPhase: re-reducing items from"
156  << remaining_files.size() << "spilled files"
157  << "iteration" << iteration;
158  sLOG << "-- Try to increase the amount of RAM to avoid this.";
159 
160  std::vector<data::File> next_remaining_files;
161 
162  Table subtable(
163  table_.ctx(), table_.dia_id(),
164  table_.key_extractor(), table_.reduce_function(), emitter_,
165  /* num_partitions */ 32, config_, /* immediate_flush */ false,
166  IndexFunction(iteration, table_.index_function()),
167  table_.key_equal_function());
168 
169  subtable.Initialize(table_.limit_memory_bytes());
170 
171  size_t num_subfile = 0;
172 
173  for (data::File& file : remaining_files)
174  {
175  // insert all items from the partially reduced file
176  sLOG << "re-reducing subfile" << num_subfile++
177  << "containing" << file.num_items() << "items";
178 
179  data::File::ConsumeReader reader = file.GetConsumeReader();
180 
181  while (reader.HasNext()) {
182  subtable.Insert(reader.Next<TableItem>());
183  }
184 
185  // after insertion, flush fully reduced partitions and save
186  // remaining files for next iteration.
187 
188  std::vector<data::File>& subfiles = subtable.partition_files();
189 
190  for (size_t id = 0; id < subfiles.size(); ++id)
191  {
192  // get the actual reader from the file
193  data::File& subfile = subfiles[id];
194 
195  // if items have been spilled, store for a second reduce
196  if (subfile.num_items() > 0) {
197  subtable.SpillPartition(id);
198 
199  sLOG << "partition" << id << "contains"
200  << subfile.num_items() << "partially reduced items";
201 
202  next_remaining_files.emplace_back(std::move(subfile));
203  }
204  else {
205  sLOG << "partition" << id << "contains"
206  << subtable.items_per_partition(id)
207  << "fully reduced items";
208 
209  subtable.FlushPartitionEmit(
210  id, /* consume */ true, /* grow */ false,
211  [this, writer](
212  const size_t& partition_id, const TableItem& p) {
213  if (DoCache) writer->Put(p);
214  emitter_.Emit(partition_id, p);
215  });
216  }
217  }
218  }
219 
220  remaining_files = std::move(next_remaining_files);
221  ++iteration;
222  }
223 
224  LOG << "Flushed items";
225  }
226 
227  //! Push data into emitter
228  void PushData(bool consume = false) {
229  if (!cache_)
230  {
231  if (!table_.has_spilled_data()) {
232  // no items were spilled to disk, hence we can emit all data
233  // from RAM.
234  Flush</* DoCache */ false>(consume);
235  }
236  else {
237  // items were spilled, hence the reduce table must be emptied
238  // and we have to cache the output stream.
239  cache_ = table_.ctx().GetFilePtr(table_.dia_id());
240  data::File::Writer writer = cache_->GetWriter();
241  Flush</* DoCache */ true>(true, &writer);
242  }
243  }
244  else
245  {
246  // previous PushData() has stored data in cache_
247  data::File::Reader reader = cache_->GetReader(consume);
248  while (reader.HasNext())
249  emitter_.Emit(reader.Next<TableItem>());
250  }
251  }
252 
253  void Dispose() {
254  table_.Dispose();
255  if (cache_) cache_.reset();
256  }
257 
258  //! \name Accessors
259  //! \{
260 
261  //! Returns mutable reference to first table_
262  Table& table() { return table_; }
263 
264  //! Returns the total num of items in the table.
265  size_t num_items() const { return table_.num_items(); }
266 
267  //! \}
268 
269 private:
270  //! Stored reduce config to initialize the subtable.
271  ReduceConfig config_;
272 
273  //! Emitters used to parameterize hash table for output to next DIA node.
274  PhaseEmitter emitter_;
275 
276  //! the first-level hash table implementation
278 
279  //! File for storing data in-case we need multiple re-reduce levels.
281 };
282 
283 } // namespace core
284 } // namespace thrill
285 
286 #endif // !THRILL_CORE_REDUCE_BY_HASH_POST_PHASE_HEADER
287 
288 /******************************************************************************/
ReduceConfig config_
Stored reduce config to initialize the subtable.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Table table_
the first-level hash table implementation
Table & table()
Returns mutable reference to first table_.
size_t num_items() const
Returns the total num of items in the table.
Type selection via ReduceTableImpl enum.
data::FilePtr cache_
File for storing data in-case we need multiple re-reduce levels.
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void reset()
release contained pointer, frees object if this is the last reference.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
ReduceByHashPostPhase & operator=(const ReduceByHashPostPhase &)=delete
non-copyable: delete assignment operator
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void PushData(bool consume=false)
Push data into emitter.
void Flush(bool consume, data::File::Writer *writer=nullptr)
Flushes all items in the whole table.
void Initialize(size_t limit_memory_bytes)
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
PhaseEmitter emitter_
Emitters used to parameterize hash table for output to next DIA node.
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
typename ReduceTableSelect< ReduceConfig::table_impl_, TableItem, Key, ValueType, KeyExtractor, ReduceFunction, PhaseEmitter, VolatileKey, ReduceConfig, thrill::core::ReduceByHash, KeyEqualFunction >::type Table
ReducePostPhaseEmitter< TableItem, Value, Emitter, VolatileKey > PhaseEmitter
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
ReduceByHashPostPhase(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const Emitter &emit, const ReduceConfig &config=ReduceConfig(), const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
A data structure which takes an arbitrary value and extracts a key using a key extractor function fro...