Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mix_stream.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_stream.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 
19 
20 #include <algorithm>
21 #include <map>
22 #include <vector>
23 
24 namespace thrill {
25 namespace data {
26 
27 MixStreamData::MixStreamData(Multiplexer& multiplexer, size_t send_size_limit,
28  const StreamId& id,
29  size_t local_worker_id, size_t dia_id)
30  : StreamData(multiplexer, send_size_limit, id, local_worker_id, dia_id),
31  seq_(num_workers()),
32  queue_(multiplexer_.block_pool_, num_workers(),
33  local_worker_id, dia_id) {
35 }
36 
38  LOG << "~MixStreamData() deleted";
39 }
40 
41 void MixStreamData::set_dia_id(size_t dia_id) {
42  dia_id_ = dia_id;
43  queue_.set_dia_id(dia_id);
44 }
45 
47  size_t hard_ram_limit = multiplexer_.block_pool_.hard_ram_limit();
48  size_t block_size_base = hard_ram_limit / 4
50  size_t block_size = tlx::round_down_to_power_of_two(block_size_base);
51  if (block_size == 0 || block_size > default_block_size)
52  block_size = default_block_size;
53 
54  {
55  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
60  }
61 
62  LOGC(my_worker_rank() == 0 && 0)
63  << "MixStreamData::GetWriters()"
64  << " hard_ram_limit=" << hard_ram_limit
65  << " block_size_base=" << block_size_base
66  << " block_size=" << block_size
67  << " active_streams=" << multiplexer_.active_streams_
68  << " max_active_streams=" << multiplexer_.max_active_streams_;
69 
70  tx_timespan_.StartEventually();
71 
72  Writers result(my_worker_rank());
73  result.reserve(num_workers());
74 
75  for (size_t host = 0; host < num_hosts(); ++host) {
76  for (size_t worker = 0; worker < workers_per_host(); ++worker) {
77  if (host == my_host_rank()) {
78  // construct loopback queue writer
79  auto target_stream_ptr = multiplexer_.MixLoopback(id_, worker);
80  result.emplace_back(
81  StreamSink(
82  StreamDataPtr(this),
84  target_stream_ptr,
85  id_,
87  host, worker),
88  block_size);
89  }
90  else {
91  result.emplace_back(
92  StreamSink(
93  StreamDataPtr(this),
97  id_,
99  host, worker),
100  block_size);
101  }
102  }
103  }
104 
105  assert(result.size() == num_workers());
106  return result;
107 }
108 
110  rx_timespan_.StartEventually();
111  return MixReader(queue_, consume, local_worker_id_);
112 }
113 
115  return GetMixReader(consume);
116 }
117 
119  if (is_closed_) return;
120  is_closed_ = true;
121 
122  // wait for all close packets to arrive.
123  for (size_t i = 0; i < num_hosts() * workers_per_host(); ++i) {
124  LOG << "MixStreamData::Close() wait for closing block"
125  << " local_worker_id_=" << local_worker_id_
126  << " remaining=" << sem_closing_blocks_.value();
128  }
129 
130  tx_lifetime_.StopEventually();
131  tx_timespan_.StopEventually();
132  OnAllClosed("MixStreamData");
133 
134  {
135  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
138  }
139 
140  LOG << "MixStreamData::Close() finished"
141  << " id_=" << id_
142  << " local_worker_id_=" << local_worker_id_;
143 }
144 
145 bool MixStreamData::closed() const {
146  if (is_closed_) return true;
147  bool closed = true;
148  closed = closed && queue_.write_closed();
149  return closed;
150 }
151 
152 struct MixStreamData::SeqReordering {
153  //! current top sequence number
154  uint32_t seq_ = 0;
155 
156  //! queue of waiting Blocks, ordered by sequence number
157  std::map<uint32_t, Block> waiting_;
158 };
159 
160 void MixStreamData::OnStreamBlock(size_t from, uint32_t seq, Block&& b) {
161  assert(from < num_workers());
162  rx_timespan_.StartEventually();
163 
164  rx_net_items_ += b.num_items();
165  rx_net_bytes_ += b.size();
166  rx_net_blocks_++;
167 
168  sLOG << "MixStreamData::OnStreamBlock" << b
169  << "stream" << id_
170  << "from" << from
171  << "for worker" << my_worker_rank();
172 
173  if (TLX_UNLIKELY(seq != seq_[from].seq_)) {
174  // sequence mismatch: put into queue
175  die_unless(seq >= seq_[from].seq_);
176 
177  seq_[from].waiting_.insert(std::make_pair(seq, std::move(b)));
178 
179  return;
180  }
181 
182  OnStreamBlockOrdered(from, std::move(b));
183 
184  // try to process additional queued blocks
185  while (!seq_[from].waiting_.empty() &&
186  seq_[from].waiting_.begin()->first == seq_[from].seq_)
187  {
188  sLOG << "MixStreamData::OnStreamBlock"
189  << "processing delayed block with seq"
190  << seq_[from].waiting_.begin()->first;
191 
193  from, std::move(seq_[from].waiting_.begin()->second));
194 
195  seq_[from].waiting_.erase(
196  seq_[from].waiting_.begin());
197  }
198 }
199 
201  // sequence number matches
202  if (b.IsValid()) {
203  queue_.AppendBlock(from, std::move(b));
204  }
205  else {
206  sLOG << "MixStreamData::OnCloseStream"
207  << "stream" << id_
208  << "from" << from
209  << "for worker" << my_worker_rank()
210  << "remaining_closing_blocks_" << remaining_closing_blocks_;
211 
212  queue_.Close(from);
213 
214  die_unless(remaining_closing_blocks_ > 0);
215  if (--remaining_closing_blocks_ == 0) {
216  rx_lifetime_.StopEventually();
217  rx_timespan_.StopEventually();
218  }
219 
221  }
222 
223  seq_[from].seq_++;
224 }
225 
226 /******************************************************************************/
227 // MixStream
228 
230  : ptr_(ptr) { }
231 
233  ptr_->Close();
234 }
235 
236 const StreamId& MixStream::id() const {
237  return ptr_->id();
238 }
239 
241  return *ptr_;
242 }
243 
244 const StreamData& MixStream::data() const {
245  return *ptr_;
246 }
247 
249  return ptr_->GetWriters();
250 }
251 
253  return ptr_->GetMixReader(consume);
254 }
255 
257  return ptr_->GetReader(consume);
258 }
259 
260 } // namespace data
261 } // namespace thrill
262 
263 /******************************************************************************/
std::atomic< size_t > rx_net_items_
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Base class for common structures for CatStream and MixStream.
Definition: stream_data.hpp:45
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t value() const
return the current value – should only be used for debugging.
Definition: semaphore.hpp:79
void set_dia_id(size_t dia_id)
StreamData & data() final
Return stream data reference.
Definition: mix_stream.cpp:240
size_t workers_per_host() const
Returns workers_per_host.
Definition: stream_data.hpp:97
MixStreamDataPtr ptr_
Definition: mix_stream.hpp:153
#define die_unless(X)
Definition: die.hpp:27
void Close() final
shuts the stream down.
Definition: mix_stream.cpp:118
std::mutex mutex_
protects critical sections
MixBlockQueue queue_
BlockQueue to store incoming Blocks with source.
Definition: mix_stream.hpp:97
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:92
bool is_closed_
flag if Close() was completed
Definition: mix_stream.hpp:89
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
MixStream(const MixStreamDataPtr &ptr)
Definition: mix_stream.cpp:229
MixBlockQueueReader MixReader
Definition: mix_stream.hpp:49
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:90
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:114
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
const StreamId & id() const final
Return stream id.
Definition: mix_stream.cpp:236
MixReader GetMixReader(bool consume)
Definition: mix_stream.cpp:252
Writers GetWriters() final
Definition: mix_stream.cpp:46
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:256
size_t my_worker_rank() const
Returns my_worker_rank_.
Definition: stream_data.hpp:99
common::StatsTimerStopped rx_timespan_
MixStreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: mix_stream.cpp:27
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: mix_stream.hpp:91
size_t wait(size_t delta=1, size_t slack=0)
Definition: semaphore.hpp:60
size_t signal()
Definition: semaphore.hpp:44
bool closed() const final
Definition: mix_stream.cpp:145
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
StreamId id_
our own stream id.
void OnAllClosed(const char *stream_type)
Definition: stream_data.cpp:35
size_t dia_id_
Associated DIANode id.
size_t workers_per_host() const
number of workers per host
static int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
size_t StreamId
Definition: stream_data.hpp:32
std::atomic< size_t > rx_net_blocks_
BlockPool & block_pool_
reference to host-global BlockPool.
void OnStreamBlockOrdered(size_t from, Block &&b)
called to process a Block in sequence order
Definition: mix_stream.cpp:200
MixReader GetMixReader(bool consume)
Creates a BlockReader which mixes items from all workers.
Definition: mix_stream.cpp:109
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
called from Multiplexer when there is a new Block for this Stream.
Definition: mix_stream.cpp:160
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
size_t num_workers() const
Number of workers in system.
Definition: stream_data.hpp:94
bool write_closed() const
check if writer side Close() was called.
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
common::StatsTimerStart rx_lifetime_
void set_dia_id(size_t dia_id)
Definition: mix_stream.cpp:41