Thrill  0.1
buf_writer.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/buf_writer.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef FOXXLL_MNG_BUF_WRITER_HEADER
14 #define FOXXLL_MNG_BUF_WRITER_HEADER
15 
16 #include <queue>
17 #include <vector>
18 
21 
22 #include <tlx/define/likely.hpp>
23 
24 namespace foxxll {
25 
26 //! \defgroup foxxll_schedlayer Block Scheduling Sublayer
27 //! \ingroup foxxll_mnglayer
28 //! Group of classes which help in scheduling
29 //! sequences of read and write requests
30 //! via prefetching and buffered writing
31 //! \{
32 
33 //! Encapsulates asynchronous buffered block writing engine.
34 //!
35 //! \c buffered_writer overlaps I/Os with filling of output buffer.
36 template <typename BlockType>
38 {
39  constexpr static bool debug = false;
40  using block_type = BlockType;
41  using bid_type = typename block_type::bid_type;
42 
43 protected:
44  const size_t nwriteblocks;
48  const size_t writebatchsize;
49 
50  std::vector<size_t> free_write_blocks; // contains free write blocks
51  std::vector<size_t> busy_write_blocks; // blocks that are being written; note that a block which is
52  // neither in free_write_blocks nor in busy_write_blocks is not yet filled
53 
//! One queued write of the current batch: the offset of the destination
//! block identifier together with the index of the buffer holding the data.
struct batch_entry
{
    int64_t offset;  //!< offset of the block identifier the buffer goes to
    size_t ibuffer;  //!< index of the buffer inside the write buffer pool

    //! Stores offset \c o and buffer index \c b.
    batch_entry(int64_t o, size_t b)
        : offset{ o },
          ibuffer{ b }
    { }
};
61  {
62  bool operator () (const batch_entry& a, const batch_entry& b) const
63  {
64  return (a.offset > b.offset);
65  }
66  };
67 
68  using batch_type = std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp>;
69  batch_type batch_write_blocks; // sorted sequence of blocks to write
70 
71 public:
72  //! Constructs an object.
73  //! \param write_buf_size number of write buffers to use
74  //! \param write_batch_size number of blocks to accumulate in
75  //! order to flush write requests (bulk buffered writing)
76  buffered_writer(size_t write_buf_size, size_t write_batch_size)
77  : nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
78  writebatchsize(write_batch_size ? write_batch_size : 1)
79  {
80  write_buffers = new block_type[nwriteblocks];
81  write_reqs = new request_ptr[nwriteblocks];
82 
83  write_bids = new bid_type[nwriteblocks];
84 
85  for (size_t i = 0; i < nwriteblocks; i++)
86  free_write_blocks.push_back(i);
87 
89  }
90 
91  //! non-copyable: delete copy-constructor
92  buffered_writer(const buffered_writer&) = delete;
93  //! non-copyable: delete assignment operator
95 
96  //! Returns free block from the internal buffer pool.
97  //! \return pointer to the block from the internal buffer pool
99  {
100  size_t ibuffer;
101  for (auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); ++it)
102  {
103  if (write_reqs[ibuffer = (*it)]->poll())
104  {
105  busy_write_blocks.erase(it);
106  free_write_blocks.push_back(ibuffer);
107 
108  break;
109  }
110  }
111  if (TLX_UNLIKELY(free_write_blocks.empty()))
112  {
113  size_t size = busy_write_blocks.size();
114  request_ptr* reqs = new request_ptr[size];
115  size_t i = 0;
116  for ( ; i < size; ++i)
117  {
118  reqs[i] = write_reqs[busy_write_blocks[i]];
119  }
120  size_t completed = wait_any(reqs, size);
121  size_t completed_global = busy_write_blocks[completed];
122  delete[] reqs;
123  busy_write_blocks.erase(busy_write_blocks.begin() + completed);
124 
125  return (write_buffers + completed_global);
126  }
127  ibuffer = free_write_blocks.back();
128  free_write_blocks.pop_back();
129 
130  return (write_buffers + ibuffer);
131  }
132  //! Submits block for writing.
133  //! \param filled_block pointer to the block
134  //! \remark parameter \c filled_block must be value returned by \c get_free_block() or \c write() methods
135  //! \param bid block identifier, a place to write data of the \c filled_block
136  //! \return pointer to the new free block from the pool
137  block_type * write(block_type* filled_block, const bid_type& bid) // writes filled_block and returns a new block
138  {
139  if (batch_write_blocks.size() >= writebatchsize)
140  {
141  // flush batch
142  while (!batch_write_blocks.empty())
143  {
144  size_t ibuffer = batch_write_blocks.top().ibuffer;
145  batch_write_blocks.pop();
146 
147  if (write_reqs[ibuffer].valid())
148  write_reqs[ibuffer]->wait();
149 
150  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
151 
152  busy_write_blocks.push_back(ibuffer);
153  }
154  }
155  TLX_LOG << "Adding write request to batch";
156 
157  size_t ibuffer = filled_block - write_buffers;
158  write_bids[ibuffer] = bid;
159  batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
160 
161  return get_free_block();
162  }
163  //! Flushes not yet written buffers.
164  void flush()
165  {
166  size_t ibuffer;
167  while (!batch_write_blocks.empty())
168  {
169  ibuffer = batch_write_blocks.top().ibuffer;
170  batch_write_blocks.pop();
171 
172  if (write_reqs[ibuffer].valid())
173  write_reqs[ibuffer]->wait();
174 
175  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
176 
177  busy_write_blocks.push_back(ibuffer);
178  }
179  for (auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); it++)
180  {
181  ibuffer = *it;
182  write_reqs[ibuffer]->wait();
183  }
184 
185  assert(batch_write_blocks.empty());
186  free_write_blocks.clear();
187  busy_write_blocks.clear();
188 
189  for (size_t i = 0; i < nwriteblocks; i++)
190  free_write_blocks.push_back(i);
191  }
192 
193  //! Flushes not yet written buffers and frees used memory.
195  {
196  size_t ibuffer;
197  while (!batch_write_blocks.empty())
198  {
199  ibuffer = batch_write_blocks.top().ibuffer;
200  batch_write_blocks.pop();
201 
202  if (write_reqs[ibuffer].valid())
203  write_reqs[ibuffer]->wait();
204 
205  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
206 
207  busy_write_blocks.push_back(ibuffer);
208  }
209  for (auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); it++)
210  {
211  ibuffer = *it;
212  write_reqs[ibuffer]->wait();
213  }
214 
215  delete[] write_reqs;
216  delete[] write_buffers;
217  delete[] write_bids;
218  }
219 };
220 
221 //! \}
222 
223 } // namespace foxxll
224 
225 #endif // !FOXXLL_MNG_BUF_WRITER_HEADER
226 
227 /**************************************************************************/
std::vector< size_t > free_write_blocks
Definition: buf_writer.hpp:50
std::priority_queue< batch_entry, std::vector< batch_entry >, batch_entry_cmp > batch_type
Definition: buf_writer.hpp:68
request_ptr * write_reqs
Definition: buf_writer.hpp:47
const size_t writebatchsize
Definition: buf_writer.hpp:48
void flush()
Flushes not yet written buffers.
Definition: buf_writer.hpp:164
buffered_writer(size_t write_buf_size, size_t write_batch_size)
Definition: buf_writer.hpp:76
block_type * get_free_block()
Definition: buf_writer.hpp:98
Definition: buf_writer.hpp:60
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
static constexpr bool debug
Definition: buf_writer.hpp:39
int64_t offset
Definition: buf_writer.hpp:56
batch_type batch_write_blocks
Definition: buf_writer.hpp:69
typename block_type::bid_type bid_type
Definition: buf_writer.hpp:41
FOXXLL library namespace
static instance_pointer get_instance()
return instance or create base instance if empty
Definition: singleton.hpp:41
High-performance smart pointer used as a wrapping reference counting pointer.
void set_priority_op(const request_queue::priority_op &op)
batch_entry(int64_t o, size_t b)
Definition: buf_writer.hpp:58
const size_t nwriteblocks
Definition: buf_writer.hpp:44
buffered_writer & operator=(const buffered_writer &)=delete
non-copyable: delete assignment operator
block_type * write(block_type *filled_block, const bid_type &bid)
Definition: buf_writer.hpp:137
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
~buffered_writer()
Flushes not yet written buffers and frees used memory.
Definition: buf_writer.hpp:194
Definition: buf_writer.hpp:54
std::vector< size_t > busy_write_blocks
Definition: buf_writer.hpp:51
block_type * write_buffers
Definition: buf_writer.hpp:45
#define TLX_LOG
Default logging method: output if the local debug variable is true.
Definition: core.hpp:141
size_t ibuffer
Definition: buf_writer.hpp:57