Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
block_writer.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_writer.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_BLOCK_WRITER_HEADER
13 #define THRILL_DATA_BLOCK_WRITER_HEADER
14 
15 #include <thrill/common/config.hpp>
18 #include <thrill/data/block.hpp>
21 #include <tlx/die.hpp>
22 
23 #include <algorithm>
24 #include <deque>
25 #include <string>
26 #include <vector>
27 
28 namespace thrill {
29 namespace data {
30 
31 //! \addtogroup data_layer
32 //! \{
33 
/*!
 * An Exception is thrown by BlockWriter when the underlying sink does not allow
 * allocation of a new block, which is needed to serialize the item.
 */
class FullException : public std::exception
{
public:
    FullException() : std::exception() { }

    //! Describe the failure. Without this override std::exception::what()
    //! only returns an implementation-defined generic string, which makes
    //! the error unidentifiable when caught as std::exception.
    const char * what() const noexcept override {
        return "BlockSink is full: could not allocate a new block";
    }
};
43 
44 /*!
45  * BlockWriter contains a temporary Block object into which a) any serializable
46  * item can be stored or b) any arbitrary integral data can be appended. It
47  * counts how many serializable items are stored and the offset of the first new
48  * item. When a Block is full it is emitted to an attached BlockSink, like a
49  * File, a ChannelSink, etc. for further delivery. The BlockWriter takes care of
50  * segmenting items when a Block is full.
51  */
52 template <typename BlockSink>
54  : public common::ItemWriterToolsBase<BlockWriter<BlockSink> >
55 {
56 public:
57  static constexpr bool debug = false;
58 
59  static constexpr bool self_verify = common::g_self_verify;
60 
61  //! Start build (appending blocks) to a File
62  explicit BlockWriter(BlockSink&& sink,
63  size_t max_block_size = default_block_size)
64  : sink_(std::move(sink)),
65  block_size_(std::min(size_t(start_block_size), max_block_size)),
66  max_block_size_(max_block_size) {
67  assert(max_block_size_ > 0);
68  }
69 
70  //! default constructor
71  BlockWriter() = default;
72 
73  //! non-copyable: delete copy-constructor
74  BlockWriter(const BlockWriter&) = delete;
75  //! non-copyable: delete assignment operator
76  BlockWriter& operator = (const BlockWriter&) = delete;
77 
78  //! move-constructor
79  BlockWriter(BlockWriter&& bw) noexcept
80  : bytes_(std::move(bw.bytes_)),
81  current_(std::move(bw.current_)),
82  end_(std::move(bw.end_)),
83  nitems_(std::move(bw.nitems_)),
84  first_offset_(std::move(bw.first_offset_)),
85  sink_(std::move(bw.sink_)),
86  do_queue_(std::move(bw.do_queue_)),
87  sink_queue_(std::move(bw.sink_queue_)),
88  block_size_(std::move(bw.block_size_)),
89  max_block_size_(std::move(bw.max_block_size_)),
90  closed_(std::move(bw.closed_)) {
91  // set closed flag -> disables destructor
92  bw.closed_ = true;
93  }
94 
95  //! move-assignment
97  if (this == &bw) return *this;
98 
99  bytes_ = std::move(bw.bytes_);
100  current_ = std::move(bw.current_);
101  end_ = std::move(bw.end_);
102  nitems_ = std::move(bw.nitems_);
103  first_offset_ = std::move(bw.first_offset_);
104  sink_ = std::move(bw.sink_);
105  do_queue_ = std::move(bw.do_queue_);
106  sink_queue_ = std::move(bw.sink_queue_);
107  block_size_ = std::move(bw.block_size_);
108  max_block_size_ = std::move(bw.max_block_size_);
109  closed_ = std::move(bw.closed_);
110  // set closed flag -> disables destructor
111  bw.closed_ = true;
112  return *this;
113  }
114 
115  //! On destruction, the last partial block is flushed.
117  Close();
118  }
119 
120  //! Explicitly close the writer
121  void Close() {
122  if (closed_) return;
123  closed_ = true;
124  Flush();
125  sink_.Close();
126  }
127 
128  //! Return whether an actual BlockSink is attached.
129  bool IsValid() const { return sink_.IsValid(); }
130 
131  //! Returns block_size_
132  size_t block_size() const { return block_size_; }
133 
134  //! Flush the current block (only really meaningful for a network sink).
135  void Flush() {
136  if (!bytes_) return;
137  // don't flush if the block is truly empty.
138  if (current_ == bytes_->begin() && nitems_ == 0) return;
139 
140  if (do_queue_) {
141  sLOG << "Flush(): queue" << bytes_.get();
142  sink_queue_.emplace_back(
143  std::move(bytes_), 0, current_ - bytes_->begin(),
145  static_cast<bool>(/* typecode_verify */ self_verify));
146  }
147  else {
148  sLOG << "Flush(): flush" << bytes_.get();
150  PinnedBlock(std::move(bytes_), 0, current_ - bytes_->begin(),
152  /* typecode_verify */ static_cast<bool>(self_verify)),
153  /* is_last_block */ closed_);
154  }
155 
156  // reset
157  nitems_ = 0;
159  current_ = end_ = nullptr;
160  }
161 
162  //! Directly write Blocks to the underlying BlockSink (after flushing the
163  //! current one if need be).
164  void AppendBlocks(const std::vector<Block>& blocks) {
165  Flush();
166  for (std::vector<Block>::const_iterator bi = blocks.begin();
167  bi != blocks.end(); ++bi) {
168  sink_.AppendBlock(*bi, /* is_last_block */ bi + 1 == blocks.end());
169  }
170  }
171 
172  //! Directly write Blocks to the underlying BlockSink (after flushing the
173  //! current one if need be).
174  void AppendBlocks(const std::deque<Block>& blocks) {
175  Flush();
176  for (std::deque<Block>::const_iterator bi = blocks.begin();
177  bi != blocks.end(); ++bi) {
178  sink_.AppendBlock(*bi, /* is_last_block */ bi + 1 == blocks.end());
179  }
180  }
181 
182  //! \name Appending (Generic) Serializable Items
183  //! \{
184 
185  //! Mark beginning of an item.
188  if (current_ == end_)
189  Flush(), AllocateBlock();
190 
191  if (nitems_ == 0)
192  first_offset_ = current_ - bytes_->begin();
193 
194  ++nitems_;
195 
196  return *this;
197  }
198 
199  //! Put appends a complete item, or fails with a FullException.
200  template <typename T>
202  BlockWriter& Put(const T& x) {
203  assert(!closed_);
204 
206  return PutUnsafe<T>(x);
207  else
208  return PutSafe<T>(x);
209  }
210 
211  //! PutNoSelfVerify appends a complete item without any self
212  //! verification information, or fails with a FullException.
213  template <typename T>
216  assert(!closed_);
217 
219  return PutUnsafe<T, true>(x);
220  else
221  return PutSafe<T, true>(x);
222  }
223 
224  //! appends a complete item, or fails safely with a FullException.
225  template <typename T, bool NoSelfVerify = false>
227  BlockWriter& PutSafe(const T& x) {
228  assert(!closed_);
229 
230  if (TLX_UNLIKELY(current_ == end_)) {
231  // if current block full: flush it, BEFORE enabling queuing, because
232  // the previous item is complete.
233  try {
234  Flush(), AllocateBlock();
235  }
236  catch (FullException&) {
237  // non-fatal allocation error: will be handled below.
238  }
239  }
240 
241  if (TLX_UNLIKELY(!bytes_)) {
242  sLOG << "!bytes";
243  throw FullException();
244  }
245 
246  // store beginning item of this item and other information for unwind.
247  Byte* initial_current = current_;
248  size_t initial_nitems = nitems_;
249  size_t initial_first_offset = first_offset_;
250  do_queue_ = true;
251 
252  try {
253  if (TLX_UNLIKELY(nitems_ == 0))
254  first_offset_ = current_ - bytes_->begin();
255 
256  ++nitems_;
257 
258  if (self_verify && !NoSelfVerify) {
259  // for self-verification, prefix T with its hash code
260  PutRaw(typeid(T).hash_code());
261  }
263 
264  // item fully serialized, push out finished blocks.
265  while (!sink_queue_.empty()) {
267  std::move(sink_queue_.front()), /* is_last_block */ false);
268  sink_queue_.pop_front();
269  }
270 
271  do_queue_ = false;
272 
273  return *this;
274  }
275  catch (FullException&) {
276  // if BlockSink signaled full, then unwind adding of the item.
277 
278  while (!sink_queue_.empty()) {
279  sLOG << "releasing" << bytes_.get();
281 
282  PinnedBlock b = sink_queue_.back();
283  sink_queue_.pop_back();
284 
285  bytes_ = std::move(b).StealPinnedByteBlock();
286  }
287 
288  sLOG << "reset" << bytes_.get();
289 
290  current_ = initial_current;
291  end_ = bytes_->end();
292  nitems_ = initial_nitems;
293  first_offset_ = initial_first_offset;
294  do_queue_ = false;
295 
296  throw;
297  }
298  }
299 
300  //! appends a complete item, or aborts with a FullException.
301  template <typename T, bool NoSelfVerify = false>
304  assert(!closed_);
305 
306  try {
307  if (TLX_UNLIKELY(current_ == end_))
308  Flush(), AllocateBlock();
309 
310  if (TLX_UNLIKELY(nitems_ == 0))
311  first_offset_ = current_ - bytes_->begin();
312 
313  ++nitems_;
314 
315  if (self_verify && !NoSelfVerify) {
316  // for self-verification, prefix T with its hash code
317  PutRaw(typeid(T).hash_code());
318  }
320  }
321  catch (FullException&) {
322  throw std::runtime_error(
323  "BlockSink was full even though declared infinite");
324  }
325 
326  return *this;
327  }
328 
329  //! \}
330 
331  //! \name Appending Write Functions
332  //! \{
333 
334  //! Append a memory range to the block
335  BlockWriter& Append(const void* data, size_t size) {
336  assert(!closed_);
337 
338  const Byte* cdata = reinterpret_cast<const Byte*>(data);
339 
340  while (TLX_UNLIKELY(current_ + size > end_)) {
341  // partial copy of beginning of buffer
342  size_t partial_size = end_ - current_;
343  std::copy(cdata, cdata + partial_size, current_);
344 
345  cdata += partial_size;
346  size -= partial_size;
347  current_ += partial_size;
348 
349  Flush(), AllocateBlock();
350  }
351 
352  // copy remaining bytes.
353  std::copy(cdata, cdata + size, current_);
354  current_ += size;
355 
356  return *this;
357  }
358 
359  //! Append a single byte to the block
361  assert(!closed_);
362 
363  if (TLX_UNLIKELY(current_ == end_))
364  Flush(), AllocateBlock();
365 
366  *current_++ = data;
367  return *this;
368  }
369 
370  //! Append to contents of a std::string, excluding the null (which isn't
371  //! contained in the string size anyway).
373  return Append(str.data(), str.size());
374  }
375 
376  //! Put (append) a single item of the template type T to the buffer. Be
377  //! careful with implicit type conversions!
378  template <typename Type>
380  BlockWriter& PutRaw(const Type& item) {
381  static_assert(std::is_pod<Type>::value,
382  "You only want to PutRaw() POD types as raw values.");
383 
384  assert(!closed_);
385 
386  // fast path for writing item into block if it fits.
387  if (TLX_LIKELY(current_ + sizeof(Type) <= end_)) {
388  *reinterpret_cast<Type*>(current_) = item;
389 
390  current_ += sizeof(Type);
391  return *this;
392  }
393 
394  return Append(&item, sizeof(item));
395  }
396 
397  //! \}
398 
399 private:
400  //! Allocate a new block (overwriting the existing one).
401  void AllocateBlock() {
403  if (!bytes_) {
404  sLOG << "AllocateBlock(): throw due to invalid block";
405  throw FullException();
406  }
407  sLOG << "AllocateBlock(): good, got" << bytes_.get();
408  // increase block size, up to max.
409  if (2 * block_size_ < max_block_size_)
410  block_size_ *= 2;
411 
412  current_ = bytes_->begin();
413  end_ = bytes_->end();
414  nitems_ = 0;
415  first_offset_ = 0;
416  }
417 
418  //! current block, already allocated as shared ptr, since we want to use
419  //! make_shared.
421 
422  //! current write pointer into block.
423  Byte* current_ = nullptr;
424 
425  //! current end of block pointer. this is == bytes_.end(), just one
426  //! indirection less.
427  Byte* end_ = nullptr;
428 
429  //! number of items in current block
430  size_t nitems_ = 0;
431 
432  //! offset of first item
433  size_t first_offset_ = 0;
434 
435  //! file or stream sink to output blocks to.
437 
438  //! boolean whether to queue blocks
439  bool do_queue_ = false;
440 
441  //! queue of blocks to flush when the current item has fully been serialized
442  std::deque<PinnedBlock> sink_queue_;
443 
444  //! size of data blocks to construct
445  size_t block_size_;
446 
447  //! size of data blocks to construct
449 
450  //! Flag if Close was called explicitly
451  bool closed_ = false;
452 };
453 
454 //! \}
455 
456 } // namespace data
457 } // namespace thrill
458 
459 #endif // !THRILL_DATA_BLOCK_WRITER_HEADER
460 
461 /******************************************************************************/
virtual void AppendPinnedBlock(PinnedBlock &&b, bool is_last_block)
Appends the PinnedBlock.
Definition: block_sink.hpp:89
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
virtual void ReleaseByteBlock(ByteBlockPtr &block)
Release an unused ByteBlock with n bytes backing memory.
Definition: block_sink.hpp:61
bool do_queue_
boolean whether to queue blocks
void Flush()
Flush the current block (only really meaningful for a network sink).
#define TLX_ATTRIBUTE_ALWAYS_INLINE
double T
BlockWriter & PutByte(Byte data)
Append a single byte to the block.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
size_t block_size() const
Returns block_size_.
Type
VFS object type.
Definition: file_io.hpp:52
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:182
size_t block_size_
size of data blocks to construct
BlockWriter & operator=(const BlockWriter &)=delete
non-copyable: delete assignment operator
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b) any arbitrary integral data can be appended.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutNoSelfVerify(const T &x)
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
virtual void AppendBlock(const Block &b, bool is_last_block)=0
Appends the (unpinned) Block.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutUnsafe(const T &x)
appends a complete item, or aborts with a FullException.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutSafe(const T &x)
appends a complete item, or fails safely with a FullException.
BlockWriter(BlockSink &&sink, size_t max_block_size=default_block_size)
Start build (appending blocks) to a File.
size_t nitems_
number of items in current block
static constexpr bool debug
static constexpr bool g_self_verify
Definition: config.hpp:32
#define TLX_LIKELY(c)
Definition: likely.hpp:23
Type * get() const noexcept
return the enclosed pointer.
Byte * current_
current write pointer into block.
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:24
BlockWriter & Append(const void *data, size_t size)
Append a memory range to the block.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutRaw(const Type &item)
void AppendBlocks(const std::vector< Block > &blocks)
uint8_t Byte
type of underlying memory area
Definition: byte_block.hpp:37
list x
Definition: gen_data.py:39
int value
Definition: gen_data.py:41
BlockWriter(BlockWriter &&bw) noexcept
move-constructor
static constexpr bool self_verify
PinnedByteBlockPtr bytes_
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
CRTP class to enhance item/memory writer classes with Varint encoding and String encoding.
size_t max_block_size_
maximum size of data blocks to construct
void AppendBlocks(const std::deque< Block > &blocks)
BlockSink sink_
file or stream sink to output blocks to.
size_t first_offset_
offset of first item
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
std::deque< PinnedBlock > sink_queue_
queue of blocks to flush when the current item has fully been serialized
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
~BlockWriter()
On destruction, the last partial block is flushed.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & MarkItem()
Mark beginning of an item.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
BlockWriter()=default
default constructor
bool closed_
Flag if Close was called explicitly.
static constexpr bool allocate_can_fail_
Definition: block_sink.hpp:77
void Close()
Explicitly close the writer.
An Exception is thrown by BlockWriter when the underlying sink does not allow allocation of a new block, which is needed to serialize the item.
virtual void Close()=0
Closes the sink. Must not be called multiple times.
void AllocateBlock()
Allocate a new block (overwriting the existing one).
bool IsValid() const
Return whether an actual BlockSink is attached.
virtual PinnedByteBlockPtr AllocateByteBlock(size_t block_size)
Definition: block_sink.hpp:56
BlockWriter & Append(const std::string &str)