Thrill  0.1
block_queue.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_BLOCK_QUEUE_HEADER
14 #define THRILL_DATA_BLOCK_QUEUE_HEADER
15 
19 #include <thrill/data/block.hpp>
23 #include <thrill/data/file.hpp>
24 
25 #include <atomic>
26 
27 namespace thrill {
28 namespace data {
29 
30 //! \addtogroup data_layer
31 //! \{
32 
33 class BlockQueueSink;
34 class ConsumeBlockQueueSource;
35 
36 /*!
37  * A BlockQueue is a thread-safe queue used to hand over Block objects between
38  * threads. It is currently used by the Multiplexer to queue received Blocks and
39  * deliver them (later) to their destination.
40  *
41  * The BlockQueue itself is also a BlockSink (so one can attach a BlockWriter to
42  * it). To read items from the queue, one needs to use a BlockReader
43  * instantiated with a BlockQueueSource. Both are easily available via
44  * GetWriter() and GetReader(). Each block is available only *once* via the
45  * ConsumeBlockQueueSource; a CacheBlockQueueSource additionally caches it in a File.
46  */
47 class BlockQueue final : public BlockSink
48 {
49 public:
50  static constexpr bool debug = false;
51 
55 
57 
58  //! Constructor from BlockPool
60  size_t dia_id,
61  const CloseCallback& close_callback = CloseCallback());
62 
63  //! non-copyable: delete copy-constructor
64  BlockQueue(const BlockQueue&) = delete;
65  //! non-copyable: delete assignment operator
66  BlockQueue& operator = (const BlockQueue&) = delete;
67  //! move-constructor: default
68  BlockQueue(BlockQueue&&) = default;
69  //! move-assignment operator: default
70  BlockQueue& operator = (BlockQueue&&) = default;
71 
72  void AppendBlock(const Block& b, bool /* is_last_block */) final { // update item/byte stats, then enqueue a copy of b
73  LOG << "BlockQueue::AppendBlock() " << b;
74  item_counter_ += b.num_items();
75  byte_counter_ += b.size();
77  queue_.emplace(b);
78  }
79  void AppendBlock(Block&& b, bool /* is_last_block */) final { // same as above, but moves b into the queue
80  LOG << "BlockQueue::AppendBlock() move " << b;
81  item_counter_ += b.num_items();
82  byte_counter_ += b.size();
84  queue_.emplace(std::move(b));
85  }
86 
87  //! Close called by BlockWriter.
88  void Close() final;
89 
90  static constexpr bool allocate_can_fail_ = false;
91 
92  Block Pop() { // returns an invalid Block once the close sentinel was popped (presumably queue_.pop() blocks until a Block arrives — confirm)
93  if (read_closed_) return Block();
94  Block b;
95  queue_.pop(b);
96  read_closed_ = !b.IsValid();
97  return b;
98  }
99 
100  //! change dia_id after construction (needed because it may be unknown at
101  //! construction)
102  void set_dia_id(size_t dia_id) {
103  file_.set_dia_id(dia_id);
104  }
105 
106  //! set the close callback
108  close_callback_ = cb;
109  }
110 
111  //! check if writer side Close() was called.
112  bool write_closed() const { return write_closed_; }
113 
114  bool empty() const { return queue_.empty(); } // true iff queue_ holds no Blocks; unlike size(), does not discount a close sentinel
115 
116  //! check if reader side has returned a closing sentinel block
117  bool read_closed() const { return read_closed_; }
118 
119  //! return number of blocks in the queue, not counting the close sentinel once write_closed(). Use this ONLY for DEBUGGING!
120  size_t size() { return queue_.size() - (write_closed() ? 1 : 0); } // NOTE(review): if the sentinel was already popped, queue_.size() may be 0 here and the size_t subtraction wraps around — confirm
121 
122  //! Returns item_counter_
123  size_t item_counter() const { return item_counter_; }
124  //! Returns byte_counter_
125  size_t byte_counter() const { return byte_counter_; }
126  //! Returns block_counter_
127  size_t block_counter() const { return block_counter_; }
128  //! Returns timespan_
129  const common::StatsTimer& timespan() const { return timespan_; }
130 
131  //! Return a BlockWriter delivering to this BlockQueue.
132  Writer GetWriter(size_t block_size = default_block_size);
133 
134  //! return BlockReader specifically for a BlockQueue
135  ConsumeReader GetConsumeReader(size_t local_worker_id);
136 
137  //! return polymorphic BlockSource variant
138  DynBlockSource GetBlockSource(bool consume, size_t local_worker_id);
139 
140  //! return polymorphic BlockReader variant
141  Reader GetReader(bool consume, size_t local_worker_id);
142 
143 private:
145 
147 
148  //! whether Pop() has returned a closing Block; hence, if we received the
149  //! close message from the writer
150  bool read_closed_ = false;
151 
152  //! number of items transferred by the Queue
153  size_t item_counter_ = 0;
154  //! number of bytes transferred by the Queue
155  size_t byte_counter_ = 0;
156  //! number of blocks transferred by the Queue
157  size_t block_counter_ = 0;
158  //! timespan of existence
160 
161  //! File to cache blocks for implementing CacheBlockQueueSource.
163 
164  //! callback to issue when the writer closes the Queue -- for delivering
165  //! stats
167 
168  //! for access to file_
169  friend class CacheBlockQueueSource;
170 };
171 
172 /*!
173  * BlockSink which forwards all Blocks and the Close() call to a BlockQueue.
174  */
175 class BlockQueueSink final : public BlockSink
176 {
177  static constexpr bool debug = false;
178 
179 public:
181  : BlockSink(nullptr, -1), queue_(nullptr)
182  { }
183 
184  explicit BlockQueueSink(BlockQueue* queue)
185  : BlockSink(queue->block_pool(), queue->local_worker_id()),
186  queue_(std::move(queue)) { // NOTE(review): std::move on a raw pointer is just a copy, so `queue` is still valid in the LOG below
187  LOG << "BlockQueueSink() new for " << queue;
188  }
189 
190  //! default copy-constructor (NOTE(review): copies share queue_, and Close() only detaches the copy it is called on)
191  BlockQueueSink(const BlockQueueSink&) = default;
192  //! default assignment operator
193  BlockQueueSink& operator = (const BlockQueueSink&) = default;
194 
196  LOG << "~BlockQueueSink() for " << queue_;
197  }
198 
199  //! \name Methods of a BlockSink
200  //! \{
201 
202  //! Forward a copy of the Block to the attached BlockQueue; queue_ must be
203  //! non-null, i.e. the sink is not default-constructed and not yet closed.
204  void AppendBlock(const Block& b, bool is_last_block) final {
205  assert(queue_);
206  return queue_->AppendBlock(b, is_last_block);
207  }
208 
209  //! Move the Block into the attached BlockQueue; queue_ must be non-null,
210  //! i.e. the sink is not default-constructed and not yet closed.
211  void AppendBlock(Block&& b, bool is_last_block) final {
212  assert(queue_);
213  return queue_->AppendBlock(std::move(b), is_last_block);
214  }
215 
216  void Close() final { // closes the attached queue and detaches this sink; later calls on this object are no-ops
217  if (queue_) {
218  queue_->Close();
219  queue_ = nullptr;
220  }
221  }
222 
223  //! \}
224 
225 private:
227 };
228 
229 /*!
230  * A BlockSource to read Blocks from a BlockQueue using a BlockReader. Each Block
231  * is *taken* from the BlockQueue, hence the BlockQueue can be read only once!
232  */
234 {
235  static constexpr bool debug = BlockQueue::debug;
236 
237 public:
238  //! Start reading from a BlockQueue
239  explicit ConsumeBlockQueueSource(BlockQueue& queue, size_t local_worker_id);
240 
241  void Prefetch(size_t /* prefetch */);
242 
243  //! Take the next Block from the BlockQueue and hand it to BlockReader as a
244  //! PinnedBlock. NOTE(review): presumably an invalid PinnedBlock marks the end of the queue — confirm.
245  PinnedBlock NextBlock();
246 
247 private:
248  //! BlockQueue that blocks are retrieved from
250 
251  //! local worker id of the thread _reading_ the BlockQueue
253 };
254 
255 /*!
256  * A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at
257  * the same time CACHE all items received. All Blocks read from the BlockQueue
258  * are saved in the cache File. If the BlockQueue is initially already
259  * closed, then Blocks are read from the File instead.
260  */
262 {
263  static constexpr bool debug = BlockQueue::debug;
264 
265 public:
266  //! Start reading from a BlockQueue
267  explicit CacheBlockQueueSource(BlockQueue* queue, size_t local_worker_id);
268 
269  //! non-copyable: delete copy-constructor
271  //! non-copyable: delete assignment operator
273  //! move-constructor: default
275 
276  void Prefetch(size_t /* prefetch */);
277 
278  //! Return the next Block: taken from the BlockQueue and also stored in the caching File, or read from that File if the queue was already closed.
279  PinnedBlock NextBlock();
280 
281  //! Consume remaining blocks and cache them in the File.
283 
284 private:
285  //! Pointer to the BlockQueue being read and cached (as passed to the constructor)
287 
288  //! local worker id of the thread _reading_ the BlockQueue
290 };
291 
292 //! \}
293 
294 } // namespace data
295 } // namespace thrill
296 
297 #endif // !THRILL_DATA_BLOCK_QUEUE_HEADER
298 
299 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
size_t item_counter_
number of items transferred by the Queue
BlockQueueSink(BlockQueue *queue)
void set_dia_id(size_t dia_id)
BlockQueue * queue_
Reference to BlockQueue.
size_t byte_counter_
number of bytes transferred by the Queue
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
size_t block_counter_
number of blocks transferred by the Queue
STL namespace.
friend class CacheBlockQueueSource
for access to file_
ConsumeReader GetConsumeReader(size_t local_worker_id)
return BlockReader specifically for a BlockQueue
Definition: block_queue.cpp:47
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
common::ConcurrentBoundedQueue< Block > queue_
File file_
File to cache blocks for implementing CacheBlockQueueSource.
void set_dia_id(size_t dia_id)
Definition: file.hpp:252
void AppendBlock(const Block &b, bool is_last_block) final
static constexpr bool debug
Definition: block_queue.hpp:50
BlockSink which forwards all Blocks and the Close() call to a BlockQueue.
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
size_t item_counter() const
Returns item_counter_.
common::StatsTimerStart timespan_
timespan of existence
common::AtomicMovable< bool > write_closed_
void AppendBlock(Block &&b, bool is_last_block) final
static constexpr bool allocate_can_fail_
Definition: block_queue.hpp:90
Reader GetReader(bool consume, size_t local_worker_id)
return polymorphic BlockReader variant
Definition: block_queue.cpp:85
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
BlockQueue & queue_
BlockQueue that blocks are retrieved from.
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
Writer GetWriter(size_t block_size=default_block_size)
Return a BlockWriter delivering to this BlockQueue.
Definition: block_queue.cpp:43
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
tlx::delegate< void(BlockQueue &)> CloseCallback
Definition: block_queue.hpp:56
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
size_t block_counter() const
Returns block_counter_.
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
void set_close_callback(const CloseCallback &cb)
set the close callback
void AppendBlock(Block &&b, bool) final
Appends the (unpinned) Block.
Definition: block_queue.hpp:79
bool write_closed() const
check if writer side Close() was called.
CloseCallback close_callback_
bool read_closed() const
check if reader side has returned a closing sentinel block
size_t byte_counter() const
Returns byte_counter_.
BlockQueue & operator=(const BlockQueue &)=delete
non-copyable: delete assignment operator
void AppendBlock(const Block &b, bool) final
Appends the (unpinned) Block.
Definition: block_queue.hpp:72
void Close() final
Closes the sink. Must not be called multiple times.
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
BlockQueue(BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback())
Constructor from BlockPool.
Definition: block_queue.cpp:19
const common::StatsTimer & timespan() const
Returns timespan_.
size_t size()
return number of blocks in the queue. Use this ONLY for DEBUGGING!
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
DynBlockSource GetBlockSource(bool consume, size_t local_worker_id)
return polymorphic BlockSource variant
Definition: block_queue.cpp:52
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:74
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at the same time CACHE all it...
A BlockSource to read Blocks from a BlockQueue using a BlockReader.
void Close() final
Close called by BlockWriter.
Definition: block_queue.cpp:28
This is the actual BlockSource used to instantiate BlockReader.