12 #ifndef THRILL_DATA_BLOCK_WRITER_HEADER 13 #define THRILL_DATA_BLOCK_WRITER_HEADER 52 template <
typename BlockSink>
57 static constexpr
bool debug =
false;
64 : sink_(
std::move(sink)),
66 max_block_size_(max_block_size) {
67 assert(max_block_size_ > 0);
80 : bytes_(std::move(bw.bytes_)),
81 current_(std::move(bw.current_)),
82 end_(std::move(bw.end_)),
83 nitems_(std::move(bw.nitems_)),
84 first_offset_(std::move(bw.first_offset_)),
85 sink_(std::move(bw.sink_)),
86 do_queue_(std::move(bw.do_queue_)),
87 sink_queue_(std::move(bw.sink_queue_)),
88 block_size_(std::move(bw.block_size_)),
89 max_block_size_(std::move(bw.max_block_size_)),
90 closed_(std::move(bw.closed_)) {
97 if (
this == &bw)
return *
this;
99 bytes_ = std::move(bw.bytes_);
100 current_ = std::move(bw.current_);
101 end_ = std::move(bw.end_);
102 nitems_ = std::move(bw.nitems_);
103 first_offset_ = std::move(bw.first_offset_);
104 sink_ = std::move(bw.sink_);
105 do_queue_ = std::move(bw.do_queue_);
106 sink_queue_ = std::move(bw.sink_queue_);
107 block_size_ = std::move(bw.block_size_);
108 max_block_size_ = std::move(bw.max_block_size_);
109 closed_ = std::move(bw.closed_);
129 bool IsValid()
const {
return sink_.IsValid(); }
133 return bytes_ && (current_ != bytes_->begin() || nitems_ != 0);
146 if (current_ == bytes_->begin() && nitems_ == 0)
return;
149 sLOG <<
"Flush(): queue" << bytes_.get();
150 sink_queue_.emplace_back(
151 std::move(bytes_), 0, current_ - bytes_->begin(),
152 first_offset_, nitems_,
153 static_cast<bool>( self_verify));
156 sLOG <<
"Flush(): flush" << bytes_.get();
157 sink_.AppendPinnedBlock(
158 PinnedBlock(std::move(bytes_), 0, current_ - bytes_->begin(),
159 first_offset_, nitems_,
160 static_cast<bool>(self_verify)),
167 current_ = end_ =
nullptr;
174 for (std::vector<Block>::const_iterator bi = blocks.begin();
175 bi != blocks.end(); ++bi) {
176 sink_.AppendBlock(*bi, bi + 1 == blocks.end());
184 for (std::deque<Block>::const_iterator bi = blocks.begin();
185 bi != blocks.end(); ++bi) {
186 sink_.AppendBlock(*bi, bi + 1 == blocks.end());
196 if (current_ == end_)
197 Flush(), AllocateBlock();
200 first_offset_ = current_ - bytes_->begin();
208 template <
typename T>
214 return PutUnsafe<T>(
x);
216 return PutSafe<T>(
x);
221 template <
typename T>
227 return PutUnsafe<T, true>(
x);
229 return PutSafe<T, true>(
x);
233 template <
typename T,
bool NoSelfVerify = false>
242 Flush(), AllocateBlock();
255 Byte* initial_current = current_;
256 size_t initial_nitems = nitems_;
257 size_t initial_first_offset = first_offset_;
262 first_offset_ = current_ - bytes_->begin();
266 if (self_verify && !NoSelfVerify) {
268 PutRaw(
typeid(
T).hash_code());
273 while (!sink_queue_.empty()) {
274 sink_.AppendPinnedBlock(
275 std::move(sink_queue_.front()),
false);
276 sink_queue_.pop_front();
286 while (!sink_queue_.empty()) {
287 sLOG <<
"releasing" << bytes_.get();
288 sink_.ReleaseByteBlock(bytes_);
291 sink_queue_.pop_back();
296 sLOG <<
"reset" << bytes_.get();
298 current_ = initial_current;
299 end_ = bytes_->end();
300 nitems_ = initial_nitems;
301 first_offset_ = initial_first_offset;
309 template <
typename T,
bool NoSelfVerify = false>
316 Flush(), AllocateBlock();
319 first_offset_ = current_ - bytes_->begin();
323 if (self_verify && !NoSelfVerify) {
325 PutRaw(
typeid(
T).hash_code());
330 throw std::runtime_error(
331 "BlockSink was full even though declared infinite");
346 const Byte* cdata =
reinterpret_cast<const Byte*
>(data);
350 size_t partial_size = end_ - current_;
351 std::copy(cdata, cdata + partial_size, current_);
353 cdata += partial_size;
354 size -= partial_size;
355 current_ += partial_size;
357 Flush(), AllocateBlock();
361 std::copy(cdata, cdata + size, current_);
372 Flush(), AllocateBlock();
381 return Append(str.data(), str.size());
386 template <
typename Type>
390 "You only want to PutRaw() POD types as raw values.");
396 *
reinterpret_cast<Type*
>(current_) = item;
398 current_ +=
sizeof(
Type);
402 return Append(&item,
sizeof(item));
410 bytes_ = sink_.AllocateByteBlock(block_size_);
412 sLOG <<
"AllocateBlock(): throw due to invalid block";
415 sLOG <<
"AllocateBlock(): good, got" << bytes_.get();
417 if (2 * block_size_ < max_block_size_)
420 current_ = bytes_->begin();
421 end_ = bytes_->end();
441 size_t first_offset_ = 0;
447 bool do_queue_ =
false;
459 bool closed_ =
false;
467 #endif // !THRILL_DATA_BLOCK_WRITER_HEADER #define sLOG
Default logging method: output if the local debug variable is true.
void Flush()
Flush the current block (only really meaningful for a network sink).
#define TLX_ATTRIBUTE_ALWAYS_INLINE
BlockWriter & PutByte(Byte data)
Append a single byte to the block.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
A pinned / pin-counted pointer to a ByteBlock.
size_t block_size_
size of data blocks to construct
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutNoSelfVerify(const T &x)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutUnsafe(const T &x)
appends a complete item, or aborts with a FullException.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutSafe(const T &x)
appends a complete item, or fails safely with a FullException.
BlockWriter(BlockSink &&sink, size_t max_block_size=default_block_size)
Start building (appending blocks) into the given sink.
static constexpr bool g_self_verify
bool IsValid() const
Return whether an actual BlockSink is attached.
BlockSink & sink()
Returns sink_.
size_t start_block_size
starting size of blocks in BlockWriter.
BlockWriter & Append(const void *data, size_t size)
Append a memory range to the block.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & PutRaw(const Type &item)
void AppendBlocks(const std::vector< Block > &blocks)
uint8_t Byte
type of underlying memory area
BlockWriter(BlockWriter &&bw) noexcept
move-constructor
PinnedByteBlockPtr bytes_
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
static constexpr bool debug
size_t max_block_size_
maximum size of data blocks to construct
bool HasBufferData() const
Return true if any data is buffered.
void AppendBlocks(const std::deque< Block > &blocks)
BlockSink sink_
file or stream sink to output blocks to.
static uint_pair min()
Return a uint_pair instance containing the smallest possible value.
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
std::deque< PinnedBlock > sink_queue_
queue of blocks to flush when the current item has fully been serialized
A pinned / pin-counted derivative of a Block.
~BlockWriter()
On destruction, the last partial block is flushed.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & MarkItem()
Mark beginning of an item.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t block_size() const
Returns block_size_.
static constexpr bool allocate_can_fail_
void Close()
Explicitly close the writer.
An Exception is thrown by BlockWriter when the underlying sink does not allow allocation of a new block.
PinnedByteBlockPtr StealPinnedByteBlock() &&
void AllocateBlock()
Allocate a new block (overwriting the existing one).
BlockWriter & Append(const std::string &str)