Thrill  0.1
block_sink.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_sink.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_BLOCK_SINK_HEADER
13 #define THRILL_DATA_BLOCK_SINK_HEADER
14 
15 #include <thrill/data/block.hpp>
17 
18 namespace thrill {
19 namespace data {
20 
21 //! \addtogroup data_layer
22 //! \{
23 
24 /*!
25  * Pure virtual base class for all things that can receive Blocks from a
26  * BlockWriter.
27  */
28 class BlockSink
29 {
30 public:
31  //! constructor with reference to BlockPool
33  : block_pool_(&block_pool), local_worker_id_(local_worker_id)
34  { }
35 
36  //! constructor with reference to BlockPool
38  : block_pool_(block_pool), local_worker_id_(local_worker_id)
39  { }
40 
41  //! default copy-constructor
42  BlockSink(const BlockSink&) = default;
43  //! default assignment operator
44  BlockSink& operator = (const BlockSink&) = default;
45  //! move-constructor: default
46  BlockSink(BlockSink&&) = default;
47  //! move-assignment operator: default
48  BlockSink& operator = (BlockSink&&) = default;
49 
50  //! required virtual destructor
51  virtual ~BlockSink() { }
52 
53  //! Allocate a ByteBlock with n bytes backing memory. If returned
54  //! ByteBlockPtr is a nullptr, then memory of this BlockSink is exhausted.
55  virtual PinnedByteBlockPtr
56  AllocateByteBlock(size_t block_size) {
57  return block_pool_->AllocateByteBlock(block_size, local_worker_id_);
58  }
59 
60  //! Release an unused ByteBlock with n bytes backing memory.
61  virtual void ReleaseByteBlock(ByteBlockPtr& block) {
62  block.reset();
63  }
64 
65  //! Returns BlockPool.logger_
67 
68  //! Returns block_pool_
69  BlockPool * block_pool() const { return block_pool_; }
70 
71  //! return number of workers per host
72  size_t workers_per_host() const { return block_pool_->workers_per_host(); }
73 
74  //! boolean flag whether to check if AllocateByteBlock can fail in any
75  //! subclass (if false: accelerate BlockWriter to not be able to cope with
76  //! nullptr).
77  static constexpr bool allocate_can_fail_ = false;
78 
79  //! Closes the sink. Must not be called multiple times
80  virtual void Close() = 0;
81 
82  //! Appends the (unpinned) Block
83  virtual void AppendBlock(const Block& b, bool is_last_block) = 0;
84 
85  //! Appends the (unpinned) Block
86  virtual void AppendBlock(Block&& b, bool is_last_block) = 0;
87 
88  //! Appends the PinnedBlock
89  virtual void AppendPinnedBlock(PinnedBlock&& b, bool is_last_block) {
90  return AppendBlock(std::move(b).MoveToBlock(), is_last_block);
91  }
92 
93  //! local worker id to associate pinned block with
94  size_t local_worker_id() const { return local_worker_id_; }
95 
96 private:
97  //! reference to BlockPool for allocation and deallocation. (ptr to make
98  //! BlockSink movable).
100 
101 protected:
102  //! local worker id to associate pinned block with
104 };
105 
106 /*!
107  * Derivative BlockSink which counts and limits how many bytes it has delivered
108  * as ByteBlocks for writing.
109  */
110 class BoundedBlockSink : public virtual BlockSink
111 {
112 public:
113  //! constructor with reference to BlockPool
115  : BlockSink(block_pool, local_worker_id),
116  max_size_(max_size), available_(max_size)
117  { }
118 
119  PinnedByteBlockPtr AllocateByteBlock(size_t block_size) final {
120  if (available_ < block_size) return PinnedByteBlockPtr();
121  available_ -= block_size;
122  return BlockSink::AllocateByteBlock(block_size);
123  }
124 
125  void ReleaseByteBlock(ByteBlockPtr& block) final {
126  if (block)
127  available_ += block->size();
128  block.reset();
129  }
130 
131  size_t max_size() const { return max_size_; }
132 
133  static constexpr bool allocate_can_fail_ = true;
134 
135 private:
136  //! maximum allocation of ByteBlock for this BlockSink
137  size_t max_size_;
138 
139  //! currently allocated ByteBlock for this BlockSink.
140  size_t available_;
141 };
142 
143 //! \}
144 
145 } // namespace data
146 } // namespace thrill
147 
148 #endif // !THRILL_DATA_BLOCK_SINK_HEADER
149 
150 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
PinnedByteBlockPtr AllocateByteBlock(size_t block_size) final
Definition: block_sink.hpp:119
virtual void AppendPinnedBlock(PinnedBlock &&b, bool is_last_block)
Appends the PinnedBlock.
Definition: block_sink.hpp:89
virtual void ReleaseByteBlock(ByteBlockPtr &block)
Release an unused ByteBlock with n bytes backing memory.
Definition: block_sink.hpp:61
virtual ~BlockSink()
required virtual destructor
Definition: block_sink.hpp:51
Derivative BlockSink which counts and limits how many bytes it has delivered as ByteBlocks for writing.
Definition: block_sink.hpp:110
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
void reset()
release contained pointer, frees object if this is the last reference.
A non-pinned counting pointer to a ByteBlock.
Definition: byte_block.hpp:176
virtual void AppendBlock(const Block &b, bool is_last_block)=0
Appends the (unpinned) Block.
BlockSink(BlockPool *block_pool, size_t local_worker_id)
constructor with reference to BlockPool
Definition: block_sink.hpp:37
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Definition: block_pool.cpp:484
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
size_t available_
number of bytes this BlockSink may still hand out as ByteBlocks (remaining budget).
Definition: block_sink.hpp:140
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
size_t workers_per_host() const
return number of workers per host
Definition: block_pool.hpp:82
size_t max_size_
maximum allocation of ByteBlock for this BlockSink
Definition: block_sink.hpp:137
BlockSink & operator=(const BlockSink &)=default
default assignment operator
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
BoundedBlockSink(BlockPool &block_pool, size_t local_worker_id, size_t max_size)
constructor with reference to BlockPool
Definition: block_sink.hpp:114
common::JsonLogger & logger()
Returns logger_.
Definition: block_pool.hpp:85
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
static constexpr bool allocate_can_fail_
Definition: block_sink.hpp:77
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
virtual void Close()=0
Closes the sink. Must not be called multiple times.
void ReleaseByteBlock(ByteBlockPtr &block) final
Release an unused ByteBlock with n bytes backing memory.
Definition: block_sink.hpp:125
size_t workers_per_host() const
return number of workers per host
Definition: block_sink.hpp:72
virtual PinnedByteBlockPtr AllocateByteBlock(size_t block_size)
Definition: block_sink.hpp:56
common::JsonLogger & logger()
Returns BlockPool.logger_.
Definition: block_sink.hpp:66
BlockSink(BlockPool &block_pool, size_t local_worker_id)
constructor with reference to BlockPool
Definition: block_sink.hpp:32