Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
stream_sink.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_sink.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_STREAM_SINK_HEADER
13 #define THRILL_DATA_STREAM_SINK_HEADER
14 
15 #include <thrill/common/logger.hpp>
18 #include <thrill/data/block.hpp>
21 #include <thrill/net/buffer.hpp>
23 
24 namespace thrill {
25 namespace data {
26 
27 //! \addtogroup data_layer
28 //! \{
29 
30 // forward declarations
31 class StreamData;
33 
34 /*!
35  * StreamSink is a BlockSink that sends data via a network socket to the
36  * StreamData object on a different worker.
37  */
38 class StreamSink final : public BlockSink
39 {
40  static constexpr bool debug = false;
41 
42 public:
43  using StreamId = size_t;
44 
45  //! Construct invalid StreamSink, needed for placeholders in sinks arrays
46  //! where Blocks are directly sent to local workers.
47  StreamSink();
48 
49  //! StreamSink sending out to network.
51  net::Connection* connection,
52  MagicByte magic, StreamId stream_id,
53  size_t host_rank, size_t host_local_worker,
54  size_t peer_rank, size_t peer_local_worker);
55 
56  //! StreamSink sending out to local BlockQueue.
58  BlockQueue* block_queue,
59  StreamId stream_id,
60  size_t host_rank, size_t host_local_worker,
61  size_t peer_rank, size_t peer_local_worker);
62 
63  //! StreamSink sending out to local MixBlockQueue.
65  MixStreamDataPtr target,
66  StreamId stream_id,
67  size_t host_rank, size_t host_local_worker,
68  size_t peer_rank, size_t peer_local_worker);
69 
70  StreamSink(StreamSink&&) = default;
71  StreamSink& operator = (StreamSink&&) = default;
72 
73  //! Appends data to the StreamSink. Data may be sent but may be delayed.
74  void AppendBlock(const Block& block, bool is_last_block) final;
75 
76  //! Appends data to the StreamSink. Data may be sent but may be delayed.
77  void AppendBlock(Block&& block, bool is_last_block) final;
78 
79  //! Appends data to the StreamSink. Data may be sent but may be delayed.
80  void AppendPinnedBlock(PinnedBlock&& block, bool is_last_block) final;
81 
82  //! Closes the connection
83  void Close() final;
84 
85  //! Finalize structure after sending the piggybacked or explicit close
86  void Finalize();
87 
88  //! return close flag
89  bool closed() const { return closed_; }
90 
91  //! is valid?
92  bool IsValid() const { return stream_ != nullptr; }
93 
94  //! boolean flag indicating whether AllocateByteBlock can fail in any
95  //! subclass (if false: BlockWriter is accelerated because it does not need
96  //! to cope with nullptr).
97  static constexpr bool allocate_can_fail_ = false;
98 
99  //! return local worker rank
100  size_t my_worker_rank() const;
101 
102  //! return remote worker rank
103  size_t peer_worker_rank() const;
104 
105 private:
107 
108  //! \name StreamSink To Network
109  //! \{
110 
113 
114  //! \}
115 
116  //! \name StreamSink To BlockQueue (CatStream Loopback)
117  //! \{
118 
120 
121  //! \}
122 
123  //! \name StreamSink To MixBlockQueue (MixStream Loopback)
124  //! \{
125 
126  //! destination mix stream
128 
129  //! \}
130 
131  StreamId id_ = size_t(-1);
132  size_t host_rank_ = size_t(-1);
134  size_t peer_rank_ = size_t(-1);
135  size_t peer_local_worker_ = size_t(-1);
136  bool closed_ = false;
137 
138  size_t item_counter_ = 0;
139  size_t byte_counter_ = 0;
140  size_t block_counter_ = 0;
142 };
143 
144 //! \}
145 
146 } // namespace data
147 } // namespace thrill
148 
149 #endif // !THRILL_DATA_STREAM_SINK_HEADER
150 
151 /******************************************************************************/
StreamSink is a BlockSink that sends data via a network socket to the StreamData object on a different worker.
Definition: stream_sink.hpp:38
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
void AppendBlock(const Block &block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
Definition: stream_sink.cpp:98
tlx::CountingPtr< StreamData > StreamDataPtr
void Close() final
Closes the connection.
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
size_t my_worker_rank() const
return local worker rank
Definition: stream_sink.cpp:90
void AppendPinnedBlock(PinnedBlock &&block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
static constexpr bool debug
Definition: stream_sink.hpp:40
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
size_t peer_worker_rank() const
return remote worker rank
Definition: stream_sink.cpp:94
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
MixStreamDataPtr target_mix_stream_
destination mix stream
bool closed() const
return close flag
Definition: stream_sink.hpp:89
void Finalize()
Finalize structure after sending the piggybacked or explicit close.
net::Connection * connection_
static constexpr bool allocate_can_fail_
Definition: stream_sink.hpp:97
bool IsValid() const
is valid?
Definition: stream_sink.hpp:92
StreamSink & operator=(StreamSink &&)=default
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
common::StatsTimerStart timespan_