Thrill  0.1
stream_data.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_data.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_STREAM_DATA_HEADER
14 #define THRILL_DATA_STREAM_DATA_HEADER
15 
19 #include <thrill/data/file.hpp>
21 #include <tlx/semaphore.hpp>
22 
23 #include <mutex>
24 #include <vector>
25 
26 namespace thrill {
27 namespace data {
28 
29 //! \addtogroup data_layer
30 //! \{
31 
32 using StreamId = size_t;
33 
34 enum class MagicByte : uint8_t {
36 };
37 
38 class StreamSink;
39 class StreamSetBase;
40 
41 /*!
42  * Base class for common structures for ConcatStream and MixedStream. This is
43  * also a virtual base class used by Multiplexer to pass blocks to streams. In
44  * addition, it contains common items like stats.
45  */
47 {
48 public:
49  static constexpr bool debug = false;
50 
52 
53  /*!
54  * An extra class derived from std::vector<> for delivery of the
55  * BlockWriters of a Stream. The purpose is to enforce a custom way to close
56  * stream writers cyclically such that PE k first sends its Close-packet to
57  * k+1, k+2, etc.
58  */
59  class Writers : public std::vector<BlockWriter<StreamSink> >
60  {
61  public:
62  Writers(size_t my_worker_rank = 0);
63 
64  //! copyable: default copy-constructor
65  Writers(const Writers&) = default;
66  //! copyable: default assignment operator
67  Writers& operator = (const Writers&) = default;
68  //! move-constructor: default
69  Writers(Writers&&) = default;
70  //! move-assignment operator: default
71  Writers& operator = (Writers&&) = default;
72 
73  //! close the contained writers in a cyclic fashion (PE k closes k+1, k+2, ... first)
74  void Close();
75 
76  //! custom destructor to close writers in a cyclic fashion
77  ~Writers();
78 
79  private:
80  //! rank of this worker
82  };
83 
84  StreamData(StreamSetBase* stream_set_base,
85  Multiplexer& multiplexer, size_t send_size_limit,
86  const StreamId& id, size_t local_worker_id, size_t dia_id);
87 
88  virtual ~StreamData();
89 
90  //! Return stream id
91  const StreamId& id() const { return id_; }
92 
93  //! return stream type string
94  virtual const char * stream_type() = 0;
95 
96  //! Returns my_host_rank
97  size_t my_host_rank() const { return multiplexer_.my_host_rank(); }
98  //! Number of hosts in system
99  size_t num_hosts() const { return multiplexer_.num_hosts(); }
100  //! Number of workers in system
101  size_t num_workers() const { return multiplexer_.num_workers(); }
102 
103  //! Returns workers_per_host
104  size_t workers_per_host() const { return multiplexer_.workers_per_host(); }
105  //! Returns my_worker_rank_
106  size_t my_worker_rank() const {
107  return my_host_rank() * workers_per_host() + local_worker_id_;
108  }
109 
110  /*------------------------------------------------------------------------*/
111 
112  //! shuts the stream down.
113  virtual void Close() = 0;
114 
115  virtual bool closed() const = 0;
116 
117  //! Creates BlockWriters for each worker. BlockWriter can only be opened
118  //! once, otherwise the block sequence is incorrectly interleaved!
119  virtual Writers GetWriters() = 0;
120 
121  //! method called from StreamSink when it is closed, used to aggregate Close
122  //! messages to remote hosts
123  void OnWriterClosed(size_t peer_worker_rank, bool sent);
124 
125  //! method called when all StreamSink writers have finished
126  void OnAllWritersClosed();
127 
128  /*------------------------------------------------------------------------*/
129  ///////// expose these members - getters would be too java-ish /////////////
130 
131  //! StatsCounter for incoming data transfer. Does not include loopback data
132  //! transfer
133  std::atomic<size_t>
134  rx_net_items_ { 0 }, rx_net_bytes_ { 0 }, rx_net_blocks_ { 0 };
135 
136  //! StatsCounters for outgoing data transfer - shared by all sinks. Does
137  //! not include loopback data transfer
138  std::atomic<size_t>
139  tx_net_items_ { 0 }, tx_net_bytes_ { 0 }, tx_net_blocks_ { 0 };
140 
141  //! StatsCounter for incoming data transfer. Exclusively contains only
142  //! loopback (internal) data transfer
143  std::atomic<size_t>
144  rx_int_items_ { 0 }, rx_int_bytes_ { 0 }, rx_int_blocks_ { 0 };
145 
146  //! StatsCounters for outgoing data transfer - shared by all sinks.
147  //! Exclusively contains only loopback (internal) data transfer
148  std::atomic<size_t>
149  tx_int_items_ { 0 }, tx_int_bytes_ { 0 }, tx_int_blocks_ { 0 };
150 
151  //! Timers from creation of stream until rx / tx direction is closed.
153 
154  //! Timers from first rx / tx package until rx / tx direction is closed.
156 
157  //! semaphore to stall the amount of PinnedBlocks (measured in bytes) passed
158  //! to the network layer for transmission.
160 
161  ///////////////////////////////////////////////////////////////////////////
162 
163 protected:
164  //! our own stream id.
166 
167  //! pointer to StreamSetBase containing this StreamData
169 
170  //! local worker id
172 
173  //! associated DIANode id.
174  size_t dia_id_;
175 
176  //! reference to multiplexer
178 
179  //! number of remaining expected stream closing operations. Required to know
180  //! when to stop rx_lifetime
181  std::atomic<size_t> remaining_closing_blocks_;
182 
183  //! number of received stream closing Blocks.
185 
186  //! number of writers closed via StreamSink.
187  size_t writers_closed_ = 0;
188 
189  //! bool if all writers were closed
190  bool all_writers_closed_ = false;
191 
192  //! friends for access to multiplexer_
193  friend class StreamSink;
194 };
195 
197 
198 /*!
199  * Base class for StreamSet.
200  */
202 {
203 public:
204  static constexpr bool debug = false;
205 
206  virtual ~StreamSetBase() { }
207 
208  //! Close all streams in the set.
209  virtual void Close() = 0;
210 
211  //! method called from StreamSink when it is closed, used to aggregate Close
212  //! messages to remote hosts
213  virtual void OnWriterClosed(size_t peer_worker_rank, bool sent) = 0;
214 };
215 
216 /*!
217  * Simple structure that holds all stream instances for the workers on the
218  * local host for a given stream id.
219  */
220 template <typename StreamData>
221 class StreamSet : public StreamSetBase
222 {
223 public:
225 
226  //! Creates a StreamSet with the given number of streams (num workers per
227  //! host).
228  StreamSet(Multiplexer& multiplexer, size_t send_size_limit,
229  StreamId id, size_t workers_per_host, size_t dia_id);
230 
231  //! Returns the stream that will be consumed by the worker with the given
232  //! local id
233  StreamDataPtr Peer(size_t local_worker_id);
234 
235  //! Release local_worker_id, returns true when all individual streams are
236  //! done.
237  bool Release(size_t local_worker_id);
238 
239  //! Close all StreamData objects
240  void Close() final;
241 
242  //! method called from StreamSink when it is closed, used to aggregate Close
243  //! messages to remote hosts
244  void OnWriterClosed(size_t peer_worker_rank, bool sent);
245 
246  //! Returns my_host_rank
247  size_t my_host_rank() const { return multiplexer_.my_host_rank(); }
248  //! Number of hosts in system
249  size_t num_hosts() const { return multiplexer_.num_hosts(); }
250  //! Returns workers_per_host
251  size_t workers_per_host() const { return multiplexer_.workers_per_host(); }
252 
253  inline MagicByte magic_byte() const;
254 
255 private:
256  //! reference to multiplexer
258  //! stream id
260  //! 'owns' all streams belonging to one stream id for all local workers.
261  std::vector<StreamDataPtr> streams_;
262  //! countdown to destruction
263  size_t remaining_;
264  //! number of writers closed per host, message is sent when all are closed
265  std::vector<size_t> writers_closed_per_host_;
266  //! per-host close count, presumably only for closes with sent == true (TODO confirm)
267  std::vector<size_t> writers_closed_per_host_sent_;
268  //! mutex for working on the data structure
269  std::mutex mutex_;
270 };
271 
272 //! \}
273 
274 } // namespace data
275 } // namespace thrill
276 
277 #endif // !THRILL_DATA_STREAM_DATA_HEADER
278 
279 /******************************************************************************/
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a differe...
Definition: stream_sink.hpp:38
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:46
A simple semaphore implementation using C++11 synchronization methods.
Definition: semaphore.hpp:26
StreamSetBase * stream_set_base_
pointer to StreamSetBase containing this StreamData
size_t my_worker_rank() const
Returns my_worker_rank_.
Multiplexer & multiplexer_
reference to multiplexer
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
Multiplexer & multiplexer_
reference to multiplexer
std::vector< StreamDataPtr > streams_
'owns' all streams belonging to one stream id for all local workers.
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
std::atomic< size_t > remaining_closing_blocks_
StreamId id_
stream id
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
size_t local_worker_id_
local worker id
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
std::vector< size_t > writers_closed_per_host_sent_
number of writers closed per host, message is sent when all are closed
size_t my_worker_rank_
rank of this worker
Definition: stream_data.hpp:81
std::vector< size_t > writers_closed_per_host_
number of writers closed per host, message is sent when all are closed
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:99
size_t workers_per_host() const
Returns workers_per_host.
size_t remaining_
countdown to destruction
size_t num_hosts() const
Number of hosts in system.
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
static constexpr bool debug
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
Base class for StreamSet.
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:91
StreamId id_
our own stream id.
size_t dia_id_
associated DIANode id.
size_t num_workers() const
Number of workers in system.
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:97
size_t StreamId
Definition: stream_data.hpp:32
tlx::Semaphore sem_queue_
std::mutex mutex_
mutex for working on the data structure
Provides reference counting abilities for use with CountingPtr.
size_t workers_per_host() const
Returns workers_per_host.