Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mix_block_queue.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_block_queue.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
13 
14 #include <vector>
15 
16 namespace thrill {
17 namespace data {
18 
19 /******************************************************************************/
20 // MixBlockQueue
21 
22 MixBlockQueue::MixBlockQueue(BlockPool& block_pool, size_t num_workers,
23  size_t local_worker_id, size_t dia_id)
24  : block_pool_(block_pool),
25  local_worker_id_(local_worker_id),
26  num_workers_(num_workers),
27  write_closed_(num_workers) {
28  queues_.reserve(num_workers);
29  for (size_t w = 0; w < num_workers; ++w) {
30  queues_.emplace_back(block_pool_, local_worker_id, dia_id);
31  }
32 }
33 
34 void MixBlockQueue::set_dia_id(size_t dia_id) {
35  for (size_t i = 0; i < queues_.size(); ++i) {
36  queues_[i].set_dia_id(dia_id);
37  }
38 }
39 
40 void MixBlockQueue::AppendBlock(size_t src, const Block& block) {
41  LOG << "MixBlockQueue::AppendBlock"
42  << " src=" << src << " block=" << block;
43  mix_queue_.emplace(SrcBlockPair { src, block });
44 }
45 
46 void MixBlockQueue::AppendBlock(size_t src, Block&& block) {
47  LOG << "MixBlockQueue::AppendBlock"
48  << " src=" << src << " block=" << block;
49  mix_queue_.emplace(SrcBlockPair { src, std::move(block) });
50 }
51 
52 void MixBlockQueue::Close(size_t src) {
53  LOG << "MixBlockQueue::Close()"
54  << " src=" << src
55  << " local_worker_id_=" << local_worker_id_
56  << " --write_open_count_=" << write_open_count_ - 1;
57  assert(!write_closed_[src]);
58  write_closed_[src] = true;
60 
61  // enqueue a closing Block.
62  mix_queue_.emplace(SrcBlockPair { src, Block() });
63 }
64 
66  if (read_open_ == 0)
67  return SrcBlockPair {
68  size_t(-1), Block()
69  };
70  SrcBlockPair b;
71  mix_queue_.pop(b);
72  if (!b.block.IsValid()) {
73  LOG << "MixBlockQueue()"
74  << " read_open_ " << read_open_ << " -> " << read_open_ - 1;
75  --read_open_;
76  }
77  return b;
78 }
79 
80 /******************************************************************************/
81 // MixBlockQueueReader
82 
84  MixBlockQueue& mix_queue, bool consume, size_t local_worker_id)
85  : mix_queue_(mix_queue),
86  reread_(mix_queue.read_closed()) {
87 
88  if (!reread_) {
91 
92  for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
93  readers_.emplace_back(
94  mix_queue_.queues_[w].GetReader(consume, local_worker_id));
95  }
96  }
97  else {
98  // construct vector of BlockSources to read from queues_.
99  std::vector<DynBlockSource> result;
100  for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
101  result.emplace_back(mix_queue_.queues_[w].GetBlockSource(
102  consume, local_worker_id));
103  }
104  // move BlockQueueSources into concatenation BlockSource, and to Reader.
105  cat_reader_ = CatBlockReader(CatBlockSource(std::move(result)));
106  }
107 }
108 
110  // TODO(tb)
111 }
112 
114  // no full item available: get next block from mix queue
115  while (available_ == 0) {
116 
118  LOG << "MixBlockQueueReader::PullBlock()"
119  << " still open_=" << open_
120  << " src=" << src_blk.src << " block=" << src_blk.block
121  << " selected_=" << selected_
122  << " available_=" << available_
123  << " available_at_[src]=" << available_at_[src_blk.src];
124 
125  assert(src_blk.src < readers_.size());
126 
127  if (src_blk.block.IsValid()) {
128  // block for this reader.
129  selected_ = src_blk.src;
130 
131  size_t num_items = src_blk.block.num_items();
132 
133  // save block with data for reader
134  mix_queue_.queues_[src_blk.src].AppendBlock(
135  std::move(src_blk.block), /* is_last_block */ false);
136 
137  // add available items: one less than in the blocks.
138  available_at_[src_blk.src] += num_items;
139  available_ = available_at_[src_blk.src] - 1;
140  available_at_[src_blk.src] -= available_;
141  }
142  else {
143  // close block received: maybe get last item
144  assert(open_ > 0);
145  --open_;
146 
147  // save block with data for reader
148  mix_queue_.queues_[src_blk.src].AppendBlock(
149  std::move(src_blk.block), /* is_last_block */ false);
150 
151  // check if we can still read the last item
152  if (available_at_[src_blk.src]) {
153  assert(available_at_[src_blk.src] == 1);
154  selected_ = src_blk.src;
155  available_ = available_at_[src_blk.src];
156  available_at_[src_blk.src] -= available_;
157  }
158  else if (open_ == 0) return false;
159  }
160 
161  LOG << "MixBlockQueueReader::PullBlock() afterwards"
162  << " selected_=" << selected_
163  << " available_=" << available_
164  << " available_at_[src]=" << available_at_[src_blk.src];
165  }
166  return true;
167 }
168 
169 } // namespace data
170 } // namespace thrill
171 
172 /******************************************************************************/
data::CatBlockSource< DynBlockSource > CatBlockSource
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
MixBlockQueue(BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
std::vector< unsigned char > write_closed_
flag to test for closed sources
void set_dia_id(size_t dia_id)
std::vector< BlockQueue > queues_
BlockQueues to deliver blocks to from mix queue.
MixBlockQueueReader(MixBlockQueue &mix_queue, bool consume, size_t local_worker_id)
pair of (source worker, Block) stored in the main mix queue.
size_t available_
number of available items on the selected reader
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:85
size_t selected_
reader currently selected
size_t open_
number of readers still open
~MixBlockQueueReader()
Possibly consume unread blocks.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:74
Implements reading an unordered sequence of items from multiple workers, which sends Blocks...
MixBlockQueue & mix_queue_
reference to mix queue
std::vector< BlockQueue::Reader > readers_
sub-readers for each block queue in mix queue
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
size_t num_workers_
total number of workers in system.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
BlockReader< CatBlockSource > CatBlockReader
SrcBlockPair Pop()
Blocking retrieval of a (source,block) pair.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
common::AtomicMovable< size_t > write_open_count_
counter on number of writers still open.
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
the main mix queue, containing the block in the reception order.