Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mix_stream.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/mix_stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MIX_STREAM_HEADER
14 #define THRILL_DATA_MIX_STREAM_HEADER
15 
17 #include <thrill/data/stream.hpp>
19 
20 #include <string>
21 #include <vector>
22 
23 namespace thrill {
24 namespace data {
25 
26 //! \addtogroup data_layer
27 //! \{
28 
29 /*!
30  * A Stream is a virtual set of connections to all other worker instances,
31  * hence a "Stream" bundles them to a logical communication context. We call an
32  * individual connection from a worker to another worker a "Host".
33  *
34  * To use a Stream, one can get a vector of BlockWriters via GetWriters() of the
35  * outbound Stream. The vector has one entry per worker in the system. One can
36  * then write items destined to the corresponding worker. The written items are
37  * buffered into a Block and only sent when the Block is full. To force a send,
38  * use BlockWriter::Flush(). When all items are sent, the BlockWriters **must**
39  * be closed using BlockWriter::Close().
40  *
41  * The MixStream allows reading of items from all workers in an unordered
42  * sequence, without waiting for any of the workers to complete sending items.
43  */
44 class MixStreamData final : public StreamData
45 {
46  static constexpr bool debug = false;
47 
48 public:
50 
51  using Handle = MixStream;
52 
53  //! Constructs a new stream instance.
54  MixStreamData(Multiplexer& multiplexer, size_t send_size_limit,
55  const StreamId& id, size_t local_worker_id, size_t dia_id);
56 
57  //! Non-copyable: the copy constructor is deleted.
58  MixStreamData(const MixStreamData&) = delete;
59  //! Non-copyable: the copy assignment operator is deleted.
60  MixStreamData& operator = (const MixStreamData&) = delete;
61  //! Move constructor: compiler-generated default.
62  MixStreamData(MixStreamData&&) = default;
63 
64  ~MixStreamData() final;
65 
66  //! Changes dia_id after construction (needed because it may be unknown at
67  //! construction time).
68  void set_dia_id(size_t dia_id);
69 
70  //! Creates BlockWriters for each worker. The BlockWriters can only be opened
71  //! once, otherwise the block sequence is incorrectly interleaved!
72  Writers GetWriters() final;
73 
74  //! Creates a MixReader which mixes items from all workers.
75  MixReader GetMixReader(bool consume);
76 
77  //! Opens a MixReader (function name matches a method in File and CatStream).
78  MixReader GetReader(bool consume);
79 
80  //! Shuts the stream down.
81  void Close() final;
82 
83  //! Indicates whether the stream is closed, meaning all remaining outbound
84  //! queues have been closed.
85  bool closed() const final;
86 
87 private:
88  //! Flag set once Close() has completed.
89  bool is_closed_ = false;
90 
91  struct SeqReordering;
92 
93  //! Block sequence numbers.
94  std::vector<SeqReordering> seq_;
95 
96  //! BlockQueue to store incoming Blocks with source. NOTE(review): the member declaration (listing line 97) is missing here.
98 
99  //! Friend classes that call the private methods below to deliver blocks.
100  friend class Multiplexer;
101  friend class StreamSink;
102 
103  //! Called from Multiplexer when there is a new Block for this Stream.
104  void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock&& b);
105 
106  //! Called to process a PinnedBlock in sequence.
107  void OnStreamBlockOrdered(size_t from, PinnedBlock&& b);
108 };
109 
110 // There are two kinds of MixStream smart pointers: one for internal use in the
111 // Multiplexer (an ordinary CountingPtr), and another for public handles in the
112 // DIANodes. Once all public handles are deleted, the MixStream is deactivated.
113 using MixStreamDataPtr = tlx::CountingPtr<MixStreamData>;
114 
115 using MixStreamSet = StreamSet<MixStreamData>;
116 using MixStreamSetPtr = tlx::CountingPtr<MixStreamSet>;
117 
118 //! Ownership handle onto a MixStream, constructed from a MixStreamDataPtr.
119 class MixStream final : public Stream
120 {
121 public:
123 
125 
126  explicit MixStream(const MixStreamDataPtr& ptr);
127 
128  //! When the user handle is destroyed, the stream is closed (but the data
129  //! object is not necessarily destroyed).
130  ~MixStream();
131 
132  //! Returns the stream id.
133  const StreamId& id() const final;
134 
135  //! Returns a mutable reference to the stream data.
136  StreamData& data() final;
137 
138  //! Returns a const reference to the stream data.
139  const StreamData& data() const final;
140 
141  //! Creates BlockWriters for each worker. The BlockWriters can only be opened
142  //! once, otherwise the block sequence is incorrectly interleaved! NOTE(review): the declaration (listing line 143) is missing here.
144 
145  //! Creates a MixReader which yields items from all workers in an
146  //! arbitrary order.
147  MixReader GetMixReader(bool consume);
148 
149  //! Opens a MixReader (function name matches a method in File and CatStream).
150  MixReader GetReader(bool consume);
151 
152 private:
154 };
155 
156 using MixStreamPtr = tlx::CountingPtr<MixStream>; //!< counting pointer to a MixStream handle
157 
158 //! \}
159 
160 } // namespace data
161 } // namespace thrill
162 
163 #endif // !THRILL_DATA_MIX_STREAM_HEADER
164 
165 /******************************************************************************/
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a differe...
Definition: stream_sink.hpp:38
Base class for common structures for CatStream and MixStream.
Definition: stream_data.hpp:45
MixStreamData & operator=(const MixStreamData &)=delete
non-copyable: delete assignment operator
void Close() final
shuts the stream down.
Definition: mix_stream.cpp:118
MixBlockQueue queue_
BlockQueue to store incoming Blocks with source.
Definition: mix_stream.hpp:97
static constexpr bool debug
Definition: mix_stream.hpp:46
void OnStreamBlockOrdered(size_t from, PinnedBlock &&b)
called to process PinnedBlock in sequence
Definition: mix_stream.cpp:201
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:119
bool is_closed_
flag if Close() was completed
Definition: mix_stream.hpp:89
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
BlockWriter< StreamSink > Writer
Definition: stream_data.hpp:48
MixBlockQueueReader MixReader
Definition: mix_stream.hpp:49
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition: mix_stream.cpp:114
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
Definition: mix_stream.hpp:44
Implements reading an unordered sequence of items from multiple workers, which send Blocks...
Writers GetWriters() final
Definition: mix_stream.cpp:46
MixStreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: mix_stream.cpp:27
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: mix_stream.hpp:91
bool closed() const final
Definition: mix_stream.cpp:145
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
Stream - base class for CatStream and MixStream.
Definition: stream.hpp:36
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t StreamId
Definition: stream_data.hpp:32
void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock &&b)
called from Multiplexer when there is a new Block for this Stream.
Definition: mix_stream.cpp:160
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:87
MixReader GetMixReader(bool consume)
Creates a BlockReader which mixes items from all workers.
Definition: mix_stream.cpp:109
void set_dia_id(size_t dia_id)
Definition: mix_stream.cpp:41