13 #ifndef THRILL_API_WRITE_BINARY_HEADER 14 #define THRILL_API_WRITE_BINARY_HEADER 35 template <
typename ValueType>
38 static constexpr
bool debug =
false;
44 template <
typename ParentDIA>
49 { parent.id() }, { parent.node() }),
52 sLOG <<
"Creating write node.";
58 auto pre_op_fn = [=](
const ValueType& input) {
63 auto lop_chain = parent.stack().push(pre_op_fn).fold();
64 parent.node()->AddChild(
this, lop_chain);
72 void PreOp(
const ValueType& input) {
78 writer_->PutNoSelfVerify(input);
85 writer_->PutNoSelfVerify(input);
88 throw std::runtime_error(
89 "Error in WriteBinary: " 90 "an item is larger than the file size limit");
101 <<
"class" <<
"WriteBinaryNode" 116 size_t& stats_total_elements,
117 size_t& stats_total_writes)
126 sLOG <<
"SysFileSink::AppendBlock()" << b;
128 stream_->write(b.data_begin(), b.size());
179 sLOG <<
"OpenNextFile() out_path" << out_path;
181 writer_ = std::make_unique<Writer>(
190 template <
typename ValueType,
typename Stack>
192 const std::string& filepath,
size_t max_file_size)
const {
196 auto node = tlx::make_counting<WriteBinaryNode>(
197 *
this, filepath, max_file_size);
202 template <
typename ValueType,
typename Stack>
204 const std::string& filepath,
size_t max_file_size)
const {
208 auto node = tlx::make_counting<WriteBinaryNode>(
209 *
this, filepath, max_file_size);
217 #endif // !THRILL_API_WRITE_BINARY_HEADER PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
size_t & stats_total_elements_
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
std::unique_ptr< Writer > writer_
BlockWriter to sink.
#define sLOG
Default logging method: output if the local debug variable is true.
SysFileSink(api::Context &context, size_t local_worker_id, const std::string &path, size_t max_file_size, size_t &stats_total_elements, size_t &stats_total_writes)
Description of the amount of RAM the internal data structures of a DIANode require.
Context & context()
Returns the api::Context of this DIABase.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Derivative BlockSink which counts and limits how many bytes it has delivered as ByteBlocks for writin...
Future< void > WriteBinaryFuture(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
size_t stats_total_elements_
void OpenNextFile()
Function to create sink_ and writer_ for next file.
void PreOp(const ValueType &input)
writer preop: put item into file, create files as needed.
size_t & stats_total_writes_
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
Implements BlockSink class writing to files with size limit.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
size_t stats_total_writes_
size_t out_serial_
File serial number for this worker.
Specialized template class for ActionFuture which return void.
size_t local_worker_id() const
local worker id to associate pinned block with
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
std::string out_pathbase_
Base path of the output file.
size_t block_size_
Block size used by BlockWriter.
BlockPool * block_pool() const
Returns block_pool_.
BoundedBlockSink(BlockPool &block_pool, size_t local_worker_id, size_t max_size)
constructor with reference to BlockPool
void Close() final
Closes the sink. Must not be called multiple times.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
size_t local_worker_id() const
void AppendPinnedBlock(data::PinnedBlock &&b, bool) final
Appends the PinnedBlock.
size_t my_rank() const
Global rank of this worker among all other workers in the system.
void StopPreOp(size_t) final
Closes the output file.
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
WriteBinaryNode(const ParentDIA &parent, const std::string &path_out, size_t max_file_size)
static uint_pair min()
return an uint_pair instance containing the smallest value possible
void AppendBlock(const data::Block &block, bool is_last_block)
Appends the (unpinned) Block.
common::JsonLogger logger_
A pinned / pin-counted derivative of a Block.
vfs::WriteStreamPtr stream_
static int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
static constexpr bool debug
void WriteBinary(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
An Exception is thrown by BlockWriter when the underlying sink does not allow allocation of a new blo...
size_t max_file_size_
Maximum file size.
WriteStreamPtr OpenWriteStream(const std::string &path)
void AppendBlock(data::Block &&block, bool is_last_block)
Appends the (unpinned) Block.
Context & context_
associated Context
BlockSink(BlockPool &block_pool, size_t local_worker_id)
constructor with reference to BlockPool