Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
multiplexer.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/multiplexer.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER
14 #define THRILL_DATA_MULTIPLEXER_HEADER
15 
18 #include <thrill/net/group.hpp>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <memory>
23 
24 namespace thrill {
25 namespace data {
26 
27 //! \addtogroup data_layer
28 //! \{
29 
30 class StreamSetBase;
31 
32 template <typename Stream>
33 class StreamSet;
34 
35 class CatStreamData;
37 class CatStream;
39 
40 class MixStreamData;
42 class MixStream;
44 
47 
48 class BlockQueue;
49 class MixBlockQueueSink;
50 
52 
53 /*!
54  * Multiplexes virtual Connections on Dispatcher.
55  *
56  * A worker has a TCP connection to each other worker to exchange large amounts
57  * of data. Since multiple exchanges can occur at the same time on this single
58  * connection we use multiplexing. The slices are called Blocks and are
59  * indicated by a \ref MultiplexerHeader. Multiple Blocks form a Stream on a
60  * single TCP connection. The multiplexer multiplexes all streams on all
61  * sockets.
62  *
63  * All sockets are polled for headers. As soon as a header arrives it is
64  * either attached to an existing stream or a new stream instance is
65  * created.
66  */
68 {
69  static constexpr bool debug = false;
70 
71 public:
73  net::DispatcherThread& dispatcher, net::Group& group,
74  size_t workers_per_host);
75 
76  //! non-copyable: delete copy-constructor
77  Multiplexer(const Multiplexer&) = delete;
78  //! non-copyable: delete assignment operator
79  Multiplexer& operator = (const Multiplexer&) = delete;
80 
81  //! Closes all client connections
82  ~Multiplexer();
83 
84  //! Closes all client connections
85  void Close();
86 
87  //! total number of hosts.
88  size_t num_hosts() const {
89  return group_.num_hosts();
90  }
91 
92  //! my rank among the hosts.
93  size_t my_host_rank() const {
94  return group_.my_host_rank();
95  }
96 
97  //! total number of workers.
98  size_t num_workers() const {
99  return num_hosts() * workers_per_host_;
100  }
101 
102  //! number of workers per host
103  size_t workers_per_host() const {
104  return workers_per_host_;
105  }
106 
107  //! Get the used BlockPool
109 
110  //! Get the JsonLogger from the BlockPool
112 
113  //! \name CatStreamData
114  //! \{
115 
116  //! Allocate the next stream
117  size_t AllocateCatStreamId(size_t local_worker_id);
118 
119  //! Get stream with given id, if it does not exist, create it.
121  size_t id, size_t local_worker_id, size_t dia_id);
122 
123  //! Request next stream.
124  CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id);
125 
126  //! \}
127 
128  //! \name MixStream
129  //! \{
130 
131  //! Allocate the next stream
132  size_t AllocateMixStreamId(size_t local_worker_id);
133 
134  //! Get stream with given id, if it does not exist, create it.
136  size_t id, size_t local_worker_id, size_t dia_id);
137 
138  //! Request next stream.
139  MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id);
140 
141  //! \}
142 
143 private:
144  //! reference to host-global memory manager
146 
147  //! reference to host-global BlockPool.
149 
150  //! dispatcher used for all communication by data::Multiplexer, the thread
151  //! never leaves the data components!
153 
154  // Holds NetConnections for outgoing Streams
156 
157  //! Number of workers per host
159 
160  //! protects critical sections
161  std::mutex mutex_;
162 
163  //! closed
164  bool closed_ = false;
165 
166  //! number of parallel recv requests
168 
169  //! Calculated send queue size limit for StreamData semaphores
171 
172  //! number of active Cat/MixStreams
173  std::atomic<size_t> active_streams_ { 0 };
174 
175  //! maximum number of active Cat/MixStreams
176  std::atomic<size_t> max_active_streams_ { 0 };
177 
178  //! friends for access to network components
179  friend class CatStreamData;
180  friend class MixStreamData;
181  friend class StreamSink;
182 
183  //! Pointer to queue that is used for communication between two workers on
184  //! the same host.
185  CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id);
186  MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id);
187 
188  /**************************************************************************/
189 
190  //! pimpl data structure
191  struct Data;
192 
193  //! pimpl data structure
194  std::unique_ptr<Data> d_;
195 
197  size_t id, size_t local_worker_id, size_t dia_id);
199  size_t id, size_t local_worker_id, size_t dia_id);
200 
201  //! release pointer onto a CatStreamData object
202  void IntReleaseCatStream(size_t id, size_t local_worker_id);
203  //! release pointer onto a MixStream object
204  void IntReleaseMixStream(size_t id, size_t local_worker_id);
205 
206  /**************************************************************************/
207 
209 
210  //! expects the next MultiplexerHeader from a socket and passes to
211  //! OnMultiplexerHeader
212  void AsyncReadMultiplexerHeader(size_t peer, Connection& s);
213 
214  //! parses MultiplexerHeader and decides whether to receive Block or close
215  //! Stream
216  void OnMultiplexerHeader(
217  size_t peer, uint32_t seq, Connection& s, net::Buffer&& buffer);
218 
219  //! Receives and dispatches a Block to a CatStreamData
220  void OnCatStreamBlock(
221  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
222  const CatStreamDataPtr& stream, PinnedByteBlockPtr&& bytes);
223 
224  //! Receives and dispatches a Block to a MixStream
225  void OnMixStreamBlock(
226  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
227  const MixStreamDataPtr& stream, PinnedByteBlockPtr&& bytes);
228 };
229 
230 //! \}
231 
232 } // namespace data
233 } // namespace thrill
234 
235 #endif // !THRILL_DATA_MULTIPLEXER_HEADER
236 
237 /******************************************************************************/
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a differe...
Definition: stream_sink.hpp:38
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
size_t my_host_rank() const
my rank among the hosts.
Definition: multiplexer.hpp:93
size_t AllocateMixStreamId(size_t local_worker_id)
Allocate the next stream.
std::mutex mutex_
protects critical sections
BlockPool & block_pool()
Get the used BlockPool.
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: cat_stream.hpp:54
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
CatStreamDataPtr GetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
size_t num_hosts() const
total number of hosts.
Definition: multiplexer.hpp:88
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:182
MixStreamDataPtr IntGetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:119
~Multiplexer()
Closes all client connections.
mem::Manager & mem_manager_
reference to host-global memory manager
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
void OnCatStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: mix_stream.hpp:44
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
size_t workers_per_host_
Number of workers per host.
size_t my_host_rank() const
Return our rank among hosts in this group.
Definition: group.hpp:69
void OnMixStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a MixStream.
size_t num_parallel_async_
number of parallel recv requests
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
Block header is sent before a sequence of blocks; it indicates the number of elements and their bounda...
CatStreamDataPtr IntGetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Multiplexer(mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:148
size_t send_size_limit_
Calculated send queue size limit for StreamData semaphores.
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
size_t AllocateCatStreamId(size_t local_worker_id)
Allocate the next stream.
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
std::unique_ptr< Data > d_
pimpl data structure
static constexpr bool debug
Definition: multiplexer.hpp:69
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
size_t workers_per_host() const
number of workers per host
net::DispatcherThread & dispatcher_
Object shared by allocators and other classes to track memory allocations.
Definition: manager.hpp:28
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
void OnMultiplexerHeader(size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
void AsyncReadMultiplexerHeader(size_t peer, Connection &s)
Multiplexer & operator=(const Multiplexer &)=delete
non-copyable: delete assignment operator
BlockPool & block_pool_
reference to host-global BlockPool.
void Close()
Closes all client connections.
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
virtual size_t num_hosts() const =0
Return number of connections in this group (= number computing hosts)
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
MixStreamDataPtr GetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.