Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
stream.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_STREAM_HEADER
14 #define THRILL_DATA_STREAM_HEADER
15 
19 #include <thrill/data/file.hpp>
23 
24 #include <mutex>
25 #include <vector>
26 
27 namespace thrill {
28 namespace data {
29 
30 //! \addtogroup data_layer
31 //! \{
32 
33 /******************************************************************************/
34 //! Stream - base class for CatStream and MixStream
35 
37 {
38 public:
41 
//! Virtual destructor: Stream is a polymorphic base class (it declares pure
//! virtual methods below that CatStream/MixStream-style subclasses implement).
42  virtual ~Stream();
43 
44  //! Return the id of this stream.
45  virtual const StreamId& id() const = 0;
46 
47  //! Return a mutable reference to this stream's StreamData.
48  virtual StreamData& data() = 0;
49 
50  //! Return a const reference to this stream's StreamData.
51  virtual const StreamData& data() const = 0;
52 
53  //! Shuts down the stream, waits for all closing blocks.
54  void Close();
55 
56  //! Creates one BlockWriter per worker (Scatter() uses writers.size() as the
57  //! worker count). Each BlockWriter can only be opened once, otherwise the
//! block sequence is incorrectly interleaved!
58  virtual Writers GetWriters() = 0;
59 
60  /*!
61  * Scatters a File to many worker: elements from [offset[0],offset[1]) are
62  * sent to the first worker, elements from [offset[1], offset[2]) are sent
63  * to the second worker, ..., elements from [offset[my_rank -
64  * 1],offset[my_rank]) are copied locally, ..., elements from
65  * [offset[num_workers - 1], offset[num_workers]) are sent to the last
66  * worker.
67  *
68  * The number of given offsets must be equal to the
69  * net::Group::num_hosts() * workers_per_host_ + 1.
70  *
71  * /param source File containing the data to be scattered.
72  *
73  * /param offsets - as described above. offsets.size must be equal to
74  * num_workers + 1
75  */
76  template <typename ItemType>
77  void Scatter(File& source, const std::vector<size_t>& offsets,
78  bool consume = false) {
79  // tx_timespan_.StartEventually();
80 
81  File::Reader reader = source.GetReader(consume);
82  size_t current = 0;
83 
84  {
85  // discard first items in Reader
86  size_t limit = offsets[0];
87 #if 0
88  for ( ; current < limit; ++current) {
89  assert(reader.HasNext());
90  // discard one item (with deserialization)
91  reader.template Next<ItemType>();
92  }
93 #else
94  if (current != limit) {
95  reader.template GetItemBatch<ItemType>(limit - current);
96  current = limit;
97  }
98 #endif
99  }
100 
101  Writers writers = GetWriters();
102 
103  size_t num_workers = writers.size();
104  assert(offsets.size() == num_workers + 1);
105 
106  for (size_t worker = 0; worker < num_workers; ++worker) {
107  // write [current,limit) to this worker
108  size_t limit = offsets[worker + 1];
109  assert(current <= limit);
110 #if 0
111  for ( ; current < limit; ++current) {
112  assert(reader.HasNext());
113  // move over one item (with deserialization and serialization)
114  writers[worker](reader.template Next<ItemType>());
115  }
116 #else
117  if (current != limit) {
118  writers[worker].AppendBlocks(
119  reader.template GetItemBatch<ItemType>(limit - current));
120  current = limit;
121  }
122 #endif
123  writers[worker].Close();
124  }
125 
126  // tx_timespan_.Stop();
127  }
128 
129  /**************************************************************************/
130 
131  //! \name Statistics
132  //! \{
133 
134  //! return number of items transmitted
135  size_t tx_items() const;
136 
137  //! return number of bytes transmitted
138  size_t tx_bytes() const;
139 
140  //! return number of blocks transmitted
141  size_t tx_blocks() const;
142 
143  //! return number of items received
144  size_t rx_items() const;
145 
146  //! return number of bytes received
147  size_t rx_bytes() const;
148 
149  //! return number of blocks received
150  size_t rx_blocks() const;
151 
152  /*------------------------------------------------------------------------*/
153 
154  //! return number of items transmitted via network, excluding internal
//! loopback queues
155  size_t tx_net_items() const;
156 
157  //! return number of bytes transmitted via network, excluding internal
//! loopback queues
158  size_t tx_net_bytes() const;
159 
160  //! return number of blocks transmitted via network, excluding internal
//! loopback queues
161  size_t tx_net_blocks() const;
162 
163  //! return number of items received via network, excluding internal
//! loopback queues
164  size_t rx_net_items() const;
165 
166  //! return number of bytes received via network, excluding internal
//! loopback queues
167  size_t rx_net_bytes() const;
168 
169  //! return number of blocks received via network, excluding internal
//! loopback queues
170  size_t rx_net_blocks() const;
171 
172  /*------------------------------------------------------------------------*/
173 
174  //! return number of items transmitted via internal loopback queues
175  size_t tx_int_items() const;
176 
177  //! return number of bytes transmitted via internal loopback queues
178  size_t tx_int_bytes() const;
179 
180  //! return number of blocks transmitted via internal loopback queues
181  size_t tx_int_blocks() const;
182 
183  //! return number of items received via internal loopback queues
184  size_t rx_int_items() const;
185 
186  //! return number of bytes received via internal loopback queues
187  size_t rx_int_bytes() const;
188 
189  //! return number of blocks received via internal loopback queues
190  size_t rx_int_blocks() const;
191 
192  //! \}
193 };
194 
195 //! \}
196 
197 } // namespace data
198 } // namespace thrill
199 
200 #endif // !THRILL_DATA_STREAM_HEADER
201 
202 /******************************************************************************/
size_t tx_net_items() const
return number of items transmitted via network excluding internal tx
Definition: stream.cpp:58
size_t rx_net_bytes() const
return number of bytes received via network excluding internal tx
Definition: stream.cpp:74
size_t rx_blocks() const
return number of blocks received
Definition: stream.cpp:52
Base class for common structures for CatStream and MixStream.
Definition: stream_data.hpp:45
void Scatter(File &source, const std::vector< size_t > &offsets, bool consume=false)
Scatters a File to many workers: elements from [offsets[0], offsets[1]) are sent to the first worker...
Definition: stream.hpp:77
size_t tx_items() const
return number of items transmitted
Definition: stream.cpp:32
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
size_t rx_items() const
return number of items received
Definition: stream.cpp:44
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t tx_int_bytes() const
return number of bytes transmitted via internal loopback queues
Definition: stream.cpp:88
BlockWriter< StreamSink > Writer
Definition: stream_data.hpp:48
virtual const StreamId & id() const =0
Return stream id.
size_t rx_net_blocks() const
return number of blocks received via network excluding internal tx
Definition: stream.cpp:78
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
size_t tx_net_bytes() const
return number of bytes transmitted via network excluding internal tx
Definition: stream.cpp:62
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void Close()
shuts down the stream, waits for all closing blocks
Definition: stream.cpp:26
size_t tx_bytes() const
return number of bytes transmitted
Definition: stream.cpp:36
virtual StreamData & data()=0
Return stream data reference.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t tx_int_items() const
return number of items transmitted via internal loopback queues
Definition: stream.cpp:84
size_t rx_int_blocks() const
return number of blocks received via internal loopback queues
Definition: stream.cpp:104
size_t tx_net_blocks() const
return number of blocks transmitted via network excluding internal tx
Definition: stream.cpp:66
size_t rx_bytes() const
return number of bytes received
Definition: stream.cpp:48
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
size_t rx_net_items() const
return number of items received via network excluding internal tx
Definition: stream.cpp:70
Stream - base class for CatStream and MixStream.
Definition: stream.hpp:36
virtual Writers GetWriters()=0
size_t StreamId
Definition: stream_data.hpp:32
size_t rx_int_items() const
return number of items received via internal loopback queues
Definition: stream.cpp:96
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t tx_int_blocks() const
return number of blocks transmitted via internal loopback queues
Definition: stream.cpp:92
size_t tx_blocks() const
return number of blocks transmitted
Definition: stream.cpp:40
Provides reference counting abilities for use with CountingPtr.
size_t rx_int_bytes() const
return number of bytes received via internal loopback queues
Definition: stream.cpp:100
virtual ~Stream()
Definition: stream.cpp:23