Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
buf_writer.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/buf_writer.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef FOXXLL_MNG_BUF_WRITER_HEADER
14 #define FOXXLL_MNG_BUF_WRITER_HEADER
15 
16 #include <queue>
17 #include <vector>
18 
21 
22 namespace foxxll {
23 
24 //! \defgroup foxxll_schedlayer Block Scheduling Sublayer
25 //! \ingroup foxxll_mnglayer
26 //! Group of classes which help in scheduling
27 //! sequences of read and write requests
28 //! via prefetching and buffered writing
29 //! \{
30 
31 //! Encapsulates asynchronous buffered block writing engine.
32 //!
33 //! \c buffered_writer overlaps I/Os with filling of output buffer.
34 template <typename BlockType>
36 {
37  constexpr static bool debug = false;
38  using block_type = BlockType;
39  using bid_type = typename block_type::bid_type;
40 
41 protected:
42  const size_t nwriteblocks;
46  const size_t writebatchsize;
47 
48  std::vector<size_t> free_write_blocks; // contains free write blocks
49  std::vector<size_t> busy_write_blocks; // blocks that are in writing, notice that if block is not in free_
50  // an not in busy then block is not yet filled
51 
52  struct batch_entry
53  {
54  int64_t offset;
55  size_t ibuffer;
56  batch_entry(int64_t o, size_t b) : offset(o), ibuffer(b) { }
57  };
59  {
60  bool operator () (const batch_entry& a, const batch_entry& b) const
61  {
62  return (a.offset > b.offset);
63  }
64  };
65 
66  using batch_type = std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp>;
67  batch_type batch_write_blocks; // sorted sequence of blocks to write
68 
69 public:
70  //! Constructs an object.
71  //! \param write_buf_size number of write buffers to use
72  //! \param write_batch_size number of blocks to accumulate in
73  //! order to flush write requests (bulk buffered writing)
74  buffered_writer(size_t write_buf_size, size_t write_batch_size)
75  : nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
76  writebatchsize(write_batch_size ? write_batch_size : 1)
77  {
80 
82 
83  for (size_t i = 0; i < nwriteblocks; i++)
84  free_write_blocks.push_back(i);
85 
87  }
88 
89  //! non-copyable: delete copy-constructor
90  buffered_writer(const buffered_writer&) = delete;
91  //! non-copyable: delete assignment operator
93 
94  //! Returns free block from the internal buffer pool.
95  //! \return pointer to the block from the internal buffer pool
97  {
98  size_t ibuffer;
99  for (auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); ++it)
100  {
101  if (write_reqs[ibuffer = (*it)]->poll())
102  {
103  busy_write_blocks.erase(it);
104  free_write_blocks.push_back(ibuffer);
105 
106  break;
107  }
108  }
109  if (UNLIKELY(free_write_blocks.empty()))
110  {
111  size_t size = busy_write_blocks.size();
112  request_ptr* reqs = new request_ptr[size];
113  size_t i = 0;
114  for ( ; i < size; ++i)
115  {
116  reqs[i] = write_reqs[busy_write_blocks[i]];
117  }
118  size_t completed = wait_any(reqs, size);
119  size_t completed_global = busy_write_blocks[completed];
120  delete[] reqs;
121  busy_write_blocks.erase(busy_write_blocks.begin() + completed);
122 
123  return (write_buffers + completed_global);
124  }
125  ibuffer = free_write_blocks.back();
126  free_write_blocks.pop_back();
127 
128  return (write_buffers + ibuffer);
129  }
130  //! Submits block for writing.
131  //! \param filled_block pointer to the block
132  //! \remark parameter \c filled_block must be value returned by \c get_free_block() or \c write() methods
133  //! \param bid block identifier, a place to write data of the \c filled_block
134  //! \return pointer to the new free block from the pool
135  block_type * write(block_type* filled_block, const bid_type& bid) // writes filled_block and returns a new block
136  {
137  if (batch_write_blocks.size() >= writebatchsize)
138  {
139  // flush batch
140  while (!batch_write_blocks.empty())
141  {
142  size_t ibuffer = batch_write_blocks.top().ibuffer;
143  batch_write_blocks.pop();
144 
145  if (write_reqs[ibuffer].valid())
146  write_reqs[ibuffer]->wait();
147 
148  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
149 
150  busy_write_blocks.push_back(ibuffer);
151  }
152  }
153  TLX_LOG << "Adding write request to batch";
154 
155  size_t ibuffer = filled_block - write_buffers;
156  write_bids[ibuffer] = bid;
157  batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
158 
159  return get_free_block();
160  }
161  //! Flushes not yet written buffers.
162  void flush()
163  {
164  size_t ibuffer;
165  while (!batch_write_blocks.empty())
166  {
167  ibuffer = batch_write_blocks.top().ibuffer;
168  batch_write_blocks.pop();
169 
170  if (write_reqs[ibuffer].valid())
171  write_reqs[ibuffer]->wait();
172 
173  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
174 
175  busy_write_blocks.push_back(ibuffer);
176  }
177  for (auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); it++)
178  {
179  ibuffer = *it;
180  write_reqs[ibuffer]->wait();
181  }
182 
183  assert(batch_write_blocks.empty());
184  free_write_blocks.clear();
185  busy_write_blocks.clear();
186 
187  for (size_t i = 0; i < nwriteblocks; i++)
188  free_write_blocks.push_back(i);
189  }
190 
191  //! Flushes not yet written buffers and frees used memory.
193  {
194  size_t ibuffer;
195  while (!batch_write_blocks.empty())
196  {
197  ibuffer = batch_write_blocks.top().ibuffer;
198  batch_write_blocks.pop();
199 
200  if (write_reqs[ibuffer].valid())
201  write_reqs[ibuffer]->wait();
202 
203  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
204 
205  busy_write_blocks.push_back(ibuffer);
206  }
207  for (auto it = busy_write_blocks.begin(); it != busy_write_blocks.end(); it++)
208  {
209  ibuffer = *it;
210  write_reqs[ibuffer]->wait();
211  }
212 
213  delete[] write_reqs;
214  delete[] write_buffers;
215  delete[] write_bids;
216  }
217 };
218 
219 //! \}
220 
221 } // namespace foxxll
222 
223 #endif // !FOXXLL_MNG_BUF_WRITER_HEADER
224 
225 /**************************************************************************/
std::vector< size_t > free_write_blocks
Definition: buf_writer.hpp:48
std::priority_queue< batch_entry, std::vector< batch_entry >, batch_entry_cmp > batch_type
Definition: buf_writer.hpp:66
request_ptr * write_reqs
Definition: buf_writer.hpp:45
const size_t writebatchsize
Definition: buf_writer.hpp:46
void flush()
Flushes not yet written buffers.
Definition: buf_writer.hpp:162
buffered_writer(size_t write_buf_size, size_t write_batch_size)
Definition: buf_writer.hpp:74
block_type * get_free_block()
Definition: buf_writer.hpp:96
Definition: buf_writer.hpp:58
static constexpr bool debug
Definition: buf_writer.hpp:37
int64_t offset
Definition: buf_writer.hpp:54
batch_type batch_write_blocks
Definition: buf_writer.hpp:67
typename block_type::bid_type bid_type
Definition: buf_writer.hpp:39
static instance_pointer get_instance()
Definition: singleton.hpp:44
High-performance smart pointer used as a wrapping reference counting pointer.
#define UNLIKELY(c)
Definition: utils.hpp:88
void set_priority_op(const request_queue::priority_op &op)
batch_entry(int64_t o, size_t b)
Definition: buf_writer.hpp:56
const size_t nwriteblocks
Definition: buf_writer.hpp:42
buffered_writer & operator=(const buffered_writer &)=delete
non-copyable: delete assignment operator
block_type * write(block_type *filled_block, const bid_type &bid)
Definition: buf_writer.hpp:135
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
~buffered_writer()
Flushes not yet written buffers and frees used memory.
Definition: buf_writer.hpp:192
Definition: buf_writer.hpp:52
std::vector< size_t > busy_write_blocks
Definition: buf_writer.hpp:49
block_type * write_buffers
Definition: buf_writer.hpp:43
bool operator()(const batch_entry &a, const batch_entry &b) const
Definition: buf_writer.hpp:60
#define TLX_LOG
Default logging method: output if the local debug variable is true.
Definition: core.hpp:141
size_t ibuffer
Definition: buf_writer.hpp:55