Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
cat_stream.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/cat_stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_CAT_STREAM_HEADER
14 #define THRILL_DATA_CAT_STREAM_HEADER
15 
18 #include <thrill/data/stream.hpp>
19 
20 #include <string>
21 #include <vector>
22 
23 namespace thrill {
24 namespace data {
25 
26 //! \addtogroup data_layer
27 //! \{
28 
29 /*!
30  * A Stream is a virtual set of connections to all other worker instances,
31  * hence a "Stream" bundles them to a logical communication context. We call an
32  * individual connection from a worker to another worker a "Host".
33  *
34  * To use a Stream, one can get a vector of BlockWriters via GetWriters() of
35  * the outbound Stream. The vector has one entry per worker in the system.
36  * One can then write items destined to the
37  * corresponding worker. The written items are buffered into a Block and only
38  * sent when the Block is full. To force a send, use BlockWriter::Flush(). When
39  * all items are sent, the BlockWriters **must** be closed using
40  * BlockWriter::Close().
41  *
42  * To read the inbound Connection items, one can get a vector of BlockReaders via
43  * GetReaders(), which can then be used to read items sent by individual
44  * workers.
45  *
46  * Alternatively, one can use GetCatReader() to get a BlockReader which delivers
47  * all items from *all* workers in worker order (concatenating all inbound
48  * Connections).
49  *
50  * As soon as all attached streams of the Stream have been Close()d, i.e. the
51  * number of expected streams is reached, the stream is marked as finished and
52  * no more data will arrive.
53  */
54 class CatStreamData final : public StreamData
55 {
56 public:
57  static constexpr bool debug = false;
58  static constexpr bool debug_data = false;
59 
62  // NOTE(review): the alias declarations from original lines 60-67 (e.g.
65  // BlockQueueReader, CatBlockReader, Reader) are missing from this listing.
68  //! Public handle class associated with this stream data
69  using Handle = CatStream;
70 
71  //! Creates a new stream instance
72  CatStreamData(Multiplexer& multiplexer, size_t send_size_limit,
73  const StreamId& id, size_t local_worker_id, size_t dia_id);
74 
75  //! non-copyable: delete copy-constructor
76  CatStreamData(const CatStreamData&) = delete;
77  //! non-copyable: delete assignment operator
78  CatStreamData& operator = (const CatStreamData&) = delete;
79  //! move-constructor: default
80  CatStreamData(CatStreamData&&) = default;
81 
82  ~CatStreamData() final;
83 
84  //! change dia_id after construction (needed because it may be unknown at
85  //! construction)
86  void set_dia_id(size_t dia_id);
87 
88  //! Creates BlockWriters for each worker. BlockWriter can only be opened
89  //! once, otherwise the block sequence is incorrectly interleaved!
90  Writers GetWriters() final;
91 
92  //! Creates a BlockReader for each worker. The BlockReaders are attached to
93  //! the BlockQueues in the Stream and wait for further Blocks to arrive or
94  //! the Stream's remote close. These Readers _always_ consume!
95  std::vector<Reader> GetReaders();
96 
97  //! Gets a CatBlockSource which includes all incoming queues of this stream.
98  CatBlockSource GetCatBlockSource(bool consume);
99 
100  //! Creates a BlockReader which concatenates items from all workers in
101  //! worker rank order. The BlockReader is attached to one \ref
102  //! CatBlockSource which includes all incoming queues of this stream.
103  CatReader GetCatReader(bool consume);
104 
105  //! Open a CatReader (function name matches a method in File and MixStream).
106  CatReader GetReader(bool consume);
107 
108  //! Shuts the stream down.
109  void Close() final;
110 
111  //! Indicates if the stream is closed - meaning all remaining streams have
112  //! been closed. This does *not* include the loopback stream.
113  bool closed() const final;
114 
115 private:
116  bool is_closed_ = false; //!< closed flag, initially false; presumably set by Close() - verify in cat_stream.cpp
117  //! Forward declaration only; the definition is not part of this listing.
118  struct SeqReordering;
119 
120  //! Block sequence number state, stored as a vector of SeqReordering
121  std::vector<SeqReordering> seq_;
122 
123  //! BlockQueues to store incoming Blocks with no attached destination.
125  // NOTE(review): the queues_ member documented above (std::vector<BlockQueue>, original line 124) is missing from this listing.
126  //! for calling methods to deliver blocks
127  friend class Multiplexer;
128 
129  //! called from Multiplexer when there is a new Block on a
130  //! Stream.
131  void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock&& b);
132  //! Takes a Block from worker `from` without a sequence number; presumably called once blocks are in order - verify in cat_stream.cpp
133  void OnStreamBlockOrdered(size_t from, PinnedBlock&& b);
134 
135  //! Returns the loopback queue for the worker of this stream.
136  BlockQueue * loopback_queue(size_t from_worker_id);
137 };
138 
139 // We have two types of CatStream smart pointers: one for internal use in the
140 // Multiplexer (ordinary CountingPtr), and another for public handles in the
141 // DIANodes. Once all public handles are deleted, the CatStream is deactivated.
142 using CatStreamDataPtr = tlx::CountingPtr<CatStreamData>;
143 //! StreamSet instantiated for CatStreamData, and a counting pointer to such a set
144 using CatStreamSet = StreamSet<CatStreamData>;
145 using CatStreamSetPtr = tlx::CountingPtr<CatStreamSet>;
146 
147 //! Ownership handle onto a CatStreamData
148 class CatStream final : public Stream
149 {
150 public:
153  // NOTE(review): original lines 151-154 are missing from this listing; type aliases such as Reader and CatReader used below are presumably declared there - confirm against the real header.
155  //! Creates a handle from a counting pointer to the stream data
156  explicit CatStream(const CatStreamDataPtr& ptr);
157 
158  //! When the user handle is destroyed, close the stream (but maybe not
159  //! destroy the data object)
160  ~CatStream();
161  //! Return stream id
162  const StreamId& id() const final;
163 
164  //! Return stream data reference
165  StreamData& data() final;
166 
167  //! Return const stream data reference
168  const StreamData& data() const final;
169 
170  //! Creates BlockWriters for each worker. BlockWriter can only be opened
171  //! once, otherwise the block sequence is incorrectly interleaved!
172  Writers GetWriters() final;
173 
174  //! Creates a BlockReader for each worker. The BlockReaders are attached to
175  //! the BlockQueues in the Stream and wait for further Blocks to arrive or
176  //! the Stream's remote close. These Readers _always_ consume!
177  std::vector<Reader> GetReaders();
178 
179  //! Creates a BlockReader which concatenates items from all workers in
180  //! worker rank order. The BlockReader is attached to one \ref
181  //! CatBlockSource which includes all incoming queues of this stream.
182  CatReader GetCatReader(bool consume);
183 
184  //! Open a CatReader (function name matches a method in File and MixStream).
185  CatReader GetReader(bool consume);
186 
187 private: // NOTE(review): original line 188 (a private member declaration) is missing from this listing
189 };
190 
191 using CatStreamPtr = tlx::CountingPtr<CatStream>; //!< counting pointer to a CatStream handle
192 
193 //! \}
194 
195 } // namespace data
196 } // namespace thrill
197 
198 #endif // !THRILL_DATA_CAT_STREAM_HEADER
199 
200 /******************************************************************************/
Writers GetWriters() final
Definition: cat_stream.cpp:83
std::vector< BlockQueue > queues_
BlockQueues to store incoming Blocks with no attached destination.
Definition: cat_stream.hpp:124
static constexpr bool debug
Definition: cat_stream.hpp:57
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:45
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: cat_stream.hpp:54
BlockQueueReader Reader
Definition: cat_stream.hpp:66
CatReader GetCatReader(bool consume)
Definition: cat_stream.cpp:179
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition: cat_stream.cpp:183
BlockWriter< StreamSink > Writer
Definition: stream_data.hpp:48
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock &&b)
Definition: cat_stream.cpp:235
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
std::vector< Reader > GetReaders()
Definition: cat_stream.cpp:148
void Close() final
shuts the stream down.
Definition: cat_stream.cpp:187
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
CatBlockSource GetCatBlockSource(bool consume)
Gets a CatBlockSource which includes all incoming queues of this stream.
Definition: cat_stream.cpp:163
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:148
CatStreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: cat_stream.cpp:28
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
High-performance smart pointer used as a wrapping reference counting pointer.
BlockQueue * loopback_queue(size_t from_worker_id)
Returns the loopback queue for the worker of this stream.
Definition: cat_stream.cpp:306
BlockReader< CatBlockSource > CatBlockReader
Definition: cat_stream.hpp:64
static constexpr bool debug_data
Definition: cat_stream.hpp:58
Stream - base class for CatStream and MixStream.
Definition: stream.hpp:36
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t StreamId
Definition: stream_data.hpp:32
bool closed() const final
Definition: cat_stream.cpp:219
void set_dia_id(size_t dia_id)
Definition: cat_stream.cpp:76
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:87
BlockReader< BlockQueueSource > BlockQueueReader
Definition: cat_stream.hpp:61
void OnStreamBlockOrdered(size_t from, PinnedBlock &&b)
Definition: cat_stream.cpp:281
CatStreamData & operator=(const CatStreamData &)=delete
non-copyable: delete assignment operator
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
A BlockSource to read Block from a BlockQueue using a BlockReader.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: cat_stream.hpp:118