Thrill  0.1
mix_stream.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_stream.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 
19 
20 #include <algorithm>
21 #include <map>
22 #include <vector>
23 
24 namespace thrill {
25 namespace data {
26 
28  Multiplexer& multiplexer, size_t send_size_limit,
29  const StreamId& id,
30  size_t local_worker_id, size_t dia_id)
31  : StreamData(stream_set_base, multiplexer,
32  send_size_limit, id, local_worker_id, dia_id),
33  seq_(num_workers()),
34  queue_(multiplexer_.block_pool_, num_workers(),
35  local_worker_id, dia_id) {
37 }
38 
40  LOG << "~MixStreamData() deleted";
41 }
42 
43 void MixStreamData::set_dia_id(size_t dia_id) {
44  dia_id_ = dia_id;
45  queue_.set_dia_id(dia_id);
46 }
47 
49  return "MixStream";
50 }
51 
53  size_t hard_ram_limit = multiplexer_.block_pool_.hard_ram_limit();
54  size_t block_size_base = hard_ram_limit / 4
56  size_t block_size = tlx::round_down_to_power_of_two(block_size_base);
57  if (block_size == 0 || block_size > default_block_size)
58  block_size = default_block_size;
59 
60  {
61  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
66  }
67 
68  LOGC(my_worker_rank() == 0 && 0)
69  << "MixStreamData::GetWriters()"
70  << " hard_ram_limit=" << hard_ram_limit
71  << " block_size_base=" << block_size_base
72  << " block_size=" << block_size
73  << " active_streams=" << multiplexer_.active_streams_
74  << " max_active_streams=" << multiplexer_.max_active_streams_;
75 
76  tx_timespan_.StartEventually();
77 
78  Writers result(my_worker_rank());
79  result.reserve(num_workers());
80 
81  for (size_t host = 0; host < num_hosts(); ++host) {
82  for (size_t worker = 0; worker < workers_per_host(); ++worker) {
83  if (host == my_host_rank()) {
84  // construct loopback queue writer
85  auto target_stream_ptr = multiplexer_.MixLoopback(id_, worker);
86  result.emplace_back(
87  StreamSink(
88  StreamDataPtr(this),
90  target_stream_ptr,
91  id_,
93  host, worker),
94  block_size);
95  }
96  else {
97  result.emplace_back(
98  StreamSink(
99  StreamDataPtr(this),
103  id_,
105  host, worker),
106  block_size);
107  }
108  }
109  }
110 
111  assert(result.size() == num_workers());
112  return result;
113 }
114 
116  rx_timespan_.StartEventually();
117  return MixReader(queue_, consume, local_worker_id_);
118 }
119 
121  return GetMixReader(consume);
122 }
123 
125  if (is_closed_) return;
126  is_closed_ = true;
127 
128  // wait for all close packets to arrive.
129  for (size_t i = 0; i < num_hosts() * workers_per_host(); ++i) {
130  LOG << "MixStreamData::Close() wait for closing block"
131  << " local_worker_id_=" << local_worker_id_
132  << " remaining=" << sem_closing_blocks_.value();
134  }
135 
137 
138  {
139  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
142  }
143 
144  LOG << "MixStreamData::Close() finished"
145  << " id_=" << id_
146  << " local_worker_id_=" << local_worker_id_;
147 }
148 
149 bool MixStreamData::closed() const {
150  if (is_closed_) return true;
151  bool closed = true;
152  closed = closed && queue_.write_closed();
153  return closed;
154 }
155 
157  return queue_.is_queue_closed(from);
158 }
159 
160 struct MixStreamData::SeqReordering {
161  //! current top sequence number
162  uint32_t seq_ = 0;
163 
164  //! queue of waiting Blocks, ordered by sequence number
165  std::map<uint32_t, Block> waiting_;
166 };
167 
168 void MixStreamData::OnStreamBlock(size_t from, uint32_t seq, Block&& b) {
169  assert(from < num_workers());
170  rx_timespan_.StartEventually();
171 
172  sLOG << "MixStreamData::OnStreamBlock" << b
173  << "stream" << id_
174  << "from" << from
175  << "for worker" << my_worker_rank();
176 
177  if (TLX_UNLIKELY(seq != seq_[from].seq_ &&
179  // sequence mismatch: put into queue
180  die_unless(seq >= seq_[from].seq_);
181 
182  seq_[from].waiting_.insert(std::make_pair(seq, std::move(b)));
183 
184  return;
185  }
186 
187  OnStreamBlockOrdered(from, std::move(b));
188 
189  // try to process additional queued blocks
190  while (!seq_[from].waiting_.empty() &&
191  (seq_[from].waiting_.begin()->first == seq_[from].seq_ ||
192  seq_[from].waiting_.begin()->first == StreamMultiplexerHeader::final_seq))
193  {
194  sLOG << "MixStreamData::OnStreamBlock"
195  << "processing delayed block with seq"
196  << seq_[from].waiting_.begin()->first;
197 
199  from, std::move(seq_[from].waiting_.begin()->second));
200 
201  seq_[from].waiting_.erase(
202  seq_[from].waiting_.begin());
203  }
204 }
205 
207  // sequence number matches
208  if (b.IsValid()) {
209  rx_net_items_ += b.num_items();
210  rx_net_bytes_ += b.size();
211  rx_net_blocks_++;
212 
213  queue_.AppendBlock(from, std::move(b));
214  }
215  else {
216  sLOG << "MixStreamData::OnCloseStream"
217  << "stream" << id_
218  << "from" << from
219  << "for worker" << my_worker_rank()
220  << "remaining_closing_blocks_" << remaining_closing_blocks_;
221 
222  queue_.Close(from);
223 
224  die_unless(remaining_closing_blocks_ > 0);
225  if (--remaining_closing_blocks_ == 0) {
226  rx_lifetime_.StopEventually();
227  rx_timespan_.StopEventually();
228  }
229 
231  }
232 
233  seq_[from].seq_++;
234 }
235 
236 /******************************************************************************/
237 // MixStream
238 
240  : ptr_(ptr) { }
241 
243  ptr_->Close();
244 }
245 
246 const StreamId& MixStream::id() const {
247  return ptr_->id();
248 }
249 
251  return *ptr_;
252 }
253 
254 const StreamData& MixStream::data() const {
255  return *ptr_;
256 }
257 
259  return ptr_->GetWriters();
260 }
261 
263  return ptr_->GetMixReader(consume);
264 }
265 
267  return ptr_->GetReader(consume);
268 }
269 
270 } // namespace data
271 } // namespace thrill
272 
273 /******************************************************************************/
std::atomic< size_t > rx_net_items_
bool all_writers_closed_
true if all writers were closed
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Base class for common structures for CatStream and MixStream.
Definition: stream_data.hpp:46
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t value() const
return the current value – should only be used for debugging.
Definition: semaphore.hpp:79
void set_dia_id(size_t dia_id)
StreamData & data() final
Return stream data reference.
Definition: mix_stream.cpp:250
MixStreamDataPtr ptr_
Definition: mix_stream.hpp:160
#define die_unless(X)
Definition: die.hpp:27
void Close() final
shuts the stream down.
Definition: mix_stream.cpp:124
std::mutex mutex_
protects critical sections
MixBlockQueue queue_
BlockQueue to store incoming Blocks with source.
Definition: mix_stream.hpp:104
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
size_t my_worker_rank() const
Returns my_worker_rank_.
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
bool is_closed_
flag if Close() was completed
Definition: mix_stream.hpp:96
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
MixStream(const MixStreamDataPtr &ptr)
Definition: mix_stream.cpp:239
MixBlockQueueReader MixReader
Definition: mix_stream.hpp:49
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:120
const StreamId & id() const final
Return stream id.
Definition: mix_stream.cpp:246
size_t local_worker_id_
local worker id
MixReader GetMixReader(bool consume)
Definition: mix_stream.cpp:262
Writers GetWriters() final
Definition: mix_stream.cpp:52
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:266
common::StatsTimerStopped rx_timespan_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: mix_stream.hpp:98
bool write_closed() const
check if writer side Close() was called.
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:99
bool is_queue_closed(size_t from)
check if inbound queue is closed
Definition: mix_stream.cpp:156
size_t wait(size_t delta=1, size_t slack=0)
Definition: semaphore.hpp:60
size_t signal()
Definition: semaphore.hpp:44
net::Group & group_
Holds NetConnections for outgoing Streams.
size_t workers_per_host() const
number of workers per host
bool closed() const final
Definition: mix_stream.cpp:149
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
Base class for StreamSet.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
MixStreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: mix_stream.cpp:27
bool is_queue_closed(size_t src)
check if inbound queue is closed
StreamId id_
our own stream id.
size_t dia_id_
associated DIANode id.
size_t num_workers() const
Number of workers in system.
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:97
static int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
size_t StreamId
Definition: stream_data.hpp:32
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
std::atomic< size_t > rx_net_blocks_
BlockPool & block_pool_
reference to host-global BlockPool.
void OnStreamBlockOrdered(size_t from, Block &&b)
called to process a Block in sequence order
Definition: mix_stream.cpp:206
MixReader GetMixReader(bool consume)
Creates a BlockReader which mixes items from all workers.
Definition: mix_stream.cpp:115
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
called from Multiplexer when there is a new Block for this Stream.
Definition: mix_stream.cpp:168
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
const char * stream_type() final
return stream type string
Definition: mix_stream.cpp:48
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
size_t workers_per_host() const
Returns workers_per_host.
common::StatsTimerStart rx_lifetime_
static const uint32_t final_seq
final sequence number
void set_dia_id(size_t dia_id)
Definition: mix_stream.cpp:43