Thrill  0.1
cat_stream.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/cat_stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_CAT_STREAM_HEADER
14 #define THRILL_DATA_CAT_STREAM_HEADER
15 
18 #include <thrill/data/stream.hpp>
19 
20 #include <string>
21 #include <vector>
22 
23 namespace thrill {
24 namespace data {
25 
26 //! \addtogroup data_layer
27 //! \{
28 
29 /*!
30  * A Stream is a virtual set of connections to all other worker instances,
31  * hence a "Stream" bundles them into a logical communication context. We call an
32  * individual connection from a worker to another worker a "Host".
33  *
34  * To use a Stream, one can get a vector of BlockWriter via GetWriters() of the
35  * outbound Stream. The vector has one entry per worker in the system.
36  * One can then write items destined to the
37  * corresponding worker. The written items are buffered into a Block and only
38  * sent when the Block is full. To force a send, use BlockWriter::Flush(). When
39  * all items are sent, the BlockWriters **must** be closed using
40  * BlockWriter::Close().
41  *
42  * To read the inbound Connection items, one can get a vector of BlockReader via
43  * GetReaders(), which can then be used to read items sent by individual
44  * workers.
45  *
46  * Alternatively, one can use GetCatReader() (or GetReader()) to get a BlockReader
47  * which delivers all items from *all* workers in worker rank order (concatenating
48  * all inbound Connections).
49  *
50  * As soon as all attached streams of the Stream have been closed via Close(), the
51  * number of expected streams is reached; the stream is then marked as finished
52  * and no more data will arrive.
53  */
54 class CatStreamData final : public StreamData
55 {
56 public:
57  static constexpr bool debug = false;
58  static constexpr bool debug_data = false;
59 
62 
65 
68 
69  using Handle = CatStream; //!< public ownership handle type (see CatStream)
70 
71  //! Creates a new stream instance
72  CatStreamData(StreamSetBase* stream_set_base,
73  Multiplexer& multiplexer, size_t send_size_limit,
74  const StreamId& id, size_t local_worker_id, size_t dia_id);
75 
76  //! non-copyable: delete copy-constructor
77  CatStreamData(const CatStreamData&) = delete;
78  //! non-copyable: delete assignment operator
79  CatStreamData& operator = (const CatStreamData&) = delete;
80  //! move-constructor: default
81  CatStreamData(CatStreamData&&) = default;
82 
83  ~CatStreamData() final;
84 
85  //! Return stream type string.
86  const char * stream_type() final;
87 
88  //! Change dia_id after construction (needed because it may be unknown at
89  //! construction).
90  void set_dia_id(size_t dia_id);
91 
92  //! Creates BlockWriters for each worker. BlockWriter can only be opened
93  //! once, otherwise the block sequence is incorrectly interleaved!
94  Writers GetWriters() final;
95 
96  //! Creates a BlockReader for each worker. The BlockReaders are attached to
97  //! the BlockQueues in the Stream and wait for further Blocks to arrive or
98  //! the Stream's remote close. These Readers _always_ consume!
100 
101  //! Gets a CatBlockSource which includes all incoming queues of this stream.
102  CatBlockSource GetCatBlockSource(bool consume);
103 
104  //! Creates a BlockReader which concatenates items from all workers in
105  //! worker rank order. The BlockReader is attached to one \ref
106  //! CatBlockSource which includes all incoming queues of this stream.
107  CatReader GetCatReader(bool consume);
108 
109  //! Open a CatReader (function name matches a method in File and MixStream).
110  CatReader GetReader(bool consume);
111 
112  //! Shuts the stream down.
113  void Close() final;
114 
115  //! Indicates if the stream is closed - meaning all remaining streams have
116  //! been closed. This does *not* include the loopback stream.
117  bool closed() const final;
118 
119  //! Check if the inbound queue of sender `from` is closed.
120  bool is_queue_closed(size_t from);
121 
122 private:
123  bool is_closed_ = false;
124 
125  struct SeqReordering;
126 
127  //! Block sequence number bookkeeping (see SeqReordering).
128  std::vector<SeqReordering> seq_;
129 
130  //! BlockQueues to store incoming Blocks with no attached destination.
132 
133  //! For calling methods to deliver blocks.
134  friend class Multiplexer;
135 
136  //! Called from Multiplexer when there is a new Block `b` with sequence
137  //! number `seq` from sender `from` on this Stream.
138  void OnStreamBlock(size_t from, uint32_t seq, Block&& b);
139  //! NOTE(review): presumably receives Blocks once they are in sequence order — confirm in cat_stream.cpp.
140  void OnStreamBlockOrdered(size_t from, Block&& b);
141 
142  //! Returns the loopback queue for the worker of this stream.
143  BlockQueue * loopback_queue(size_t from_worker_id);
144 };
145 
146 // We have two types of CatStream smart pointers: CatStreamDataPtr for internal use in the
147 // Multiplexer (an ordinary CountingPtr to CatStreamData), and CatStreamPtr for public handles in the
148 // DIANodes. Once all public handles are deleted, the CatStream is deactivated.
149 using CatStreamDataPtr = tlx::CountingPtr<CatStreamData>;
150 // CatStreamSet: holds the CatStreamData instances of the workers on the local host for one stream.
151 using CatStreamSet = StreamSet<CatStreamData>;
152 using CatStreamSetPtr = tlx::CountingPtr<CatStreamSet>;
153 
154 //! Ownership handle onto a CatStreamData. Destroying the handle closes the stream.
155 class CatStream final : public Stream
156 {
157 public:
160 
162  //! Creates a public handle onto the stream data object referenced by ptr.
163  explicit CatStream(const CatStreamDataPtr& ptr);
164 
165  //! When the user handle is destroyed, close the stream (but maybe not
166  //! destroy the data object, which may still be referenced via a CatStreamDataPtr).
167  ~CatStream();
168  //! Return stream id.
169  const StreamId& id() const final;
170 
171  //! Return stream data reference.
172  StreamData& data() final;
173 
174  //! Return const stream data reference.
175  const StreamData& data() const final;
176 
177  //! Creates BlockWriters for each worker. BlockWriter can only be opened
178  //! once, otherwise the block sequence is incorrectly interleaved!
179  Writers GetWriters() final;
180 
181  //! Creates a BlockReader for each worker. The BlockReaders are attached to
182  //! the BlockQueues in the Stream and wait for further Blocks to arrive or
183  //! the Stream's remote close. These Readers _always_ consume!
184  std::vector<Reader> GetReaders();
185 
186  //! Creates a BlockReader which concatenates items from all workers in
187  //! worker rank order. The BlockReader is attached to one \ref
188  //! CatBlockSource which includes all incoming queues of this stream.
189  CatReader GetCatReader(bool consume);
190 
191  //! Open a CatReader (function name matches a method in File and MixStream).
192  CatReader GetReader(bool consume);
193 
194 private:
196 };
197 //! Reference-counted pointer to a public CatStream handle.
198 using CatStreamPtr = tlx::CountingPtr<CatStream>;
199 
200 //! \}
201 
202 } // namespace data
203 } // namespace thrill
204 
205 #endif // !THRILL_DATA_CAT_STREAM_HEADER
206 
207 /******************************************************************************/
Writers GetWriters() final
Definition: cat_stream.cpp:89
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
std::vector< BlockQueue > queues_
BlockQueues to store incoming Blocks with no attached destination.
Definition: cat_stream.hpp:131
static constexpr bool debug
Definition: cat_stream.hpp:57
bool is_queue_closed(size_t from)
check if inbound queue is closed
Definition: cat_stream.cpp:231
Base class for common structures for CatStream and MixStream.
Definition: stream_data.hpp:46
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: cat_stream.hpp:54
BlockQueueReader Reader
Definition: cat_stream.hpp:66
CatReader GetCatReader(bool consume)
Definition: cat_stream.cpp:185
STL namespace.
const char * stream_type() final
return stream type string
Definition: cat_stream.cpp:85
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition: cat_stream.cpp:189
BlockWriter< StreamSink > Writer
Definition: stream_data.hpp:51
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
std::vector< Reader > GetReaders()
Definition: cat_stream.cpp:154
void Close() final
shuts the stream down.
Definition: cat_stream.cpp:193
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
CatBlockSource GetCatBlockSource(bool consume)
Gets a CatBlockSource which includes all incoming queues of this stream.
Definition: cat_stream.cpp:169
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:155
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
Definition: cat_stream.cpp:243
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
Base class for StreamSet.
CatStreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: cat_stream.cpp:28
High-performance smart pointer used as a wrapping reference counting pointer.
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:91
BlockQueue * loopback_queue(size_t from_worker_id)
Returns the loopback queue for the worker of this stream.
Definition: cat_stream.cpp:316
void OnStreamBlockOrdered(size_t from, Block &&b)
Definition: cat_stream.cpp:287
BlockReader< CatBlockSource > CatBlockReader
Definition: cat_stream.hpp:64
static constexpr bool debug_data
Definition: cat_stream.hpp:58
Stream - base class for CatStream and MixStream.
Definition: stream.hpp:36
size_t StreamId
Definition: stream_data.hpp:32
bool closed() const final
Definition: cat_stream.cpp:223
void set_dia_id(size_t dia_id)
Definition: cat_stream.cpp:78
BlockReader< BlockQueueSource > BlockQueueReader
Definition: cat_stream.hpp:61
CatStreamData & operator=(const CatStreamData &)=delete
non-copyable: delete assignment operator
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
A BlockSource to read Block from a BlockQueue using a BlockReader.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: cat_stream.hpp:125