Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
stream.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_STREAM_HEADER
14 #define THRILL_DATA_STREAM_HEADER
15 
19 #include <thrill/data/file.hpp>
23 
24 #include <mutex>
25 #include <vector>
26 
27 namespace thrill {
28 namespace data {
29 
30 //! \addtogroup data_layer
31 //! \{
32 
33 /******************************************************************************/
34 //! Stream - base class for CatStream and MixStream
35 
37 {
38 public:
41 
42  virtual ~Stream();
43 
44  //! Return stream id
45  virtual const StreamId& id() const = 0;
46 
47  //! Return stream data reference
48  virtual StreamData& data() = 0;
49 
50  //! Return stream data reference
51  virtual const StreamData& data() const = 0;
52 
53  //! shuts down the stream, waits for all closing blocks
54  void Close();
55 
56  //! Creates BlockWriters for each worker. BlockWriter can only be opened
57  //! once, otherwise the block sequence is incorrectly interleaved!
58  virtual Writers GetWriters() = 0;
59 
60  /*!
61  * Scatters a File to many worker: elements from [offset[0],offset[1]) are
62  * sent to the first worker, elements from [offset[1], offset[2]) are sent
63  * to the second worker, ..., elements from [offset[my_rank -
64  * 1],offset[my_rank]) are copied locally, ..., elements from
65  * [offset[num_workers - 1], offset[num_workers]) are sent to the last
66  * worker.
67  *
68  * The number of given offsets must be equal to the
69  * net::Group::num_hosts() * workers_per_host_ + 1.
70  *
71  * /param source File containing the data to be scattered.
72  *
73  * /param offsets - as described above. offsets.size must be equal to
74  * num_workers + 1
75  */
76  template <typename ItemType>
77  void Scatter(File& source, const std::vector<size_t>& offsets,
78  bool consume = false) {
79  if (consume)
80  return ScatterConsume<ItemType>(source, offsets);
81  else
82  return ScatterKeep<ItemType>(source, offsets);
83  }
84 
85  /*!
86  * Consuming Version of Scatter() see documentation there.
87  */
88  template <typename ItemType>
89  void ScatterConsume(File& source, const std::vector<size_t>& offsets) {
90 
91  Writers writers = GetWriters();
92 
93  size_t num_workers = writers.size();
94  assert(offsets.size() == num_workers + 1);
95 
96  size_t my_rank = data().my_worker_rank();
97 
98  // send blocks from my_rank to last worker, store the other Blocks
99 
100  std::vector<std::vector<Block> > block_store;
101 
102  File::ConsumeReader reader =
103  source.GetConsumeReader(/* prefetch_size */ 0);
104  size_t current = 0;
105 
106  {
107  // discard first items in Reader
108  size_t limit = offsets[0];
109  if (current != limit) {
110  reader.template GetItemBatch<ItemType>(limit - current);
111  current = limit;
112  }
113  }
114 
115  for (size_t worker = 0; worker < num_workers; ++worker) {
116  // write [current,limit) to this worker
117  size_t limit = offsets[worker + 1];
118  assert(current <= limit);
119  if (worker < my_rank) {
120  if (current != limit) {
121  block_store.emplace_back(
122  reader.template GetItemBatch<ItemType>(limit - current));
123  }
124  else {
125  block_store.emplace_back(std::vector<Block>());
126  }
127  }
128  else {
129  if (current != limit) {
130  writers[worker].AppendBlocks(
131  reader.template GetItemBatch<ItemType>(limit - current));
132  }
133  }
134  current = limit;
135  }
136 
137  assert(block_store.size() == my_rank);
138 
139  for (size_t worker = 0; worker < my_rank; ++worker) {
140  if (!block_store[worker].empty())
141  writers[worker].AppendBlocks(std::move(block_store[worker]));
142  }
143  }
144 
145  /*!
146  * Keep Version of Scatter() see documentation there.
147  */
148  template <typename ItemType>
149  void ScatterKeep(File& source, const std::vector<size_t>& offsets) {
150 
151  File::KeepReader reader = source.GetKeepReader(/* prefetch_size */ 0);
152  size_t current = 0;
153 
154  {
155  // discard first items in Reader
156  size_t limit = offsets[0];
157 #if 0
158  for ( ; current < limit; ++current) {
159  assert(reader.HasNext());
160  // discard one item (with deserialization)
161  reader.template Next<ItemType>();
162  }
163 #else
164  if (current != limit) {
165  reader.template GetItemBatch<ItemType>(limit - current);
166  current = limit;
167  }
168 #endif
169  }
170 
171  Writers writers = GetWriters();
172 
173  size_t num_workers = writers.size();
174  assert(offsets.size() == num_workers + 1);
175 
176  for (size_t worker = 0; worker < num_workers; ++worker) {
177  // write [current,limit) to this worker
178  size_t limit = offsets[worker + 1];
179  assert(current <= limit);
180 #if 0
181  for ( ; current < limit; ++current) {
182  assert(reader.HasNext());
183  // move over one item (with deserialization and serialization)
184  writers[worker](reader.template Next<ItemType>());
185  }
186 #else
187  if (current != limit) {
188  writers[worker].AppendBlocks(
189  reader.template GetItemBatch<ItemType>(limit - current));
190  current = limit;
191  }
192 #endif
193  writers[worker].Close();
194  }
195  }
196 
197  /**************************************************************************/
198 
199  //! \name Statistics
200  //! \{
201 
202  //! return number of items transmitted
203  size_t tx_items() const;
204 
205  //! return number of bytes transmitted
206  size_t tx_bytes() const;
207 
208  //! return number of blocks transmitted
209  size_t tx_blocks() const;
210 
211  //! return number of items received
212  size_t rx_items() const;
213 
214  //! return number of bytes received
215  size_t rx_bytes() const;
216 
217  //! return number of blocks received
218  size_t rx_blocks() const;
219 
220  /*------------------------------------------------------------------------*/
221 
222  //! return number of items transmitted via network excluding internal tx
223  size_t tx_net_items() const;
224 
225  //! return number of bytes transmitted via network excluding internal tx
226  size_t tx_net_bytes() const;
227 
228  //! return number of blocks transmitted via network excluding internal tx
229  size_t tx_net_blocks() const;
230 
231  //! return number of items received via network excluding internal tx
232  size_t rx_net_items() const;
233 
234  //! return number of bytes received via network excluding internal tx
235  size_t rx_net_bytes() const;
236 
237  //! return number of blocks received via network excluding internal tx
238  size_t rx_net_blocks() const;
239 
240  /*------------------------------------------------------------------------*/
241 
242  //! return number of items transmitted via internal loopback queues
243  size_t tx_int_items() const;
244 
245  //! return number of bytes transmitted via internal loopback queues
246  size_t tx_int_bytes() const;
247 
248  //! return number of blocks transmitted via internal loopback queues
249  size_t tx_int_blocks() const;
250 
251  //! return number of items received via network internal loopback queues
252  size_t rx_int_items() const;
253 
254  //! return number of bytes received via network internal loopback queues
255  size_t rx_int_bytes() const;
256 
257  //! return number of blocks received via network internal loopback queues
258  size_t rx_int_blocks() const;
259 
260  //! \}
261 };
262 
263 //! \}
264 
265 } // namespace data
266 } // namespace thrill
267 
268 #endif // !THRILL_DATA_STREAM_HEADER
269 
270 /******************************************************************************/
size_t tx_net_items() const
return number of items transmitted via network excluding internal tx
Definition: stream.cpp:58
size_t rx_net_bytes() const
return number of bytes received via network excluding internal tx
Definition: stream.cpp:74
size_t rx_blocks() const
return number of blocks received
Definition: stream.cpp:52
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:46
void Scatter(File &source, const std::vector< size_t > &offsets, bool consume=false)
Scatters a File to many worker: elements from [offset[0],offset[1]) are sent to the first worker...
Definition: stream.hpp:77
size_t tx_items() const
return number of items transmitted
Definition: stream.cpp:32
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
size_t rx_items() const
return number of items received
Definition: stream.cpp:44
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t tx_int_bytes() const
return number of bytes transmitted via internal loopback queues
Definition: stream.cpp:88
BlockWriter< StreamSink > Writer
Definition: stream_data.hpp:51
virtual const StreamId & id() const =0
Return stream id.
size_t rx_net_blocks() const
return number of blocks received via network excluding internal tx
Definition: stream.cpp:78
void Close()
custom destructor to close writers is a cyclic fashion
Definition: stream_data.cpp:92
void ScatterKeep(File &source, const std::vector< size_t > &offsets)
Keep Version of Scatter() see documentation there.
Definition: stream.hpp:149
size_t tx_net_bytes() const
return number of bytes transmitted via network excluding internal tx
Definition: stream.cpp:62
size_t my_worker_rank() const
Returns my_worker_rank_.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
void Close()
shuts down the stream, waits for all closing blocks
Definition: stream.cpp:26
size_t tx_bytes() const
return number of bytes transmitted
Definition: stream.cpp:36
virtual StreamData & data()=0
Return stream data reference.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t tx_int_items() const
return number of items transmitted via internal loopback queues
Definition: stream.cpp:84
size_t rx_int_blocks() const
return number of blocks received via network internal loopback queues
Definition: stream.cpp:104
size_t tx_net_blocks() const
return number of blocks transmitted via network excluding internal tx
Definition: stream.cpp:66
size_t rx_bytes() const
return number of bytes received
Definition: stream.cpp:48
void ScatterConsume(File &source, const std::vector< size_t > &offsets)
Consuming Version of Scatter() see documentation there.
Definition: stream.hpp:89
size_t rx_net_items() const
return number of items received via network excluding internal tx
Definition: stream.cpp:70
Stream - base class for CatStream and MixStream.
Definition: stream.hpp:36
virtual Writers GetWriters()=0
size_t StreamId
Definition: stream_data.hpp:32
size_t rx_int_items() const
return number of items received via network internal loopback queues
Definition: stream.cpp:96
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t tx_int_blocks() const
return number of blocks transmitted via internal loopback queues
Definition: stream.cpp:92
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
Definition: file.cpp:68
size_t tx_blocks() const
return number of blocks transmitted
Definition: stream.cpp:40
Provides reference counting abilities for use with CountingPtr.
size_t rx_int_bytes() const
return number of bytes received via network internal loopback queues
Definition: stream.cpp:100
virtual ~Stream()
Definition: stream.cpp:23