Thrill  0.1
block_writer.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_writer.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_BLOCK_WRITER_HEADER
13 #define THRILL_DATA_BLOCK_WRITER_HEADER
14 
15 #include <thrill/common/config.hpp>
18 #include <thrill/data/block.hpp>
21 #include <tlx/die.hpp>
22 
23 #include <algorithm>
24 #include <deque>
25 #include <string>
26 #include <vector>
27 
28 namespace thrill {
29 namespace data {
30 
31 //! \addtogroup data_layer
32 //! \{
33 
/*!
 * Exception thrown by BlockWriter when the underlying BlockSink does not allow
 * allocation of a new block, which is needed to serialize the current item.
 */
class FullException : public std::exception
{
public:
    FullException() : std::exception() { }

    //! Describe the failure instead of falling back to std::exception's
    //! implementation-defined default message.
    const char* what() const noexcept override {
        return "thrill::data::FullException: BlockSink could not allocate a new block";
    }
};
43 
44 /*!
45  * BlockWriter contains a temporary Block object into which a) any serializable
46  * item can be stored or b) any arbitrary integral data can be appended. It
47  * counts how many serializable items are stored and the offset of the first new
48  * item. When a Block is full it is emitted to an attached BlockSink, like a
49  * File, a ChannelSink, etc. for further delivery. The BlockWriter takes care of
50  * segmenting items when a Block is full.
51  */
52 template <typename BlockSink>
54  : public common::ItemWriterToolsBase<BlockWriter<BlockSink> >
55 {
56 public:
57  static constexpr bool debug = false;
58 
59  static constexpr bool self_verify = common::g_self_verify;
60 
61  //! Start build (appending blocks) to a File
62  explicit BlockWriter(BlockSink&& sink,
63  size_t max_block_size = default_block_size)
64  : sink_(std::move(sink)),
65  block_size_(std::min(size_t(start_block_size), max_block_size)),
66  max_block_size_(max_block_size) {
67  assert(max_block_size_ > 0);
68  }
69 
70  //! default constructor
71  BlockWriter() = default;
72 
73  //! non-copyable: delete copy-constructor
74  BlockWriter(const BlockWriter&) = delete;
75  //! non-copyable: delete assignment operator
76  BlockWriter& operator = (const BlockWriter&) = delete;
77 
78  //! move-constructor
79  BlockWriter(BlockWriter&& bw) noexcept
80  : bytes_(std::move(bw.bytes_)),
81  current_(std::move(bw.current_)),
82  end_(std::move(bw.end_)),
83  nitems_(std::move(bw.nitems_)),
84  first_offset_(std::move(bw.first_offset_)),
85  sink_(std::move(bw.sink_)),
86  do_queue_(std::move(bw.do_queue_)),
87  sink_queue_(std::move(bw.sink_queue_)),
88  block_size_(std::move(bw.block_size_)),
89  max_block_size_(std::move(bw.max_block_size_)),
90  closed_(std::move(bw.closed_)) {
91  // set closed flag -> disables destructor
92  bw.closed_ = true;
93  }
94 
95  //! move-assignment
96  BlockWriter& operator = (BlockWriter&& bw) noexcept {
97  if (this == &bw) return *this;
98 
99  bytes_ = std::move(bw.bytes_);
100  current_ = std::move(bw.current_);
101  end_ = std::move(bw.end_);
102  nitems_ = std::move(bw.nitems_);
103  first_offset_ = std::move(bw.first_offset_);
104  sink_ = std::move(bw.sink_);
105  do_queue_ = std::move(bw.do_queue_);
106  sink_queue_ = std::move(bw.sink_queue_);
107  block_size_ = std::move(bw.block_size_);
108  max_block_size_ = std::move(bw.max_block_size_);
109  closed_ = std::move(bw.closed_);
110  // set closed flag -> disables destructor
111  bw.closed_ = true;
112  return *this;
113  }
114 
115  //! On destruction, the last partial block is flushed.
117  Close();
118  }
119 
120  //! Explicitly close the writer
121  void Close() {
122  if (closed_) return;
123  closed_ = true;
124  Flush();
125  sink_.Close();
126  }
127 
128  //! Return whether an actual BlockSink is attached.
129  bool IsValid() const { return sink_.IsValid(); }
130 
131  //! Return true if any data is buffered
132  bool HasBufferData() const {
133  return bytes_ && (current_ != bytes_->begin() || nitems_ != 0);
134  }
135 
136  //! Returns sink_
137  BlockSink& sink() { return sink_; }
138 
139  //! Returns block_size_
140  size_t block_size() const { return block_size_; }
141 
142  //! Flush the current block (only really meaningful for a network sink).
143  void Flush() {
144  if (!bytes_) return;
145  // don't flush if the block is truly empty.
146  if (current_ == bytes_->begin() && nitems_ == 0) return;
147 
148  if (do_queue_) {
149  sLOG << "Flush(): queue" << bytes_.get();
150  sink_queue_.emplace_back(
151  std::move(bytes_), 0, current_ - bytes_->begin(),
152  first_offset_, nitems_,
153  static_cast<bool>(/* typecode_verify */ self_verify));
154  }
155  else {
156  sLOG << "Flush(): flush" << bytes_.get();
157  sink_.AppendPinnedBlock(
158  PinnedBlock(std::move(bytes_), 0, current_ - bytes_->begin(),
159  first_offset_, nitems_,
160  /* typecode_verify */ static_cast<bool>(self_verify)),
161  /* is_last_block */ closed_);
162  }
163 
164  // reset
165  nitems_ = 0;
166  bytes_ = PinnedByteBlockPtr();
167  current_ = end_ = nullptr;
168  }
169 
170  //! Directly write Blocks to the underlying BlockSink (after flushing the
171  //! current one if need be).
172  void AppendBlocks(const std::vector<Block>& blocks) {
173  Flush();
174  for (std::vector<Block>::const_iterator bi = blocks.begin();
175  bi != blocks.end(); ++bi) {
176  sink_.AppendBlock(*bi, /* is_last_block */ bi + 1 == blocks.end());
177  }
178  }
179 
180  //! Directly write Blocks to the underlying BlockSink (after flushing the
181  //! current one if need be).
182  void AppendBlocks(const std::deque<Block>& blocks) {
183  Flush();
184  for (std::deque<Block>::const_iterator bi = blocks.begin();
185  bi != blocks.end(); ++bi) {
186  sink_.AppendBlock(*bi, /* is_last_block */ bi + 1 == blocks.end());
187  }
188  }
189 
190  //! \name Appending (Generic) Serializable Items
191  //! \{
192 
193  //! Mark beginning of an item.
196  if (current_ == end_)
197  Flush(), AllocateBlock();
198 
199  if (nitems_ == 0)
200  first_offset_ = current_ - bytes_->begin();
201 
202  ++nitems_;
203 
204  return *this;
205  }
206 
207  //! Put appends a complete item, or fails with a FullException.
208  template <typename T>
210  BlockWriter& Put(const T& x) {
211  assert(!closed_);
212 
214  return PutUnsafe<T>(x);
215  else
216  return PutSafe<T>(x);
217  }
218 
219  //! PutNoSelfVerify appends a complete item without any self
220  //! verification information, or fails with a FullException.
221  template <typename T>
224  assert(!closed_);
225 
227  return PutUnsafe<T, true>(x);
228  else
229  return PutSafe<T, true>(x);
230  }
231 
232  //! appends a complete item, or fails safely with a FullException.
233  template <typename T, bool NoSelfVerify = false>
235  BlockWriter& PutSafe(const T& x) {
236  assert(!closed_);
237 
238  if (TLX_UNLIKELY(current_ == end_)) {
239  // if current block full: flush it, BEFORE enabling queuing, because
240  // the previous item is complete.
241  try {
242  Flush(), AllocateBlock();
243  }
244  catch (FullException&) {
245  // non-fatal allocation error: will be handled below.
246  }
247  }
248 
249  if (TLX_UNLIKELY(!bytes_)) {
250  sLOG << "!bytes";
251  throw FullException();
252  }
253 
254  // store beginning item of this item and other information for unwind.
255  Byte* initial_current = current_;
256  size_t initial_nitems = nitems_;
257  size_t initial_first_offset = first_offset_;
258  do_queue_ = true;
259 
260  try {
261  if (TLX_UNLIKELY(nitems_ == 0))
262  first_offset_ = current_ - bytes_->begin();
263 
264  ++nitems_;
265 
266  if (self_verify && !NoSelfVerify) {
267  // for self-verification, prefix T with its hash code
268  PutRaw(typeid(T).hash_code());
269  }
271 
272  // item fully serialized, push out finished blocks.
273  while (!sink_queue_.empty()) {
274  sink_.AppendPinnedBlock(
275  std::move(sink_queue_.front()), /* is_last_block */ false);
276  sink_queue_.pop_front();
277  }
278 
279  do_queue_ = false;
280 
281  return *this;
282  }
283  catch (FullException&) {
284  // if BlockSink signaled full, then unwind adding of the item.
285 
286  while (!sink_queue_.empty()) {
287  sLOG << "releasing" << bytes_.get();
288  sink_.ReleaseByteBlock(bytes_);
289 
290  PinnedBlock b = sink_queue_.back();
291  sink_queue_.pop_back();
292 
293  bytes_ = std::move(b).StealPinnedByteBlock();
294  }
295 
296  sLOG << "reset" << bytes_.get();
297 
298  current_ = initial_current;
299  end_ = bytes_->end();
300  nitems_ = initial_nitems;
301  first_offset_ = initial_first_offset;
302  do_queue_ = false;
303 
304  throw;
305  }
306  }
307 
308  //! appends a complete item, or aborts with a FullException.
309  template <typename T, bool NoSelfVerify = false>
312  assert(!closed_);
313 
314  try {
315  if (TLX_UNLIKELY(current_ == end_))
316  Flush(), AllocateBlock();
317 
318  if (TLX_UNLIKELY(nitems_ == 0))
319  first_offset_ = current_ - bytes_->begin();
320 
321  ++nitems_;
322 
323  if (self_verify && !NoSelfVerify) {
324  // for self-verification, prefix T with its hash code
325  PutRaw(typeid(T).hash_code());
326  }
328  }
329  catch (FullException&) {
330  throw std::runtime_error(
331  "BlockSink was full even though declared infinite");
332  }
333 
334  return *this;
335  }
336 
337  //! \}
338 
339  //! \name Appending Write Functions
340  //! \{
341 
342  //! Append a memory range to the block
343  BlockWriter& Append(const void* data, size_t size) {
344  assert(!closed_);
345 
346  const Byte* cdata = reinterpret_cast<const Byte*>(data);
347 
348  while (TLX_UNLIKELY(current_ + size > end_)) {
349  // partial copy of beginning of buffer
350  size_t partial_size = end_ - current_;
351  std::copy(cdata, cdata + partial_size, current_);
352 
353  cdata += partial_size;
354  size -= partial_size;
355  current_ += partial_size;
356 
357  Flush(), AllocateBlock();
358  }
359 
360  // copy remaining bytes.
361  std::copy(cdata, cdata + size, current_);
362  current_ += size;
363 
364  return *this;
365  }
366 
367  //! Append a single byte to the block
369  assert(!closed_);
370 
371  if (TLX_UNLIKELY(current_ == end_))
372  Flush(), AllocateBlock();
373 
374  *current_++ = data;
375  return *this;
376  }
377 
378  //! Append to contents of a std::string, excluding the null (which isn't
379  //! contained in the string size anyway).
381  return Append(str.data(), str.size());
382  }
383 
384  //! Put (append) a single item of the template type T to the buffer. Be
385  //! careful with implicit type conversions!
386  template <typename Type>
388  BlockWriter& PutRaw(const Type& item) {
389  static_assert(std::is_pod<Type>::value,
390  "You only want to PutRaw() POD types as raw values.");
391 
392  assert(!closed_);
393 
394  // fast path for writing item into block if it fits.
395  if (TLX_LIKELY(current_ + sizeof(Type) <= end_)) {
396  *reinterpret_cast<Type*>(current_) = item;
397 
398  current_ += sizeof(Type);
399  return *this;
400  }
401 
402  return Append(&item, sizeof(item));
403  }
404 
405  //! \}
406 
407 private:
408  //! Allocate a new block (overwriting the existing one).
409  void AllocateBlock() {
410  bytes_ = sink_.AllocateByteBlock(block_size_);
411  if (!bytes_) {
412  sLOG << "AllocateBlock(): throw due to invalid block";
413  throw FullException();
414  }
415  sLOG << "AllocateBlock(): good, got" << bytes_.get();
416  // increase block size, up to max.
417  if (2 * block_size_ < max_block_size_)
418  block_size_ *= 2;
419 
420  current_ = bytes_->begin();
421  end_ = bytes_->end();
422  nitems_ = 0;
423  first_offset_ = 0;
424  }
425 
426  //! current block, already allocated as shared ptr, since we want to use
427  //! make_shared.
429 
430  //! current write pointer into block.
431  Byte* current_ = nullptr;
432 
433  //! current end of block pointer. this is == bytes_.end(), just one
434  //! indirection less.
435  Byte* end_ = nullptr;
436 
437  //! number of items in current block
438  size_t nitems_ = 0;
439 
440  //! offset of first item
441  size_t first_offset_ = 0;
442 
443  //! file or stream sink to output blocks to.
445 
446  //! boolean whether to queue blocks
447  bool do_queue_ = false;
448 
449  //! queue of blocks to flush when the current item has fully been serialized
450  std::deque<PinnedBlock> sink_queue_;
451 
452  //! size of data blocks to construct
453  size_t block_size_;
454 
455  //! size of data blocks to construct
457 
458  //! Flag if Close was called explicitly
459  bool closed_ = false;
460 };
461 
462 //! \}
463 
464 } // namespace data
465 } // namespace thrill
466 
467 #endif // !THRILL_DATA_BLOCK_WRITER_HEADER
468 
469 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
void Flush()
Flush the current block (only really meaningful for a network sink).
#define TLX_ATTRIBUTE_ALWAYS_INLINE
double T
BlockWriter & PutByte(Byte data)
Append a single byte to the block.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
Type
VFS object type.
Definition: file_io.hpp:52
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
size_t block_size_
size of data blocks to construct
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutNoSelfVerify(const T &x)
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutUnsafe(const T &x)
appends a complete item, or aborts with a FullException.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutSafe(const T &x)
appends a complete item, or fails safely with a FullException.
BlockWriter(BlockSink &&sink, size_t max_block_size=default_block_size)
Start building (appending blocks) to a File.
static constexpr bool g_self_verify
Definition: config.hpp:32
#define TLX_LIKELY(c)
Definition: likely.hpp:23
bool IsValid() const
Return whether an actual BlockSink is attached.
BlockSink & sink()
Returns sink_.
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:24
BlockWriter & Append(const void *data, size_t size)
Append a memory range to the block.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutRaw(const Type &item)
void AppendBlocks(const std::vector< Block > &blocks)
uint8_t Byte
type of underlying memory area
Definition: byte_block.hpp:37
list x
Definition: gen_data.py:39
int value
Definition: gen_data.py:41
BlockWriter(BlockWriter &&bw) noexcept
move-constructor
PinnedByteBlockPtr bytes_
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
CRTP class to enhance item/memory writer classes with Varint encoding and String encoding.
static constexpr bool debug
size_t max_block_size_
maximum size of data blocks to construct
bool HasBufferData() const
Return true if any data is buffered.
void AppendBlocks(const std::deque< Block > &blocks)
BlockSink sink_
file or stream sink to output blocks to.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
std::deque< PinnedBlock > sink_queue_
queue of blocks to flush when the current item has fully been serialized
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
~BlockWriter()
On destruction, the last partial block is flushed.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & MarkItem()
Mark beginning of an item.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t block_size() const
Returns block_size_.
static constexpr bool allocate_can_fail_
Definition: block_sink.hpp:77
void Close()
Explicitly close the writer.
An Exception is thrown by BlockWriter when the underlying sink does not allow allocation of a new blo...
PinnedByteBlockPtr StealPinnedByteBlock() &&
Definition: block.hpp:291
void AllocateBlock()
Allocate a new block (overwriting the existing one).
BlockWriter & Append(const std::string &str)