Thrill  0.1
stream_data.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_data.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2017 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/data/stream.hpp>
13 
17 
18 namespace thrill {
19 namespace data {
20 
21 /******************************************************************************/
22 // StreamData
23 
25  Multiplexer& multiplexer, size_t send_size_limit,
26  const StreamId& id,
27  size_t local_worker_id, size_t dia_id)
28  : sem_queue_(send_size_limit),
29  id_(id),
30  stream_set_base_(stream_set_base),
31  local_worker_id_(local_worker_id),
32  dia_id_(dia_id),
33  multiplexer_(multiplexer)
34 { }
35 
// Out-of-line definition of the StreamData destructor. It is defaulted, so it
// performs nothing beyond destroying the members.
36 StreamData::~StreamData() = default;
37 
// Handles the notification that the writer belonging to peer_worker_rank was
// closed. The event (including the unchanged `sent` flag) is forwarded to the
// owning stream set; once writers_closed_ equals the total number of workers
// (num_hosts() * workers_per_host()) the tx timers are stopped and
// all_writers_closed_ is set.
//
// NOTE(review): this listing skips source lines 39 and 56. Nothing visible here
// modifies writers_closed_, so the increment presumably sits on line 39, and
// line 56 presumably calls OnAllWritersClosed() -- confirm against the real
// file before relying on this block.
38 void StreamData::OnWriterClosed(size_t peer_worker_rank, bool sent) {
40 
41  LOG << "StreamData::OnWriterClosed()"
42  << " my_worker_rank= " << my_worker_rank()
43  << " peer_worker_rank=" << peer_worker_rank
44  << " writers_closed_=" << writers_closed_;
45 
// more close notifications than there are workers is treated as a fatal error
46  die_unless(writers_closed_ <= num_hosts() * workers_per_host());
47 
// let the stream set do its per-host bookkeeping for this close event
48  stream_set_base_->OnWriterClosed(peer_worker_rank, sent);
49 
// every worker in the system has closed its writer: finish the tx direction
50  if (writers_closed_ == num_hosts() * workers_per_host()) {
51  LOG << "StreamData::OnWriterClosed() final close received";
52 
53  tx_lifetime_.StopEventually();
54  tx_timespan_.StopEventually();
55 
57  all_writers_closed_ = true;
58  }
59 }
60 
// Tail of StreamData::OnAllWritersClosed() (the index places its definition at
// stream_data.cpp:61). The visible part streams one "close" event as
// alternating key/value pairs: the stream's id, type and dia_id followed by
// the item, byte and block counters of the rx/tx "net" and "int" paths.
//
// NOTE(review): the listing skips lines 61-62 (the signature and the object
// this chain is streamed into -- presumably the JsonLogger) and lines 69-70
// (the value written for "worker_rank" and whatever follows it). Confirm
// against the real file.
63  << "class" << "StreamData"
64  << "event" << "close"
65  << "id" << id_
66  << "type" << stream_type()
67  << "dia_id" << dia_id_
68  << "worker_rank"
71  << "rx_net_items" << rx_net_items_
72  << "rx_net_bytes" << rx_net_bytes_
73  << "rx_net_blocks" << rx_net_blocks_
74  << "tx_net_items" << tx_net_items_
75  << "tx_net_bytes" << tx_net_bytes_
76  << "tx_net_blocks" << tx_net_blocks_
77  << "rx_int_items" << rx_int_items_
78  << "rx_int_bytes" << rx_int_bytes_
79  << "rx_int_blocks" << rx_int_blocks_
80  << "tx_int_items" << tx_int_items_
81  << "tx_int_bytes" << tx_int_bytes_
82  << "tx_int_blocks" << tx_int_blocks_;
83 }
84 
85 /******************************************************************************/
86 // StreamData::Writers
87 
89  : my_worker_rank_(my_worker_rank)
90 { }
91 
93  // close BlockWriters in a cyclic fashion
94  size_t s = size();
95  for (size_t i = 0; i < s; ++i) {
96  operator [] ((i + my_worker_rank_) % s).Close();
97  }
98 }
99 
101  Close();
102 }
103 
104 /******************************************************************************/
105 // StreamSet
106 
// Constructs the stream set for stream `id`: stores the multiplexer reference
// and the id, then creates one StreamData object per local worker (local
// worker ids 0 .. workers_per_host - 1) and appends it to streams_.
//
// NOTE(review): the listing skips lines 116-118 between the end of the loop
// and the closing brace. remaining_ and the writers_closed_per_host_* vectors
// are not initialised anywhere in the visible text, so that presumably happens
// on the missing lines -- confirm against the real file.
107 template <typename StreamData>
108 StreamSet<StreamData>::StreamSet(Multiplexer& multiplexer, size_t send_size_limit,
109  StreamId id, size_t workers_per_host, size_t dia_id)
110  : multiplexer_(multiplexer), id_(id) {
111  for (size_t i = 0; i < workers_per_host; ++i) {
// each StreamData gets a back-pointer to this set and `i` as its local worker id
112  streams_.emplace_back(
113  tlx::make_counting<StreamData>(
114  this, multiplexer, send_size_limit, id, i, dia_id));
115  }
119 }
120 
121 template <typename StreamData>
123  assert(local_worker_id < streams_.size());
124  return streams_[local_worker_id];
125 }
126 
127 template <typename StreamData>
128 bool StreamSet<StreamData>::Release(size_t local_worker_id) {
129  std::unique_lock<std::mutex> lock(mutex_);
130  assert(local_worker_id < streams_.size());
131  if (streams_[local_worker_id]) {
132  assert(remaining_ > 0);
133  streams_[local_worker_id].reset();
134  --remaining_;
135  }
136  return (remaining_ == 0);
137 }
138 
139 template <typename StreamData>
141  for (StreamDataPtr& c : streams_)
142  c->Close();
143 }
144 
// Per-host bookkeeping for a closed writer. Under mutex_, the close is counted
// for the host that peer_worker_rank belongs to (and additionally in the
// "sent" counter when `sent` is true). When the per-host count reaches the
// limit compared against below, a header is serialized and written to that
// host's connection -- unless the host is this host, or every close of that
// host was already counted as sent.
//
// NOTE(review): the listing skips lines 164, 167, 189, 192-194, 196 and 204.
// Missing are the right-hand sides of the two comparisons, the declarations of
// `header` and `bb`, some header field assignments, and the name of the call
// whose arguments appear on lines 205-206. Hedged guesses are given inline;
// confirm all of them against the real file.
145 template <typename StreamData>
146 void StreamSet<StreamData>::OnWriterClosed(size_t peer_worker_rank, bool sent) {
147  std::unique_lock<std::mutex> lock(mutex_);
148 
// map the worker rank onto the rank of the host it runs on
149  size_t peer_host_rank = peer_worker_rank / workers_per_host();
150  die_unless(peer_host_rank < writers_closed_per_host_.size());
151 
152  writers_closed_per_host_[peer_host_rank]++;
153  if (sent)
154  writers_closed_per_host_sent_[peer_host_rank]++;
155 
156  LOG << "StreamSet::OnWriterClosed()"
157  << " my_host_rank= " << my_host_rank()
158  << " peer_host_rank=" << peer_host_rank
159  << " peer_worker_rank=" << peer_worker_rank
160  << " writers_closed_per_host_[]="
161  << writers_closed_per_host_[peer_host_rank];
162 
// upper bound of the per-host counter; the bound itself is on missing line 164
163  die_unless(writers_closed_per_host_[peer_host_rank] <=
165 
// "all writers of that host are closed"; the compared value is on missing line 167
166  if (writers_closed_per_host_[peer_host_rank] ==
168  {
// nothing is sent for the local host
169  if (peer_host_rank == my_host_rank())
170  return;
171 
// every close of this host was flagged `sent`, so no further message is needed
172  if (writers_closed_per_host_[peer_host_rank] ==
173  writers_closed_per_host_sent_[peer_host_rank])
174  {
175  LOG << "StreamSet::OnWriterClosed() final close already-done"
176  << " my_host_rank=" << my_host_rank()
177  << " peer_host_rank=" << peer_host_rank
178  << " writers_closed_per_host_[]="
179  << writers_closed_per_host_[peer_host_rank];
180  return;
181  }
182 
183  LOG << "StreamSet::OnWriterClosed() final close "
184  << " my_host_rank=" << my_host_rank()
185  << " peer_host_rank=" << peer_host_rank
186  << " writers_closed_per_host_[]="
187  << writers_closed_per_host_[peer_host_rank];
188 
// `header` is declared on missing line 189; further fields are presumably set
// on missing lines 192-194 (TODO confirm)
190  header.magic = magic_byte();
191  header.stream_id = id_;
195 
// `bb` is declared on missing line 196 (presumably a net::BufferBuilder)
197  header.Serialize(bb);
198 
199  net::Buffer buffer = bb.ToBuffer();
// the serialized header must have exactly the fixed header size
200  assert(buffer.size() == MultiplexerHeader::total_size);
201 
202  net::Connection& conn = multiplexer_.group().connection(peer_host_rank);
203 
// arguments of the call started on missing line 204 (presumably the
// dispatcher's AsyncWrite). The second argument is derived from the
// connection's tx_seq_ counter, which is advanced by 2; the meaning of the
// constants 42 and 0xFFFF is not visible here.
205  conn, 42 + (conn.tx_seq_.fetch_add(2) & 0xFFFF),
206  std::move(buffer));
207  }
208 }
209 
// NOTE(review): remnant of an explicit specialization. Its declarator and body
// (listing lines 211-212) are missing from this listing. Presumably this is
// one of the two StreamSet<...>::magic_byte() specializations -- confirm
// against the real file.
210 template <>
213 }
214 
// NOTE(review): remnant of a second explicit specialization. Its declarator
// and body (listing lines 216-217) are missing from this listing. Presumably
// this is the other StreamSet<...>::magic_byte() specialization -- confirm
// against the real file.
215 template <>
218 }
219 
// Explicit instantiations of the StreamSet class template, whose members are
// defined in this file, for the two stream data types.
220 template class StreamSet<CatStreamData>;
221 template class StreamSet<MixStreamData>;
222 
223 } // namespace data
224 } // namespace thrill
225 
226 /******************************************************************************/
std::atomic< size_t > rx_net_items_
bool all_writers_closed_
flag: true once all writers were closed
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
void OnWriterClosed(size_t peer_worker_rank, bool sent)
static const uint32_t all_workers
virtual worker which receives all-close messages
std::atomic< size_t > tx_net_items_
uint32_t sender_worker
global worker rank of sender
#define die_unless(X)
Definition: die.hpp:27
StreamSetBase * stream_set_base_
pointer to StreamSetBase containing this StreamData
size_t my_host_rank() const
Returns my_host_rank.
size_t my_worker_rank() const
Returns my_worker_rank_.
std::atomic< size_t > rx_int_items_
bool Release(size_t local_worker_id)
Buffer ToBuffer()
Explicit conversion to Buffer MOVING the memory ownership.
Multiplexer & multiplexer_
reference to multiplexer
uint32_t seq
sequence number in Stream
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
virtual Connection & connection(size_t id)=0
Return Connection to client id.
std::atomic< size_t > tx_int_bytes_
std::vector< StreamDataPtr > streams_
'owns' all streams belonging to one stream id for all local workers.
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
virtual void OnWriterClosed(size_t peer_worker_rank, bool sent)=0
StreamSet(Multiplexer &multiplexer, size_t send_size_limit, StreamId id, size_t workers_per_host, size_t dia_id)
StreamId id_
stream id
void Close()
closes all writers in a cyclic fashion
Definition: stream_data.cpp:92
std::atomic< size_t > rx_int_bytes_
static constexpr size_t total_size
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
size_t local_worker_id_
local worker id
StreamDataPtr Peer(size_t local_worker_id)
void Serialize(net::BufferBuilder &bb) const
Serializes the whole block struct into a buffer.
size_t writers_closed_
number of writers closed via StreamSink.
std::vector< size_t > writers_closed_per_host_sent_
number of writers closed per host that were reported with sent == true
size_t my_worker_rank_
rank of this worker
Definition: stream_data.hpp:81
void OnAllWritersClosed()
method called when all StreamSink writers have finished
Definition: stream_data.cpp:61
virtual const char * stream_type()=0
return stream type string
Block header is sent before a sequence of blocks; it indicates the number of elements and their bounda...
std::vector< size_t > writers_closed_per_host_
number of writers closed per host; a message may be sent once all of a host's writers are closed
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:99
size_t workers_per_host() const
Returns workers_per_host.
size_t remaining_
countdown to destruction
std::atomic< size_t > tx_net_bytes_
size_t workers_per_host() const
number of workers per host
~Writers()
custom destructor to close writers in a cyclic fashion
net::DispatcherThread & dispatcher()
get network dispatcher
size_t num_hosts() const
Number of hosts in system.
Base class for StreamSet.
net::Group & group()
get network group connection
MagicByte magic_byte() const
Writers(size_t my_worker_rank=0)
Definition: stream_data.cpp:88
void OnWriterClosed(size_t peer_worker_rank, bool sent)
Definition: stream_data.cpp:38
StreamId id_
our own stream id.
size_t dia_id_
associated DIANode id.
std::atomic< size_t > rx_int_blocks_
void AsyncWrite(Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
std::atomic< size_t > tx_net_blocks_
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:97
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
void Close() final
Close all StreamData objects.
size_t StreamId
Definition: stream_data.hpp:32
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
std::atomic< size_t > tx_int_blocks_
std::atomic< size_t > rx_net_blocks_
StreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Definition: stream_data.cpp:24
std::atomic< size_t > tx_int_items_
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::mutex mutex_
mutex for working on the data structure
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
size_t workers_per_host() const
Returns workers_per_host.
static const uint32_t final_seq
final sequence number