Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
stream_data.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_data.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_STREAM_DATA_HEADER
14 #define THRILL_DATA_STREAM_DATA_HEADER
15 
19 #include <thrill/data/file.hpp>
21 #include <tlx/semaphore.hpp>
22 
23 #include <mutex>
24 #include <vector>
25 
26 namespace thrill {
27 namespace data {
28 
29 //! \addtogroup data_layer
30 //! \{
31 
32 using StreamId = size_t; //!< identifier of a data stream; StreamSet groups the streams of all local workers by this id
33 
34 enum class MagicByte : uint8_t {
36 };
37 
38 class StreamSink;
39 
40 /*!
41  * Base class for common structures for ConcatStream and MixedStream. This is
42  * also a virtual base class used by Multiplexer to pass blocks to streams.
43  * It also contains common items like stats.
44  */
46 {
47 public:
49 
50  /*!
51  * An extra class derived from std::vector<> for delivery of the
52  * BlockWriters of a Stream. The purpose is to enforce a custom way to close
53  * stream writers cyclically such that PE k first sends its Close-packet to
54  * k+1, k+2, etc.
55  */
56  class Writers : public std::vector<BlockWriter<StreamSink> >
57  {
58  public:
59  Writers(size_t my_worker_rank = 0);
60 
61  //! copyable: default copy-constructor
62  Writers(const Writers&) = default;
63  //! copyable: default assignment operator
64  Writers& operator = (const Writers&) = default;
65  //! move-constructor: default
66  Writers(Writers&&) = default;
67  //! move-assignment operator: default
68  Writers& operator = (Writers&&) = default;
69 
70  //! close all writers in a cyclic fashion
71  void Close();
72 
73  //! custom destructor to close writers in a cyclic fashion
74  ~Writers();
75 
76  private:
77  //! rank of this worker
79  };
80 
81  StreamData(Multiplexer& multiplexer, size_t send_size_limit,
82  const StreamId& id, size_t local_worker_id, size_t dia_id);
83 
84  virtual ~StreamData();
85 
86  //! Return stream id
87  const StreamId& id() const { return id_; }
88 
89  //! Returns my_host_rank
90  size_t my_host_rank() const { return multiplexer_.my_host_rank(); }
91  //! Number of hosts in system
92  size_t num_hosts() const { return multiplexer_.num_hosts(); }
93  //! Number of workers in system
94  size_t num_workers() const { return multiplexer_.num_workers(); }
95 
96  //! Returns workers_per_host
97  size_t workers_per_host() const { return multiplexer_.workers_per_host(); }
98  //! Returns my_worker_rank_
99  size_t my_worker_rank() const {
101  }
102 
103  void OnAllClosed(const char* stream_type);
104 
105  //! shuts the stream down.
106  virtual void Close() = 0;
107 
108  virtual bool closed() const = 0;
109 
110  //! Creates BlockWriters for each worker. BlockWriter can only be opened
111  //! once, otherwise the block sequence is incorrectly interleaved!
112  virtual Writers GetWriters() = 0;
113 
114  ///////// expose these members - getters would be too java-ish /////////////
115 
116  //! StatsCounter for incoming data transfer. Does not include loopback data
117  //! transfer
118  std::atomic<size_t>
120 
121  //! StatsCounters for outgoing data transfer - shared by all sinks. Does
122  //! not include loopback data transfer
123  std::atomic<size_t>
125 
126  //! StatsCounter for incoming data transfer. Exclusively contains only
127  //! loopback (internal) data transfer
128  std::atomic<size_t>
130 
131  //! StatsCounters for outgoing data transfer - shared by all sinks.
132  //! Exclusively contains only loopback (internal) data transfer
133  std::atomic<size_t>
135 
136  //! Timers from creation of stream until rx / tx direction is closed.
138 
139  //! Timers from first rx / tx package until rx / tx direction is closed.
141 
142  //! semaphore to stall the amount of PinnedBlocks (measured in bytes) passed
143  //! to the network layer for transmission.
145 
146  ///////////////////////////////////////////////////////////////////////////
147 
148 protected:
149  //! our own stream id.
151 
153 
154  //! Associated DIANode id.
155  size_t dia_id_;
156 
157  //! reference to multiplexer
159 
160  //! number of remaining expected stream closing operations. Required to know
161  //! when to stop rx_lifetime
162  std::atomic<size_t> remaining_closing_blocks_;
163 
164  //! number of received stream closing Blocks.
166 
167  //! friends for access to multiplexer_
168  friend class StreamSink;
169 };
170 
172 
173 /*!
174  * Base class for StreamSet.
175  */
177 {
178 public:
179  virtual ~StreamSetBase() { }
180 
181  //! Close all streams in the set.
182  virtual void Close() = 0;
183 };
184 
185 /*!
186  * Simple structure that holds a all stream instances for the workers on the
187  * local host for a given stream id.
188  */
189 template <typename StreamData>
190 class StreamSet : public StreamSetBase
191 {
192 public:
194 
195  //! Creates a StreamSet with the given number of streams (num workers per
196  //! host).
197  StreamSet(Multiplexer& multiplexer, size_t send_size_limit,
198  StreamId id, size_t workers_per_host, size_t dia_id) {
199  for (size_t i = 0; i < workers_per_host; ++i) {
200  streams_.emplace_back(
201  tlx::make_counting<StreamData>(
202  multiplexer, send_size_limit, id, i, dia_id));
203  }
204  remaining_ = workers_per_host;
205  }
206 
207  //! Returns the stream that will be consumed by the worker with the given
208  //! local id
209  StreamDataPtr Peer(size_t local_worker_id) {
210  assert(local_worker_id < streams_.size());
211  return streams_[local_worker_id];
212  }
213 
214  //! Release local_worker_id, returns true when all individual streams are
215  //! done.
216  bool Release(size_t local_worker_id) {
217  assert(local_worker_id < streams_.size());
218  if (streams_[local_worker_id]) {
219  assert(remaining_ > 0);
220  streams_[local_worker_id].reset();
221  --remaining_;
222  }
223  return (remaining_ == 0);
224  }
225 
226  void Close() final {
227  for (StreamDataPtr& c : streams_)
228  c->Close();
229  }
230 
231 private:
232  //! 'owns' all streams belonging to one stream id for all local workers.
233  std::vector<StreamDataPtr> streams_;
234  //! countdown to destruction
235  size_t remaining_;
236 };
237 
238 //! \}
239 
240 } // namespace data
241 } // namespace thrill
242 
243 #endif // !THRILL_DATA_STREAM_DATA_HEADER
244 
245 /******************************************************************************/
std::atomic< size_t > rx_net_items_
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a differe...
Definition: stream_sink.hpp:38
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:45
size_t my_host_rank() const
my rank among the hosts.
Definition: multiplexer.hpp:93
A simple semaphore implementation using C++11 synchronization methods.
Definition: semaphore.hpp:26
size_t workers_per_host() const
Returns workers_per_host.
Definition: stream_data.hpp:97
std::atomic< size_t > tx_net_items_
size_t num_hosts() const
total number of hosts.
Definition: multiplexer.hpp:88
std::atomic< size_t > rx_int_items_
bool Release(size_t local_worker_id)
virtual Writers GetWriters()=0
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:92
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
StreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Definition: stream_data.cpp:23
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:90
std::atomic< size_t > tx_int_bytes_
std::vector< StreamDataPtr > streams_
'owns' all streams belonging to one stream id for all local workers.
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
std::atomic< size_t > remaining_closing_blocks_
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
StreamSet(Multiplexer &multiplexer, size_t send_size_limit, StreamId id, size_t workers_per_host, size_t dia_id)
void Close()
close all writers in a cyclic fashion
Definition: stream_data.cpp:66
std::atomic< size_t > rx_int_bytes_
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
size_t my_worker_rank() const
Returns my_worker_rank_.
Definition: stream_data.hpp:99
common::StatsTimerStopped rx_timespan_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
size_t my_worker_rank_
rank of this worker
Definition: stream_data.hpp:78
size_t remaining_
countdown to destruction
std::atomic< size_t > tx_net_bytes_
~Writers()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:74
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
Base class for StreamSet.
Writers & operator=(const Writers &)=default
copyable: default assignment operator
Writers(size_t my_worker_rank=0)
Definition: stream_data.cpp:62
StreamId id_
our own stream id.
virtual void Close()=0
shuts the stream down.
void OnAllClosed(const char *stream_type)
Definition: stream_data.cpp:35
size_t dia_id_
Associated DIANode id.
std::atomic< size_t > rx_int_blocks_
std::atomic< size_t > tx_net_blocks_
virtual bool closed() const =0
size_t workers_per_host() const
number of workers per host
virtual void Close()=0
Close all streams in the set.
void Close() final
Close all streams in the set.
size_t StreamId
Definition: stream_data.hpp:32
std::atomic< size_t > tx_int_blocks_
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:87
std::atomic< size_t > rx_net_blocks_
std::atomic< size_t > tx_int_items_
tlx::Semaphore sem_queue_
size_t num_workers() const
Number of workers in system.
Definition: stream_data.hpp:94
Provides reference counting abilities for use with CountingPtr.
StreamDataPtr Peer(size_t local_worker_id)
common::StatsTimerStart rx_lifetime_