Thrill  0.1
mix_block_queue.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_block_queue.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
13 
14 #include <vector>
15 
16 namespace thrill {
17 namespace data {
18 
19 /******************************************************************************/
20 // MixBlockQueue
21 
22 MixBlockQueue::MixBlockQueue(BlockPool& block_pool, size_t num_workers,
23  size_t local_worker_id, size_t dia_id)
24  : block_pool_(block_pool),
25  local_worker_id_(local_worker_id),
26  num_workers_(num_workers),
27  write_closed_(num_workers) {
28  queues_.reserve(num_workers);
29  for (size_t w = 0; w < num_workers; ++w) {
30  queues_.emplace_back(block_pool_, local_worker_id, dia_id);
31  }
32 }
33 
34 void MixBlockQueue::set_dia_id(size_t dia_id) {
35  for (size_t i = 0; i < queues_.size(); ++i) {
36  queues_[i].set_dia_id(dia_id);
37  }
38 }
39 
40 void MixBlockQueue::AppendBlock(size_t src, const Block& block) {
41  LOG << "MixBlockQueue::AppendBlock"
42  << " src=" << src << " block=" << block;
43  mix_queue_.emplace(SrcBlockPair { src, block });
44 }
45 
46 void MixBlockQueue::AppendBlock(size_t src, Block&& block) {
47  LOG << "MixBlockQueue::AppendBlock"
48  << " src=" << src << " block=" << block;
49  mix_queue_.emplace(SrcBlockPair { src, std::move(block) });
50 }
51 
52 void MixBlockQueue::Close(size_t src) {
53  LOG << "MixBlockQueue::Close()"
54  << " src=" << src
55  << " local_worker_id_=" << local_worker_id_
56  << " --write_open_count_=" << write_open_count_ - 1;
57  assert(!write_closed_[src]);
58  write_closed_[src] = true;
60 
61  // enqueue a closing Block.
62  mix_queue_.emplace(SrcBlockPair { src, Block() });
63 }
64 
// Tail of a function whose signature line is missing from this listing
// (the declaration index names it `bool is_queue_closed(size_t src)`).
// Returns the closed flag stored for source src in write_closed_.
66  return write_closed_[src];
67 }
68 
// Body of a function whose signature line is missing from this listing
// (the declaration index names it `SrcBlockPair Pop()`).
// Returns the next (source, block) pair taken from mix_queue_.
// Once read_open_ has reached zero, nothing is popped any more and a pair
// with src == size_t(-1) and a default-constructed Block is returned instead.
70  if (read_open_ == 0)
71  return SrcBlockPair {
72  size_t(-1), Block()
73  };
74  SrcBlockPair b;
// NOTE(review): pop() presumably blocks until an entry is available --
// confirm against ConcurrentBoundedQueue.
75  mix_queue_.pop(b);
// An invalid block is the closing sentinel of one source: count that source
// as finished, but still hand the pair to the caller.
76  if (!b.block.IsValid()) {
77  LOG << "MixBlockQueue()"
78  << " read_open_ " << read_open_ << " -> " << read_open_ - 1;
79  --read_open_;
80  }
81  return b;
82 }
83 
84 /******************************************************************************/
85 // MixBlockQueueReader
86 
// Constructor of MixBlockQueueReader; the first line of its signature is
// missing from this listing. Stores a reference to mix_queue and sets reread_
// from mix_queue.read_closed(), then prepares one of two read paths below.
88  MixBlockQueue& mix_queue, bool consume, size_t local_worker_id)
89  : mix_queue_(mix_queue),
90  reread_(mix_queue.read_closed()) {
91 
// First path (read_closed() was false): create one reader per worker queue.
// NOTE(review): listing lines 93-94 are not visible here; any setup they
// perform is not described by these comments.
92  if (!reread_) {
95 
96  for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
97  readers_.emplace_back(
98  mix_queue_.queues_[w].GetReader(consume, local_worker_id));
99  }
100  }
// Second path (read_closed() was true): read the per-worker queues one after
// another through a single concatenating reader.
101  else {
102  // construct vector of BlockSources to read from queues_.
103  std::vector<DynBlockSource> result;
104  for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
105  result.emplace_back(mix_queue_.queues_[w].GetBlockSource(
106  consume, local_worker_id));
107  }
108  // move BlockQueueSources into concatenation BlockSource, and to Reader.
109  cat_reader_ = CatBlockReader(CatBlockSource(std::move(result)));
110  }
111 }
112 
// Body of a function whose signature line is missing from this listing
// (the declaration index names it `~MixBlockQueueReader()`).
// The body contains no statements; only the TODO marker below.
114  // TODO(tb)
115 }
116 
// Body of the function logged below as MixBlockQueueReader::PullBlock(); its
// signature line and the declaration of src_blk (listing line 121) are missing
// from this listing.
// Loops until available_ is non-zero and then returns true, or returns false
// once a closing block has brought open_ to zero with no held-back item left.
// NOTE(review): src_blk is presumably obtained from mix_queue_.Pop() on the
// missing line -- confirm against the real source.
118  // no full item available: get next block from mix queue
119  while (available_ == 0) {
120 
122  LOG << "MixBlockQueueReader::PullBlock()"
123  << " still open_=" << open_
124  << " src=" << src_blk.src << " block=" << src_blk.block
125  << " selected_=" << selected_
126  << " available_=" << available_
127  << " available_at_[src]=" << available_at_[src_blk.src];
128 
// NOTE(review): the LOG statement above already indexes available_at_ with
// src_blk.src before this bounds assertion runs.
129  assert(src_blk.src < readers_.size());
130 
131  if (src_blk.block.IsValid()) {
132  // block for this reader.
133  selected_ = src_blk.src;
134 
135  size_t num_items = src_blk.block.num_items();
136 
137  // save block with data for reader
138  mix_queue_.queues_[src_blk.src].AppendBlock(
139  std::move(src_blk.block), /* is_last_block */ false);
140 
// The three statements below add num_items to the per-source count, expose
// all but one of them through available_, and leave exactly one held back in
// available_at_[src].
// NOTE(review): presumably the last item may continue into the next block
// from the same source -- confirm.
141  // add available items: one less than in the blocks.
142  available_at_[src_blk.src] += num_items;
143  available_ = available_at_[src_blk.src] - 1;
144  available_at_[src_blk.src] -= available_;
145  }
146  else {
147  // close block received: maybe get last item
148  assert(open_ > 0);
149  --open_;
150 
// NOTE(review): in this branch src_blk.block is the invalid (closing) block,
// so the comment below looks copied from the branch above; the block is
// forwarded with is_last_block == false -- confirm this is how the per-source
// queue is meant to be closed.
151  // save block with data for reader
152  mix_queue_.queues_[src_blk.src].AppendBlock(
153  std::move(src_blk.block), /* is_last_block */ false);
154 
// If one item was held back for this source, release it now; otherwise stop
// with false once no source is open any more.
155  // check if we can still read the last item
156  if (available_at_[src_blk.src]) {
157  assert(available_at_[src_blk.src] == 1);
158  selected_ = src_blk.src;
159  available_ = available_at_[src_blk.src];
160  available_at_[src_blk.src] -= available_;
161  }
162  else if (open_ == 0) return false;
163  }
164 
165  LOG << "MixBlockQueueReader::PullBlock() afterwards"
166  << " selected_=" << selected_
167  << " available_=" << available_
168  << " available_at_[src]=" << available_at_[src_blk.src];
169  }
170  return true;
171 }
172 
173 } // namespace data
174 } // namespace thrill
175 
176 /******************************************************************************/
data::CatBlockSource< DynBlockSource > CatBlockSource
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
MixBlockQueue(BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
std::vector< unsigned char > write_closed_
flag to test for closed sources
void set_dia_id(size_t dia_id)
std::vector< BlockQueue > queues_
BlockQueues to deliver blocks to from mix queue.
MixBlockQueueReader(MixBlockQueue &mix_queue, bool consume, size_t local_worker_id)
pair of (source worker, Block) stored in the main mix queue.
size_t available_
number of available items on the selected reader
size_t selected_
reader currently selected
size_t open_
number of readers still open
~MixBlockQueueReader()
Possibly consume unread blocks.
Implements reading an unordered sequence of items from multiple workers, which send Blocks...
MixBlockQueue & mix_queue_
reference to mix queue
std::vector< BlockQueue::Reader > readers_
sub-readers for each block queue in mix queue
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
size_t num_workers_
total number of workers in system.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
bool is_queue_closed(size_t src)
check if inbound queue is closed
BlockReader< CatBlockSource > CatBlockReader
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:85
SrcBlockPair Pop()
Blocking retrieval of a (source,block) pair.
bool read_closed() const
check if reader side has returned a closing sentinel block
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
common::AtomicMovable< size_t > write_open_count_
counter on number of writers still open.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:74
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
the main mix queue, containing the block in the reception order.