Thrill  0.1
mix_block_queue.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_block_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
13 #define THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
14 
17 #include <thrill/common/logger.hpp>
18 #include <thrill/data/block.hpp>
23 #include <thrill/data/file.hpp>
24 
25 #include <vector>
26 
27 namespace thrill {
28 namespace data {
29 
30 //! \addtogroup data_layer
31 //! \{
32 
33 class MixStreamData;
34 class MixBlockQueueReader;
35 
37 
38 /*!
39  * Implements reading an unordered sequence of items from multiple workers,
40  * which send Blocks. This class is mainly used to implement MixChannel.
41  *
42  * When Blocks arrive from the net, the Multiplexer pushes (src, Blocks) pairs
43  * to MixChannel, which pushes them into a MixBlockQueue. The
44  * MixBlockQueue stores these in a ConcurrentBoundedQueue for atomic reading.
45  *
46  * When the MixChannel should be read, MixBlockQueueReader is used, which
47  * retrieves Blocks from the queue. The Reader contains one complete BlockReader
48  * for each inbound worker, and these BlockReaders are attached to BlockQueue
49  * instances inside the MixBlockQueue.
50  *
51  * To enable unordered reading from multiple workers, the only remaining thing
52  * to do is to fetch Blocks from the main mix queue and put them into the
53  * right BlockQueue for the sub-readers to consume. By taking the Blocks from
54  * the main mix queue, the Reader only blocks when no inbound Blocks are
55  * available.
56  *
57  * To enable switching between items from different workers, the
58  * MixBlockQueueReader keeps track of how many _whole_ items are available on
59  * each reader. This number is simply one less than the number of items known to
60  * start in the received blocks. The last item _may_ span further Blocks, and cannot
61  * be fetched without blocking the sub-reader indefinitely, since no thread will
62  * deliver the next Block.
63  */
65 {
66  static constexpr bool debug = false;
67 
68 public:
69  //! pair of (source worker, Block) stored in the main mix queue.
70  struct SrcBlockPair {
71  size_t src; //!< source worker that delivered the Block
73  };
74 
76 
77  //! Constructor from BlockPool
78  MixBlockQueue(BlockPool& block_pool, size_t num_workers,
79  size_t local_worker_id, size_t dia_id);
80 
81  //! non-copyable: delete copy-constructor
82  MixBlockQueue(const MixBlockQueue&) = delete;
83  //! non-copyable: delete assignment operator
84  MixBlockQueue& operator = (const MixBlockQueue&) = delete;
85  //! move-constructor: default
86  MixBlockQueue(MixBlockQueue&&) = default;
87  //! move-assignment operator: default
89 
90  //! change dia_id after construction (needed because it may be unknown at
91  //! construction)
92  void set_dia_id(size_t dia_id);
93 
94  //! return block pool
96 
97  //! append block delivered via the network from src.
98  void AppendBlock(size_t src, const Block& block);
99 
100  //! append block delivered via the network from src (rvalue overload: the Block is moved in).
101  void AppendBlock(size_t src, Block&& block);
102 
103  //! append closing sentinel block from src (also delivered via the network).
104  void Close(size_t src);
105 
106  //! Blocking retrieval of a (source,block) pair.
107  SrcBlockPair Pop();
108 
109  //! check if writer side Close() was called for all writers (none is still open).
110  bool write_closed() const { return write_open_count_ == 0; }
111 
112  //! check if reader side Pop() has returned all expected closing sentinel blocks
113  bool read_closed() const { return read_open_ == 0; }
114 
115  //! check if the inbound queue from source src is closed
116  bool is_queue_closed(size_t src);
117 
118 private:
120 
122 
123  //! the main mix queue, containing the blocks in the order they were received.
125 
126  //! total number of workers in system.
127  size_t num_workers_;
128 
129  //! counter on number of writers still open.
131 
132  //! per-source flags to test for closed sources (unsigned char, presumably to avoid std::vector<bool>)
133  std::vector<unsigned char> write_closed_;
134 
135  //! number of closing Blocks that Pop() has not yet returned; read_closed()
136  //! becomes true once this reaches zero.
138 
139  //! BlockQueues into which blocks taken from the mix queue are delivered.
140  std::vector<BlockQueue> queues_;
141 
142  //! for access to queues_ and other internals.
143  friend class MixBlockQueueReader;
144 };
145 
146 /*!
147  * Reader to retrieve items in unordered sequence from a MixBlockQueue. This
148  * is not a full implementation of _all_ methods available in a normal
149  * BlockReader. Mainly, this is because only retrieval of _whole_ items are
150  * possible. Due to the unordered sequence, these probably have to be all of
151  * equal type as well.
152  *
153  * The Reader supports all combinations of consuming and keeping. However, do
154  * not assume that the second round of reading delivers items in the same order
155  * as the first. This is because once items are cached inside the BlockQueues of
156  * MixBlockQueue, we use a plain CatReader to deliver them again (which is
157  * probably faster as it has a sequential access pattern).
158  *
159  * See \ref MixBlockQueue for more information on how items are read.
160  */
162 {
163  static constexpr bool debug = false;
164 
165 public:
168 
170  bool consume, size_t local_worker_id);
171 
172  //! non-copyable: delete copy-constructor
173  MixBlockQueueReader(const MixBlockQueueReader&) = delete;
174  //! non-copyable: delete assignment operator
176  //! move-constructor: default
178  //! move-assignment operator: default
180 
181  //! Possibly consume unread blocks.
183 
184  //! HasNext() returns true if at least one more item is available.
185  bool HasNext() {
186  if (reread_) return cat_reader_.HasNext(); // second pass: read sequentially via cat_reader_
187 
188  if (available_) return true; // whole items remain on the selected reader
189  if (open_ == 0) return false; // every sub-reader has been closed
190 
191  return PullBlock(); // false if no further item can be made available
192  }
193 
194  //! Next() reads a complete item T
195  template <typename T>
196  T Next() {
197  assert(HasNext()); // NOTE: side effect (may call PullBlock()); compiled out under NDEBUG
198 
199  if (reread_) {
200  return cat_reader_.template Next<T>();
201  }
202  else {
203  if (!available_) { // reached when the assert above did not pull, e.g. under NDEBUG
204  if (!PullBlock()) {
205  throw std::runtime_error(
206  "Data underflow in MixBlockQueueReader.");
207  }
208  }
209 
210  assert(available_ > 0);
211  assert(selected_ < readers_.size());
212 
213  --available_; // one fewer whole item left on readers_[selected_]
214  return readers_[selected_].template Next<T>();
215  }
216  }
217 
218 private:
219  //! reference to mix queue
221 
222  //! flag whether we are rereading the mix queue by reading the files using
223  //! a cat_reader_.
224  const bool reread_;
225 
226  //! \name Attributes for Mix Reading
227  //! \{
228 
229  //! sub-readers for each block queue in mix queue
230  std::vector<BlockQueue::Reader> readers_;
231 
232  //! index into readers_ of the currently selected reader (initially size_t(-1), i.e. none)
233  size_t selected_ = size_t(-1);
234 
235  //! number of available items on the selected reader
236  size_t available_ = 0;
237 
238  //! number of additional items available at reader (excluding current
239  //! available_)
240  std::vector<size_t> available_at_;
241 
242  //! number of readers still open
243  size_t open_ = mix_queue_.num_workers_; // valid only because mix_queue_ is declared (initialized) before open_
244 
245  //! \}
246 
247  //! for rereading the mix queue: use a cat reader on the embedded
248  //! BlockQueue's files.
249  CatBlockReader cat_reader_ { CatBlockSource() };
250 
251  bool PullBlock(); //!< used by HasNext()/Next(); returns false if no further item can be made available
252 };
253 
254 //! \}
255 
256 } // namespace data
257 } // namespace thrill
258 
259 #endif // !THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
260 
261 /******************************************************************************/
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
MixBlockQueue(BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
bool HasNext()
HasNext() returns true if at least one more item is available.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
std::vector< unsigned char > write_closed_
flag to test for closed sources
void set_dia_id(size_t dia_id)
double T
std::vector< BlockQueue > queues_
BlockQueues to deliver blocks to from mix queue.
pair of (source worker, Block) stored in the main mix queue.
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
friend class MixBlockQueueReader
for access to queues_ and other internals.
BlockPool & block_pool()
return block pool
Implements reading an unordered sequence of items from multiple workers, which send Blocks...
MixBlockQueue & operator=(const MixBlockQueue &)=delete
non-copyable: delete assignment operator
Reader to retrieve items in unordered sequence from a MixBlockQueue.
MixBlockQueue & mix_queue_
reference to mix queue
std::vector< BlockQueue::Reader > readers_
sub-readers for each block queue in mix queue
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
bool write_closed() const
check if writer side Close() was called.
static constexpr bool debug
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
T Next()
Next() reads a complete item T.
size_t num_workers_
total number of workers in system.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
bool is_queue_closed(size_t src)
check if inbound queue is closed
SrcBlockPair Pop()
Blocking retrieval of a (source,block) pair.
bool read_closed() const
check if reader side has returned a closing sentinel block
common::AtomicMovable< size_t > write_open_count_
counter on number of writers still open.
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
the main mix queue, containing the block in the reception order.