13 #ifndef FOXXLL_MNG_BUF_WRITER_HEADER 14 #define FOXXLL_MNG_BUF_WRITER_HEADER 36 template <
typename BlockType>
39 constexpr
static bool debug =
false;
41 using bid_type =
typename block_type::bid_type;
68 using batch_type = std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp>;
77 : nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
78 writebatchsize(write_batch_size ? write_batch_size : 1)
86 free_write_blocks.push_back(i);
101 for (
auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); ++it)
103 if (write_reqs[ibuffer = (*it)]->poll())
105 busy_write_blocks.erase(it);
106 free_write_blocks.push_back(ibuffer);
113 size_t size = busy_write_blocks.size();
116 for ( ; i < size; ++i)
118 reqs[i] = write_reqs[busy_write_blocks[i]];
120 size_t completed =
wait_any(reqs, size);
121 size_t completed_global = busy_write_blocks[completed];
123 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
125 return (write_buffers + completed_global);
127 ibuffer = free_write_blocks.back();
128 free_write_blocks.pop_back();
130 return (write_buffers + ibuffer);
142 while (!batch_write_blocks.empty())
144 size_t ibuffer = batch_write_blocks.top().ibuffer;
145 batch_write_blocks.pop();
147 if (write_reqs[ibuffer].valid())
150 write_reqs[
ibuffer] = write_buffers[
ibuffer].write(write_bids[ibuffer]);
152 busy_write_blocks.push_back(ibuffer);
155 TLX_LOG <<
"Adding write request to batch";
159 batch_write_blocks.push(
batch_entry(bid.offset, ibuffer));
167 while (!batch_write_blocks.empty())
169 ibuffer = batch_write_blocks.top().ibuffer;
170 batch_write_blocks.pop();
172 if (write_reqs[ibuffer].valid())
175 write_reqs[
ibuffer] = write_buffers[
ibuffer].write(write_bids[ibuffer]);
177 busy_write_blocks.push_back(ibuffer);
179 for (
auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); it++)
185 assert(batch_write_blocks.empty());
186 free_write_blocks.clear();
187 busy_write_blocks.clear();
190 free_write_blocks.push_back(i);
197 while (!batch_write_blocks.empty())
199 ibuffer = batch_write_blocks.top().ibuffer;
200 batch_write_blocks.pop();
202 if (write_reqs[ibuffer].valid())
205 write_reqs[
ibuffer] = write_buffers[
ibuffer].write(write_bids[ibuffer]);
207 busy_write_blocks.push_back(ibuffer);
209 for (
auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); it++)
225 #endif // !FOXXLL_MNG_BUF_WRITER_HEADER std::vector< size_t > free_write_blocks
std::priority_queue< batch_entry, std::vector< batch_entry >, batch_entry_cmp > batch_type
const size_t writebatchsize
void flush()
Flushes not yet written buffers.
buffered_writer(size_t write_buf_size, size_t write_batch_size)
block_type * get_free_block()
static constexpr bool debug
batch_type batch_write_blocks
typename block_type::bid_type bid_type
static instance_pointer get_instance()
return instance or create base instance if empty
High-performance smart pointer used as a wrapping reference counting pointer.
void set_priority_op(const request_queue::priority_op &op)
batch_entry(int64_t o, size_t b)
const size_t nwriteblocks
buffered_writer & operator=(const buffered_writer &)=delete
non-copyable: delete assignment operator
block_type * write(block_type *filled_block, const bid_type &bid)
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
~buffered_writer()
Flushes not yet written buffers and frees used memory.
std::vector< size_t > busy_write_blocks
block_type * write_buffers
#define TLX_LOG
Default logging method: output if the local debug variable is true.