Thrill  0.1
mix_stream.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MIX_STREAM_HEADER
14 #define THRILL_DATA_MIX_STREAM_HEADER
15 
#include <thrill/data/mix_block_queue.hpp>
#include <thrill/data/stream.hpp>

#include <string>
#include <vector>
22 
23 namespace thrill {
24 namespace data {
25 
26 //! \addtogroup data_layer
27 //! \{
28 
29 /*!
30  * A Stream is a virtual set of connections to all other worker instances,
31  * hence a "Stream" bundles them to a logical communication context. We call an
32  * individual connection from a worker to another worker a "Host".
33  *
34  * To use a Stream, one can get a vector of BlockWriter via OpenWriters() of
35  * outbound Stream. The vector is of size of workers in the system. One can
36  * then write items destined to the corresponding worker. The written items are
37  * buffered into a Block and only sent when the Block is full. To force a send,
38  * use BlockWriter::Flush(). When all items are sent, the BlockWriters **must**
39  * be closed using BlockWriter::Close().
40  *
41  * The MixStream allows reading of items from all workers in an unordered
42  * sequence, without waiting for any of the workers to complete sending items.
43  */
44 class MixStreamData final : public StreamData
45 {
46  static constexpr bool debug = false;
47 
48 public:
50 
51  using Handle = MixStream;
52 
53  //! Creates a new stream instance
54  MixStreamData(StreamSetBase* stream_set_base,
55  Multiplexer& multiplexer, size_t send_size_limit,
56  const StreamId& id, size_t local_worker_id, size_t dia_id);
57 
58  //! non-copyable: delete copy-constructor
59  MixStreamData(const MixStreamData&) = delete;
60  //! non-copyable: delete assignment operator
61  MixStreamData& operator = (const MixStreamData&) = delete;
62  //! move-constructor: default
63  MixStreamData(MixStreamData&&) = default;
64 
65  ~MixStreamData() final;
66 
67  //! return stream type string
68  const char * stream_type() final;
69 
70  //! change dia_id after construction (needed because it may be unknown at
71  //! construction)
72  void set_dia_id(size_t dia_id);
73 
74  //! Creates BlockWriters for each worker. BlockWriter can only be opened
75  //! once, otherwise the block sequence is incorrectly interleaved!
76  Writers GetWriters() final;
77 
78  //! Creates a BlockReader which mixes items from all workers.
79  MixReader GetMixReader(bool consume);
80 
81  //! Open a MixReader (function name matches a method in File and CatStream).
82  MixReader GetReader(bool consume);
83 
84  //! shuts the stream down.
85  void Close() final;
86 
87  //! Indicates if the stream is closed - meaning all remaining outbound
88  //! queues have been closed.
89  bool closed() const final;
90 
91  //! check if inbound queue is closed
92  bool is_queue_closed(size_t from);
93 
94 private:
95  //! flag if Close() was completed
96  bool is_closed_ = false;
97 
98  struct SeqReordering;
99 
100  //! Block Sequence numbers
101  std::vector<SeqReordering> seq_;
102 
103  //! BlockQueue to store incoming Blocks with source.
105 
106  //! for calling methods to deliver blocks
107  friend class Multiplexer;
108  friend class StreamSink;
109 
110  //! called from Multiplexer when there is a new Block for this Stream.
111  void OnStreamBlock(size_t from, uint32_t seq, Block&& b);
112 
113  //! called to process PinnedBlock in sequence
114  void OnStreamBlockOrdered(size_t from, Block&& b);
115 };
116 
117 // we have two types of MixStream smart pointers: one for internal use in the
118 // Multiplexer (ordinary CountingPtr), and another for public handles in the
119 // DIANodes. Once all public handles are deleted, the MixStream is deactivated.
120 using MixStreamDataPtr = tlx::CountingPtr<MixStreamData>;
121 
122 using MixStreamSet = StreamSet<MixStreamData>;
123 using MixStreamSetPtr = tlx::CountingPtr<MixStreamSet>;
124 
125 //! Ownership handle onto a MixStream
126 class MixStream final : public Stream
127 {
128 public:
130 
132 
133  explicit MixStream(const MixStreamDataPtr& ptr);
134 
135  //! When the user handle is destroyed, close the stream (but maybe not
136  //! destroy the data object)
137  ~MixStream();
138 
139  //! Return stream id
140  const StreamId& id() const final;
141 
142  //! Return stream data reference
143  StreamData& data() final;
144 
145  //! Return stream data reference
146  const StreamData& data() const final;
147 
148  //! Creates BlockWriters for each worker. BlockWriter can only be opened
149  //! once, otherwise the block sequence is incorrectly interleaved!
151 
152  //! Creates a BlockReader which concatenates items from all workers in an
153  //! arbitrary order.
154  MixReader GetMixReader(bool consume);
155 
156  //! Open a MixReader (function name matches a method in File and CatStream).
157  MixReader GetReader(bool consume);
158 
159 private:
161 };
162 
163 using MixStreamPtr = tlx::CountingPtr<MixStream>;
164 
165 //! \}
166 
167 } // namespace data
168 } // namespace thrill
169 
170 #endif // !THRILL_DATA_MIX_STREAM_HEADER
171 
172 /******************************************************************************/
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a differe...
Definition: stream_sink.hpp:38
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
Base class for common structures for CatStream and MixStream.
Definition: stream_data.hpp:46
MixStreamData & operator=(const MixStreamData &)=delete
non-copyable: delete assignment operator
void Close() final
shuts the stream down.
Definition: mix_stream.cpp:124
MixBlockQueue queue_
BlockQueue to store incoming Blocks with source.
Definition: mix_stream.hpp:104
static constexpr bool debug
Definition: mix_stream.hpp:46
STL namespace.
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:126
bool is_closed_
flag if Close() was completed
Definition: mix_stream.hpp:96
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
BlockWriter< StreamSink > Writer
Definition: stream_data.hpp:51
MixBlockQueueReader MixReader
Definition: mix_stream.hpp:49
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:120
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: mix_stream.hpp:44
Implements reading an unordered sequence of items from multiple workers, which sends Blocks...
Writers GetWriters() final
Definition: mix_stream.cpp:52
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: mix_stream.hpp:98
bool is_queue_closed(size_t from)
check if inbound queue is closed
Definition: mix_stream.cpp:156
bool closed() const final
Definition: mix_stream.cpp:149
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
Base class for StreamSet.
MixStreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: mix_stream.cpp:27
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:91
Stream - base class for CatStream and MixStream.
Definition: stream.hpp:36
size_t StreamId
Definition: stream_data.hpp:32
void OnStreamBlockOrdered(size_t from, Block &&b)
called to process a Block in sequence order
Definition: mix_stream.cpp:206
MixReader GetMixReader(bool consume)
Creates a BlockReader which mixes items from all workers.
Definition: mix_stream.cpp:115
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
called from Multiplexer when there is a new Block for this Stream.
Definition: mix_stream.cpp:168
const char * stream_type() final
return stream type string
Definition: mix_stream.cpp:48
void set_dia_id(size_t dia_id)
Definition: mix_stream.cpp:43