Thrill  0.1
mix_block_queue.hpp
/*******************************************************************************
 * thrill/data/mix_block_queue.hpp
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015 Timo Bingmann
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/

#pragma once
#ifndef THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
#define THRILL_DATA_MIX_BLOCK_QUEUE_HEADER

#include <thrill/common/logger.hpp>
#include <thrill/data/block.hpp>
#include <thrill/data/file.hpp>

#include <vector>

namespace thrill {
namespace data {

//! \addtogroup data_layer
//! \{

class MixStreamData;
class MixBlockQueueReader;

/*!
 * Implements reading an unordered sequence of items from multiple workers,
 * which send Blocks. This class is mainly used to implement MixChannel.
 *
 * When Blocks arrive from the net, the Multiplexer pushes (src, Block) pairs
 * to MixChannel, which pushes them into a MixBlockQueue. The MixBlockQueue
 * stores these in a ConcurrentBoundedQueue for atomic reading.
 *
 * When the MixChannel should be read, MixBlockQueueReader is used, which
 * retrieves Blocks from the queue. The Reader contains one complete BlockReader
 * for each inbound worker, and these BlockReaders are attached to BlockQueue
 * instances inside the MixBlockQueue.
 *
 * To enable unordered reading from multiple workers, the only remaining thing
 * to do is to fetch Blocks from the main mix queue and put them into the
 * right BlockQueue for the sub-readers to consume. By taking the Blocks from
 * the main mix queue, the Reader only blocks when no inbound Blocks are
 * available.
 *
 * To enable switching between items from different workers, the
 * MixBlockQueueReader keeps track of how many _whole_ items are available on
 * each sub-reader. This number is simply one less than the number of items
 * known to start in the received Blocks: the last item _may_ span further
 * Blocks, and it cannot be read without blocking the sub-reader indefinitely,
 * since no other thread will deliver the next Block to it.
 */
class MixBlockQueue
{
    static constexpr bool debug = false;

public:
    //! pair of (source worker, Block) stored in the main mix queue.
    struct SrcBlockPair {
        size_t src;
        Block block;
    };

    //! Constructor from BlockPool
    MixBlockQueue(BlockPool& block_pool, size_t num_workers,
                  size_t local_worker_id, size_t dia_id);

    //! non-copyable: delete copy-constructor
    MixBlockQueue(const MixBlockQueue&) = delete;
    //! non-copyable: delete assignment operator
    MixBlockQueue& operator = (const MixBlockQueue&) = delete;
    //! move-constructor: default
    MixBlockQueue(MixBlockQueue&&) = default;
    //! move-assignment operator: default
    MixBlockQueue& operator = (MixBlockQueue&&) = default;

    //! change dia_id after construction (needed because it may be unknown at
    //! construction)
    void set_dia_id(size_t dia_id);

    //! return block pool
    BlockPool& block_pool() { return block_pool_; }

    //! append block delivered via the network from src.
    void AppendBlock(size_t src, const Block& block);

    //! append block delivered via the network from src.
    void AppendBlock(size_t src, Block&& block);

    //! append closing sentinel block from src (also delivered via the network).
    void Close(size_t src);

    //! Blocking retrieval of a (source, block) pair.
    SrcBlockPair Pop();

    //! check if Close() has been called by all writers.
    bool write_closed() const { return write_open_count_ == 0; }

    //! check if the reader side has received the closing sentinel blocks of
    //! all writers.
    bool read_closed() const { return read_open_ == 0; }

private:
    BlockPool& block_pool_;

    //! the main mix queue, containing the blocks in reception order.
    common::ConcurrentBoundedQueue<SrcBlockPair> mix_queue_;

    //! total number of workers in the system.
    size_t num_workers_;

    //! counter on number of writers still open.

    //! flag to test for closed sources
    std::vector<unsigned char> write_closed_;

    //! number of closing Blocks that Pop() has not yet returned; hence, it
    //! reaches zero once the close messages of all writers have been received.

    //! per-source BlockQueues into which blocks from the mix queue are
    //! delivered.
    std::vector<BlockQueue> queues_;

    //! for access to queues_ and other internals.
    friend class MixBlockQueueReader;
};

/*!
 * Reader to retrieve items in unordered sequence from a MixBlockQueue. This
 * is not a full implementation of _all_ methods available in a normal
 * BlockReader. Mainly, this is because only retrieval of _whole_ items is
 * possible. Due to the unordered sequence, these probably all have to be of
 * equal type as well.
 *
 * The Reader supports all combinations of consuming and keeping. However, do
 * not assume that the second round of reading delivers items in the same order
 * as the first. This is because once items are cached inside the BlockQueues of
 * MixBlockQueue, we use a plain CatReader to deliver them again (which is
 * probably faster as it has a sequential access pattern).
 *
 * See \ref MixBlockQueue for more information on how items are read.
 */
class MixBlockQueueReader
{
    static constexpr bool debug = false;

public:
    using CatBlockSource = data::CatBlockSource<DynBlockSource>;

    MixBlockQueueReader(MixBlockQueue& mix_queue,
                        bool consume, size_t local_worker_id);

    //! non-copyable: delete copy-constructor
    MixBlockQueueReader(const MixBlockQueueReader&) = delete;
    //! non-copyable: delete assignment operator
    MixBlockQueueReader& operator = (const MixBlockQueueReader&) = delete;
    //! move-constructor: default
    MixBlockQueueReader(MixBlockQueueReader&&) = default;
    //! move-assignment operator: default
    MixBlockQueueReader& operator = (MixBlockQueueReader&&) = default;

    //! Possibly consume unread blocks.
    ~MixBlockQueueReader();

    //! HasNext() returns true if at least one more item is available.
    bool HasNext() {
        if (reread_) return cat_reader_.HasNext();

        if (available_) return true;
        if (open_ == 0) return false;

        return PullBlock();
    }

    //! Next() reads a complete item T
    template <typename T>
    T Next() {
        assert(HasNext());

        if (reread_) {
            return cat_reader_.template Next<T>();
        }
        else {
            if (!available_) {
                if (!PullBlock()) {
                    throw std::runtime_error(
                        "Data underflow in MixBlockQueueReader.");
                }
            }

            assert(available_ > 0);
            assert(selected_ < readers_.size());

            --available_;
            return readers_[selected_].template Next<T>();
        }
    }

private:
    //! reference to mix queue
    MixBlockQueue& mix_queue_;

    //! flag whether we are rereading the mix queue by reading the files using
    //! a cat_reader_.
    const bool reread_;

    //! \name Attributes for Mix Reading
    //! \{

    //! sub-readers for each block queue in mix queue
    std::vector<BlockQueue::Reader> readers_;

    //! reader currently selected
    size_t selected_ = size_t(-1);

    //! number of available items on the selected reader
    size_t available_ = 0;

    //! number of additional items available at reader (excluding current
    //! available_)
    std::vector<size_t> available_at_;

    //! number of readers still open

    //! \}

    //! for rereading the mix queue: use a cat reader on the embedded
    //! BlockQueue's files.

    //! fetch Blocks from the mix queue until a whole item is available;
    //! returns false if no further item is available.
    bool PullBlock();
};

//! \}

} // namespace data
} // namespace thrill

#endif // !THRILL_DATA_MIX_BLOCK_QUEUE_HEADER

/******************************************************************************/
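The data flow described in the MixBlockQueue class comment can be modeled in a few lines. The following is a minimal, self-contained sketch, not Thrill's implementation: `ModelMixQueue`, `ModelSrcBlockPair` and `DrainAndRoute` are hypothetical names, a Block is assumed to be just a vector of `int` items, `std::nullopt` stands in for the closing sentinel, and a mutex-guarded `std::deque` stands in for `common::ConcurrentBoundedQueue`. It shows how one shared FIFO of (src, Block) pairs plus per-source routing lets a single reader consume whatever arrives first.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Block: a batch of int items.
// std::nullopt plays the role of the closing sentinel appended by Close(src).
using ModelBlock = std::optional<std::vector<int>>;

struct ModelSrcBlockPair {
    std::size_t src;
    ModelBlock block;
};

// Hypothetical model of the mix queue: one FIFO shared by all sources,
// guarded by a mutex (assumed stand-in for ConcurrentBoundedQueue).
class ModelMixQueue {
public:
    explicit ModelMixQueue(std::size_t num_workers)
        : write_open_count_(num_workers), read_open_(num_workers) {}

    // Writer side: a Block of items delivered from `src`.
    void AppendBlock(std::size_t src, std::vector<int> items) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(ModelSrcBlockPair{src, std::move(items)});
        cv_.notify_one();
    }

    // Writer side: `src` will send no more Blocks; enqueue its sentinel.
    void Close(std::size_t src) {
        std::lock_guard<std::mutex> lock(mutex_);
        --write_open_count_;
        queue_.push_back(ModelSrcBlockPair{src, std::nullopt});
        cv_.notify_one();
    }

    // Reader side: blocks until some source has delivered a pair.
    ModelSrcBlockPair Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        ModelSrcBlockPair pair = std::move(queue_.front());
        queue_.pop_front();
        if (!pair.block) --read_open_;  // one more closing sentinel returned
        return pair;
    }

    bool write_closed() {
        std::lock_guard<std::mutex> lock(mutex_);
        return write_open_count_ == 0;
    }

    bool read_closed() {
        std::lock_guard<std::mutex> lock(mutex_);
        return read_open_ == 0;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<ModelSrcBlockPair> queue_;
    std::size_t write_open_count_;  // writers that have not called Close()
    std::size_t read_open_;         // sentinels not yet returned by Pop()
};

// Reader side: pop pairs in arrival order and route each Block's items into
// the queue of the source it came from, until every source has closed.
std::vector<std::vector<int>> DrainAndRoute(ModelMixQueue& mix,
                                            std::size_t num_workers) {
    std::vector<std::vector<int>> per_source(num_workers);
    while (!mix.read_closed()) {
        ModelSrcBlockPair pair = mix.Pop();
        if (pair.block) {
            std::vector<int>& dst = per_source[pair.src];
            dst.insert(dst.end(), pair.block->begin(), pair.block->end());
        }
    }
    return per_source;
}
```

In practice one writer thread per source would call `AppendBlock()` and `Close()` concurrently while the reader waits in `Pop()`; since each source pushes its Blocks in order into a FIFO, per-source order survives even though sources interleave. The sketch deliberately omits the whole-item bookkeeping and the rereading path.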
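The whole-item rule from the MixBlockQueue comment (readable items are the items known to start, minus one) can be written as a small counter. This is a hypothetical sketch: `WholeItemCounter`, `OnBlock` and `OnClose` are invented names, each Block is summarized only by how many items start in it, and releasing the held-back item when its source closes is an assumption that follows from the fact that no further Block can extend it. For example, a Block in which 3 items start makes 2 readable and keeps 1 pending; a following Block in which no item starts releases nothing, because the pending item is still being continued.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of the "whole items" bookkeeping: each Block is
// summarized by the number of items that *start* in it.
class WholeItemCounter {
public:
    explicit WholeItemCounter(std::size_t num_sources)
        : pending_(num_sources, 0) {}

    // A Block from `src` in which `num_item_starts` items begin has arrived.
    // Returns how many further items from `src` can now be read in full.
    std::size_t OnBlock(std::size_t src, std::size_t num_item_starts) {
        pending_[src] += num_item_starts;
        if (pending_[src] == 0) return 0;  // no item has started yet
        // Hold back the last started item: it may continue in a later Block.
        std::size_t readable = pending_[src] - 1;
        pending_[src] -= readable;  // exactly one item stays pending
        return readable;
    }

    // Source `src` was closed (assumption: no later Block can extend its
    // last item), so the held-back item, if any, becomes readable.
    std::size_t OnClose(std::size_t src) {
        std::size_t readable = pending_[src];
        pending_[src] = 0;
        return readable;
    }

private:
    // items that started at each source but were not yet released
    std::vector<std::size_t> pending_;
};
```

Loosely, the return value corresponds to `available_` for the selected sub-reader and the pending count to an entry of `available_at_`; that mapping is an interpretation of the member comments, not a statement about the actual `PullBlock()` code.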
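The MixBlockQueueReader comment warns that a second round of reading may deliver items in a different order. A hypothetical sketch (`Arrival`, `FirstPass` and `Reread` are invented names) makes the difference visible: the first pass emits items in the order the mix queue delivered them while caching each item in its source's queue, and the reread concatenates those caches. Visiting the caches in source-index order is an assumption about how the cat reader walks the per-source BlockQueues.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical: (source worker, item) in the order the mix queue delivered them.
using Arrival = std::pair<std::size_t, int>;

// First pass: items come out in arrival order, and each one is also cached
// in the queue of its source (assumes src < cache.size()).
std::vector<int> FirstPass(const std::vector<Arrival>& arrivals,
                           std::vector<std::vector<int>>& cache) {
    std::vector<int> out;
    for (const auto& [src, item] : arrivals) {
        cache[src].push_back(item);
        out.push_back(item);
    }
    return out;
}

// Reread: concatenate the cached per-source queues, as a cat reader would.
std::vector<int> Reread(const std::vector<std::vector<int>>& cache) {
    std::vector<int> out;
    for (const std::vector<int>& queue : cache)
        out.insert(out.end(), queue.begin(), queue.end());
    return out;
}
```

Both passes yield the same multiset of items, and each source's items keep their relative order, but the interleaving across sources is lost on reread; the sequential scan over each cached queue is what the comment suggests makes the second pass cheaper.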