Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mix_stream.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_stream.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 
19 
20 #include <algorithm>
21 #include <map>
22 #include <vector>
23 
24 namespace thrill {
25 namespace data {
26 
27 MixStreamData::MixStreamData(Multiplexer& multiplexer, size_t send_size_limit,
28  const StreamId& id,
29  size_t local_worker_id, size_t dia_id)
30  : StreamData(multiplexer, send_size_limit, id, local_worker_id, dia_id),
31  seq_(num_workers()),
32  queue_(multiplexer_.block_pool_, num_workers(),
33  local_worker_id, dia_id) {
35 }
36 
38  LOG << "~MixStreamData() deleted";
39 }
40 
//! Sets the DIA id associated with this stream: stores it in dia_id_ and
//! passes the same value on to queue_ through its own set_dia_id().
41 void MixStreamData::set_dia_id(size_t dia_id) {
42  dia_id_ = dia_id;
43  queue_.set_dia_id(dia_id);
44 }
45 
47  size_t hard_ram_limit = multiplexer_.block_pool_.hard_ram_limit();
48  size_t block_size_base = hard_ram_limit / 4
50  size_t block_size = tlx::round_down_to_power_of_two(block_size_base);
51  if (block_size == 0 || block_size > default_block_size)
52  block_size = default_block_size;
53 
54  {
55  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
60  }
61 
62  LOGC(my_worker_rank() == 0 && 1)
63  << "MixStreamData::GetWriters()"
64  << " hard_ram_limit=" << hard_ram_limit
65  << " block_size_base=" << block_size_base
66  << " block_size=" << block_size
67  << " active_streams=" << multiplexer_.active_streams_
68  << " max_active_streams=" << multiplexer_.max_active_streams_;
69 
70  tx_timespan_.StartEventually();
71 
72  Writers result(my_worker_rank());
73  result.reserve(num_workers());
74 
75  for (size_t host = 0; host < num_hosts(); ++host) {
76  for (size_t worker = 0; worker < workers_per_host(); ++worker) {
77  if (host == my_host_rank()) {
78  // construct loopback queue writer
79  auto target_stream_ptr = multiplexer_.MixLoopback(id_, worker);
80  result.emplace_back(
81  StreamSink(
82  StreamDataPtr(this),
84  target_stream_ptr,
85  id_,
87  host, worker),
88  block_size);
89  }
90  else {
91  result.emplace_back(
92  StreamSink(
93  StreamDataPtr(this),
97  id_,
99  host, worker),
100  block_size);
101  }
102  }
103  }
104 
105  assert(result.size() == num_workers());
106  return result;
107 }
108 
110  rx_timespan_.StartEventually();
111  return MixReader(queue_, consume, local_worker_id_);
112 }
113 
115  return GetMixReader(consume);
116 }
117 
119  if (is_closed_) return;
120  is_closed_ = true;
121 
122  // wait for all close packets to arrive.
123  for (size_t i = 0; i < num_hosts() * workers_per_host(); ++i) {
124  LOG << "MixStreamData::Close() wait for closing block"
125  << " local_worker_id_=" << local_worker_id_
126  << " remaining=" << sem_closing_blocks_.value();
128  }
129 
130  tx_lifetime_.StopEventually();
131  tx_timespan_.StopEventually();
132  OnAllClosed("MixStreamData");
133 
134  {
135  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
138  }
139 
140  LOG << "MixStreamData::Close() finished"
141  << " id_=" << id_
142  << " local_worker_id_=" << local_worker_id_;
143 }
144 
//! Reports whether this stream counts as closed: true as soon as is_closed_
//! is set, otherwise the result of queue_.write_closed().
145 bool MixStreamData::closed() const {
146  if (is_closed_) return true;
// `closed` starts out true, so the && below reduces to queue_.write_closed().
147  bool closed = true;
148  closed = closed && queue_.write_closed();
149  return closed;
150 }
151 
//! Reordering state kept per sender (one entry of seq_ is looked up with the
//! `from` index): the sequence number that incoming Blocks are compared
//! against, plus the Blocks that arrived with a different sequence number and
//! are parked until their turn.
152 struct MixStreamData::SeqReordering {
153  //! current top sequence number
154  uint32_t seq_ = 0;
155 
156  //! queue of waiting Blocks, ordered by sequence number
157  std::map<uint32_t, PinnedBlock> waiting_;
158 };
159 
160 void MixStreamData::OnStreamBlock(size_t from, uint32_t seq, PinnedBlock&& b) {
161  assert(from < num_workers());
162  rx_timespan_.StartEventually();
163 
164  rx_net_items_ += b.num_items();
165  rx_net_bytes_ += b.size();
166  rx_net_blocks_++;
167 
168  sLOG << "MixStreamData::OnStreamBlock" << b
169  << "stream" << id_
170  << "from" << from
171  << "for worker" << my_worker_rank();
172 
173  if (TLX_UNLIKELY(seq != seq_[from].seq_)) {
174  // sequence mismatch: put into queue
175  die_unless(seq >= seq_[from].seq_);
176 
177  seq_[from].waiting_.insert(
178  std::make_pair(seq, std::move(b)));
179 
180  return;
181  }
182 
183  OnStreamBlockOrdered(from, std::move(b));
184 
185  // try to process additional queued blocks
186  while (!seq_[from].waiting_.empty() &&
187  seq_[from].waiting_.begin()->first == seq_[from].seq_)
188  {
189  sLOG << "MixStreamData::OnStreamBlock"
190  << "processing delayed block with seq"
191  << seq_[from].waiting_.begin()->first;
192 
194  from, std::move(seq_[from].waiting_.begin()->second));
195 
196  seq_[from].waiting_.erase(
197  seq_[from].waiting_.begin());
198  }
199 }
200 
202  // sequence number matches
203  if (b.IsValid()) {
204  queue_.AppendBlock(from, std::move(b).MoveToBlock());
205  }
206  else {
207  sLOG << "MixStreamData::OnCloseStream"
208  << "stream" << id_
209  << "from" << from
210  << "for worker" << my_worker_rank()
211  << "remaining_closing_blocks_" << remaining_closing_blocks_;
212 
213  queue_.Close(from);
214 
215  die_unless(remaining_closing_blocks_ > 0);
216  if (--remaining_closing_blocks_ == 0) {
217  rx_lifetime_.StopEventually();
218  rx_timespan_.StopEventually();
219  }
220 
222  }
223 
224  seq_[from].seq_++;
225 }
226 
227 /******************************************************************************/
228 // MixStream
229 
231  : ptr_(ptr) { }
232 
234  ptr_->Close();
235 }
236 
//! Returns the stream id by forwarding to id() of the object behind ptr_.
237 const StreamId& MixStream::id() const {
238  return ptr_->id();
239 }
240 
242  return *ptr_;
243 }
244 
//! Const access to the stream data: dereferences ptr_ and hands the result
//! back as a const StreamData reference.
245 const StreamData& MixStream::data() const {
246  return *ptr_;
247 }
248 
250  return ptr_->GetWriters();
251 }
252 
254  return ptr_->GetMixReader(consume);
255 }
256 
258  return ptr_->GetReader(consume);
259 }
260 
261 } // namespace data
262 } // namespace thrill
263 
264 /******************************************************************************/
std::atomic< size_t > rx_net_items_
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:45
static uint_pair max()
return a uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t value() const
return the current value – should only be used for debugging.
Definition: semaphore.hpp:69
void set_dia_id(size_t dia_id)
StreamData & data() final
Return stream data reference.
Definition: mix_stream.cpp:241
size_t workers_per_host() const
Returns workers_per_host.
Definition: stream_data.hpp:97
MixStreamDataPtr ptr_
Definition: mix_stream.hpp:153
#define die_unless(X)
Definition: die.hpp:27
void Close() final
shuts the stream down.
Definition: mix_stream.cpp:118
std::mutex mutex_
protects critical sections
MixBlockQueue queue_
BlockQueue to store incoming Blocks with source.
Definition: mix_stream.hpp:97
int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
void OnStreamBlockOrdered(size_t from, PinnedBlock &&b)
called to process PinnedBlock in sequence
Definition: mix_stream.cpp:201
size_t wait(size_t delta=1)
Definition: semaphore.hpp:60
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:92
bool is_closed_
flag if Close() was completed
Definition: mix_stream.hpp:89
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
MixStream(const MixStreamDataPtr &ptr)
Definition: mix_stream.cpp:230
MixBlockQueueReader MixReader
Definition: mix_stream.hpp:49
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:90
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:114
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
const StreamId & id() const final
Return stream id.
Definition: mix_stream.cpp:237
MixReader GetMixReader(bool consume)
Definition: mix_stream.cpp:253
Writers GetWriters() final
Definition: mix_stream.cpp:46
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer to a MixStream object
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:257
size_t my_worker_rank() const
Returns my_worker_rank_.
Definition: stream_data.hpp:99
common::StatsTimerStopped rx_timespan_
MixStreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: mix_stream.cpp:27
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: mix_stream.hpp:91
size_t signal()
Definition: semaphore.hpp:44
bool closed() const final
Definition: mix_stream.cpp:145
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
StreamId id_
our own stream id.
void OnAllClosed(const char *stream_type)
Definition: stream_data.cpp:35
size_t dia_id_
Associated DIANode id.
size_t workers_per_host() const
number of workers per host
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t StreamId
Definition: stream_data.hpp:32
void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock &&b)
called from Multiplexer when there is a new Block for this Stream.
Definition: mix_stream.cpp:160
std::atomic< size_t > rx_net_blocks_
BlockPool & block_pool_
reference to host-global BlockPool.
MixReader GetMixReader(bool consume)
Creates a BlockReader which mixes items from all workers.
Definition: mix_stream.cpp:109
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
size_t num_workers() const
Number of workers in system.
Definition: stream_data.hpp:94
bool write_closed() const
check if writer side Close() was called.
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
common::StatsTimerStart rx_lifetime_
void set_dia_id(size_t dia_id)
Definition: mix_stream.cpp:41