Thrill  0.1
multiplexer.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/multiplexer.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER
14 #define THRILL_DATA_MULTIPLEXER_HEADER
15 
18 #include <thrill/net/group.hpp>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <memory>
23 
24 namespace thrill {
25 namespace data {
26 
27 //! \addtogroup data_layer
28 //! \{
29 
30 class StreamSetBase;
31 
32 template <typename Stream>
33 class StreamSet;
34 
35 class CatStreamData;
37 class CatStream;
39 
40 class MixStreamData;
42 class MixStream;
44 
47 
48 class BlockQueue;
49 class MixBlockQueueSink;
50 
52 
53 /*!
54  * Multiplexes virtual Connections on Dispatcher.
55  *
56  * A worker has a TCP connection to each other worker to exchange large amounts
57  * of data. Since multiple exchanges can occur at the same time on this single
58  * connection we use multiplexing. The slices are called Blocks and are
59  * indicated by a \ref MultiplexerHeader. Multiple Blocks form a Stream on a
60  * single TCP connection. The multiplexer multiplexes all streams on all
61  * sockets.
62  *
63  * All sockets are polled for headers. As soon as a header arrives it is
64  * either attached to an existing stream or a new stream instance is
65  * created.
66  */
68 {
69  static constexpr bool debug = false;
70 
71 public:
74  size_t workers_per_host);
75 
76  //! non-copyable: delete copy-constructor
77  Multiplexer(const Multiplexer&) = delete;
78  //! non-copyable: delete assignment operator
79  Multiplexer& operator = (const Multiplexer&) = delete;
80 
81  //! Closes all client connections
82  ~Multiplexer();
83 
84  //! Closes all client connections
85  void Close();
86 
87  //! Total number of hosts, forwarded from group_.num_hosts().
88  size_t num_hosts() const {
89  return group_.num_hosts();
90  }
91 
92  //! Rank of this host among all hosts, forwarded from group_.my_host_rank().
93  size_t my_host_rank() const {
94  return group_.my_host_rank();
95  }
96 
97  //! Total number of workers: num_hosts() multiplied by workers_per_host_.
98  size_t num_workers() const {
99  return num_hosts() * workers_per_host_;
100  }
101 
102  //! Number of workers per host, as stored in workers_per_host_.
103  size_t workers_per_host() const {
104  return workers_per_host_;
105  }
106 
107  //! Get the used BlockPool
109 
110  //! Get the JsonLogger from the BlockPool
112 
113  //! get network dispatcher
115 
116  //! Get the network group connection; returns a non-const reference to group_.
117  net::Group& group() { return group_; }
118 
119  //! \name CatStreamData
120  //! \{
121 
122  //! Allocate the next stream
123  size_t AllocateCatStreamId(size_t local_worker_id);
124 
125  //! Get stream with given id, if it does not exist, create it.
127  size_t id, size_t local_worker_id, size_t dia_id);
128 
129  //! Request next stream.
130  CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id);
131 
132  //! \}
133 
134  //! \name MixStream
135  //! \{
136 
137  //! Allocate the next stream
138  size_t AllocateMixStreamId(size_t local_worker_id);
139 
140  //! Get stream with given id, if it does not exist, create it.
142  size_t id, size_t local_worker_id, size_t dia_id);
143 
144  //! Request next stream.
145  MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id);
146 
147  //! \}
148 
149 private:
150  //! reference to host-global memory manager
152 
153  //! reference to host-global BlockPool.
155 
156  //! dispatcher used for all communication by data::Multiplexer, the thread
157  //! never leaves the data components!
159 
160  //! Holds NetConnections for outgoing Streams
162 
163  //! Number of workers per host
165 
166  //! protects critical sections
167  std::mutex mutex_;
168 
169  //! closed
170  bool closed_ = false;
171 
172  //! number of parallel recv requests
174 
175  //! Calculated send queue size limit for StreamData semaphores
177 
178  //! number of active Cat/MixStreams
179  std::atomic<size_t> active_streams_ { 0 };
180 
181  //! maximum number of active Cat/MixStreams
182  std::atomic<size_t> max_active_streams_ { 0 };
183 
184  //! friends for access to network components
185  friend class CatStreamData;
186  friend class MixStreamData;
187  friend class StreamSink;
188 
189  //! Pointer to queue that is used for communication between two workers on
190  //! the same host.
191  CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id);
192  MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id);
193 
194  /**************************************************************************/
195 
196  //! pimpl data structure
197  struct Data;
198 
199  //! pimpl data structure
200  std::unique_ptr<Data> d_;
201 
203  size_t id, size_t local_worker_id, size_t dia_id);
205  size_t id, size_t local_worker_id, size_t dia_id);
206 
207  //! release pointer onto a CatStreamData object
208  void IntReleaseCatStream(size_t id, size_t local_worker_id);
209  //! release pointer onto a MixStream object
210  void IntReleaseMixStream(size_t id, size_t local_worker_id);
211 
212  /**************************************************************************/
213 
215 
216  //! expects the next MultiplexerHeader from a socket and passes to
217  //! OnMultiplexerHeader
218  void AsyncReadMultiplexerHeader(size_t peer, Connection& s);
219 
220  //! parses MultiplexerHeader and decides whether to receive Block or close
221  //! Stream
222  void OnMultiplexerHeader(
223  size_t peer, uint32_t seq, Connection& s, net::Buffer&& buffer);
224 
225  //! Receives and dispatches a Block to a CatStreamData
226  void OnCatStreamBlock(
227  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
228  const CatStreamDataPtr& stream, PinnedByteBlockPtr&& bytes);
229 
230  //! Receives and dispatches a Block to a MixStream
231  void OnMixStreamBlock(
232  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
233  const MixStreamDataPtr& stream, PinnedByteBlockPtr&& bytes);
234 };
235 
236 //! \}
237 
238 } // namespace data
239 } // namespace thrill
240 
241 #endif // !THRILL_DATA_MULTIPLEXER_HEADER
242 
243 /******************************************************************************/
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a differe...
Definition: stream_sink.hpp:38
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
size_t AllocateMixStreamId(size_t local_worker_id)
Allocate the next stream.
std::mutex mutex_
protects critical sections
BlockPool & block_pool()
Get the used BlockPool.
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: cat_stream.hpp:54
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
CatStreamDataPtr GetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
MixStreamDataPtr IntGetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:126
~Multiplexer()
Closes all client connections.
mem::Manager & mem_manager_
reference to host-global memory manager
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
size_t num_hosts() const
total number of hosts.
Definition: multiplexer.hpp:88
void OnCatStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: mix_stream.hpp:44
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
size_t workers_per_host_
Number of workers per host.
size_t my_host_rank() const
my rank among the hosts.
Definition: multiplexer.hpp:93
void OnMixStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a MixStream.
size_t num_parallel_async_
number of parallel recv requests
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
Block header is sent before a sequence of blocks; it indicates the number of elements and their bounda...
CatStreamDataPtr IntGetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Multiplexer(mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
net::Group & group_
Holds NetConnections for outgoing Streams.
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:155
size_t workers_per_host() const
number of workers per host
net::DispatcherThread & dispatcher()
get network dispatcher
size_t send_size_limit_
Calculated send queue size limit for StreamData semaphores.
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
net::Group & group()
get network group connection
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
size_t AllocateCatStreamId(size_t local_worker_id)
Allocate the next stream.
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
std::unique_ptr< Data > d_
pimpl data structure
static constexpr bool debug
Definition: multiplexer.hpp:69
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
net::DispatcherThread & dispatcher_
Object shared by allocators and other classes to track memory allocations.
Definition: manager.hpp:28
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
void OnMultiplexerHeader(size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
void AsyncReadMultiplexerHeader(size_t peer, Connection &s)
Multiplexer & operator=(const Multiplexer &)=delete
non-copyable: delete assignment operator
BlockPool & block_pool_
reference to host-global BlockPool.
void Close()
Closes all client connections.
size_t my_host_rank() const
Return our rank among hosts in this group.
Definition: group.hpp:69
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
virtual size_t num_hosts() const =0
Return number of connections in this group (= number computing hosts)
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
MixStreamDataPtr GetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.