Thrill  0.1
file.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/file.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <thrill/data/file.hpp>
12 
13 #include <deque>
14 #include <string>
15 
16 namespace thrill {
17 namespace data {
18 
19 /******************************************************************************/
20 // File
21 
22 File::File(BlockPool& block_pool, size_t local_worker_id, size_t dia_id)
23  : BlockSink(block_pool, local_worker_id),
24  id_(block_pool.next_file_id()), dia_id_(dia_id) { }
25 
27  // assert(dia_id_ != 0);
28  if (reference_count() != 0) {
29  die("File[" << this << "]::~File() called "
30  "but " << reference_count() << " File::Writer "
31  "handles are still open.");
32  }
33  logger()
34  << "class" << "File"
35  << "event" << "close"
36  << "id" << id_
37  << "dia_id" << dia_id_
38  << "items" << stats_items_
39  << "bytes" << stats_bytes_;
40 }
41 
42 File File::Copy() const {
44  f.blocks_ = blocks_;
49  return f;
50 }
51 
52 void File::Close() {
53  // 2016-02-04: Files are never closed, one can always append. This is
54  // current used by the ReduceTables -tb.
55 }
56 
57 void File::Clear() {
58  std::deque<Block>().swap(blocks_);
59  std::deque<size_t>().swap(num_items_sum_);
60  size_bytes_ = 0;
61 }
62 
63 File::Writer File::GetWriter(size_t block_size) {
64  return Writer(
66 }
67 
68 File::KeepReader File::GetKeepReader(size_t prefetch_size) const {
69  return KeepReader(
70  KeepFileBlockSource(*this, local_worker_id_, prefetch_size));
71 }
72 
74  return ConsumeReader(
75  ConsumeFileBlockSource(this, local_worker_id_, prefetch_size));
76 }
77 
78 File::Reader File::GetReader(bool consume, size_t prefetch_size) {
79  if (consume)
80  return ConstructDynBlockReader<ConsumeFileBlockSource>(
81  this, local_worker_id_, prefetch_size);
82  else
83  return ConstructDynBlockReader<KeepFileBlockSource>(
84  *this, local_worker_id_, prefetch_size);
85 }
86 
88  std::string output;
89  for (const Block& b : blocks_)
90  output += b.PinWait(0).ToString();
91  return output;
92 }
93 
94 std::ostream& operator << (std::ostream& os, const File& f) {
95  os << "[File " << std::hex << &f << std::dec
96  << " refs=" << f.reference_count()
97  << " Blocks=[";
98  size_t i = 0;
99  for (const Block& b : f.blocks_)
100  os << "\n " << i++ << " " << b;
101  return os << "]]";
102 }
103 
104 /******************************************************************************/
105 // KeepFileBlockSource
106 
108  const File& file, size_t local_worker_id,
109  size_t prefetch_size,
110  size_t first_block, size_t first_item)
111  : file_(file), local_worker_id_(local_worker_id),
112  prefetch_size_(prefetch_size),
113  fetching_bytes_(0),
114  first_block_(first_block), current_block_(first_block),
115  first_item_(first_item)
116 { }
117 
119  if (current_block_ == first_block_) {
120  // construct first block differently, in case we want to shorten it.
124  return b;
125  }
126  else {
127  return file_.block(current_block_++);
128  }
129 }
130 
131 void KeepFileBlockSource::Prefetch(size_t prefetch_size) {
132  if (prefetch_size >= prefetch_size_) {
133  prefetch_size_ = prefetch_size;
134  // prefetch #desired bytes
135  while (fetching_bytes_ < prefetch_size_ &&
137  {
138  Block b = MakeNextBlock();
139  fetching_bytes_ += b.size();
140  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
141  }
142  }
143  else if (prefetch_size < prefetch_size_) {
144  prefetch_size_ = prefetch_size;
145  // cannot discard prefetched Blocks
146  }
147 }
148 
150  if (current_block_ >= file_.num_blocks() && fetching_blocks_.empty())
151  return PinnedBlock();
152 
153  if (prefetch_size_ == 0)
154  {
155  // operate without prefetching
157  }
158  else
159  {
160  // prefetch #desired bytes
161  while (fetching_bytes_ < prefetch_size_ &&
163  {
164  Block b = MakeNextBlock();
165  fetching_bytes_ += b.size();
166  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
167  }
168 
169  // this might block if the prefetching is not finished
170  PinnedBlock b = fetching_blocks_.front()->Wait();
171  fetching_bytes_ -= b.size();
172  fetching_blocks_.pop_front();
173  return b;
174  }
175 }
176 
178  if (TLX_UNLIKELY(prefetch_size_ != 0 && !fetching_blocks_.empty())) {
179  // next block already prefetched, return it, but don't prefetch more
180  PinnedBlock b = fetching_blocks_.front()->Wait();
181  fetching_bytes_ -= b.size();
182  fetching_blocks_.pop_front();
183  return std::move(b).MoveToBlock();
184  }
185 
187  return Block();
188 
189  return MakeNextBlock();
190 }
191 
193  return block.PinWait(local_worker_id_);
194 }
195 
196 /******************************************************************************/
197 // ConsumeFileBlockSource
198 
200  File* file, size_t local_worker_id, size_t prefetch_size)
201  : file_(file), local_worker_id_(local_worker_id),
202  prefetch_size_(prefetch_size),
203  fetching_bytes_(0) {
205 }
206 
212  s.file_ = nullptr;
213 }
214 
215 void ConsumeFileBlockSource::Prefetch(size_t prefetch_size) {
216  if (prefetch_size >= prefetch_size_) {
217  prefetch_size_ = prefetch_size;
218  // prefetch #desired bytes
219  while (fetching_bytes_ < prefetch_size_ && !file_->blocks_.empty()) {
220  Block& b = file_->blocks_.front();
221  fetching_bytes_ += b.size();
222  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
223  file_->blocks_.pop_front();
224  }
225  }
226  else if (prefetch_size < prefetch_size_) {
227  prefetch_size_ = prefetch_size;
228  // cannot discard prefetched Blocks
229  }
230 }
231 
233  assert(file_);
234  if (file_->blocks_.empty() && fetching_blocks_.empty())
235  return PinnedBlock();
236 
237  // operate without prefetching
238  if (prefetch_size_ == 0) {
239  PinRequestPtr f = file_->blocks_.front().Pin(local_worker_id_);
240  file_->blocks_.pop_front();
241  return f->Wait();
242  }
243 
244  // prefetch #desired bytes
245  while (fetching_bytes_ < prefetch_size_ && !file_->blocks_.empty()) {
246  Block& b = file_->blocks_.front();
247  fetching_bytes_ += b.size();
248  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
249  file_->blocks_.pop_front();
250  }
251 
252  // this might block if the prefetching is not finished
253  PinnedBlock b = fetching_blocks_.front()->Wait();
254  fetching_bytes_ -= b.size();
255  fetching_blocks_.pop_front();
256  return b;
257 }
258 
260  assert(file_);
261 
262  if (TLX_UNLIKELY(prefetch_size_ != 0 && !fetching_blocks_.empty())) {
263  // next block already prefetched, return it, but don't prefetch more
264  PinnedBlock b = fetching_blocks_.front()->Wait();
265  fetching_bytes_ -= b.size();
266  fetching_blocks_.pop_front();
267  return std::move(b).MoveToBlock();
268  }
269 
270  if (file_->blocks_.empty())
271  return Block();
272 
273  Block b = file_->blocks_.front();
274  file_->blocks_.pop_front();
275  return b;
276 }
277 
279  return block.PinWait(local_worker_id_);
280 }
281 
283  if (file_ != nullptr)
284  file_->Clear();
285 }
286 
287 } // namespace data
288 } // namespace thrill
289 
290 /******************************************************************************/
PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
Definition: block.cpp:35
BlockReader< KeepFileBlockSource > KeepReader
Definition: file.hpp:61
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:385
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:215
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
Definition: file.cpp:68
std::deque< Block > blocks_
container holding Blocks and thus shared pointers to all byte blocks.
Definition: file.hpp:262
static constexpr size_t keep_first_item
sentinel value meaning: do not change the first_item offset of the first Block
Definition: file.hpp:379
PinnedBlock AcquirePin(const Block &block)
Acquire Pin for Block returned from NextBlockUnpinned.
Definition: file.cpp:278
size_t id_
unique file id
Definition: file.hpp:256
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:461
PinnedBlock AcquirePin(const Block &block)
Acquire Pin for Block returned from NextBlockUnpinned.
Definition: file.cpp:192
size_t current_block_
index of current block.
Definition: file.hpp:400
PinnedBlock NextBlock()
Get the next block of file.
Definition: file.cpp:232
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:458
size_t dia_id_
optionally associated DIANode id
Definition: file.hpp:259
ConsumeFileBlockSource(File *file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_)
Definition: file.cpp:199
STL namespace.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:100
BlockWriter< FileBlockSink > Writer
Definition: file.hpp:59
void set_begin(size_t i)
accessor to begin_
Definition: block.hpp:94
~ConsumeFileBlockSource()
Consume unread blocks and reset File to zero items.
Definition: file.cpp:282
size_t first_item_
offset of first item in first block read
Definition: file.hpp:403
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:245
BlockSink which interfaces to a File.
Definition: file.hpp:290
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
friend class ConsumeFileBlockSource
Definition: file.hpp:282
size_t first_block_
number of the first block
Definition: file.hpp:397
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:391
size_t stats_bytes_
Definition: file.hpp:274
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
BlockReader< ConsumeFileBlockSource > ConsumeReader
Definition: file.hpp:62
void Close() final
Closes the sink. Must not be called multiple times.
Definition: file.cpp:52
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t reference_count() const noexcept
Return the number of references to this object (for debugging)
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
size_t stats_items_
Definition: file.hpp:278
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
friend std::ostream & operator<<(std::ostream &os, const File &f)
Output the Block objects contained in this File.
Definition: file.cpp:94
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:388
PinRequestPtr Pin(size_t local_worker_id) const
Definition: block.cpp:39
const File & file_
file to read blocks from
Definition: file.hpp:382
File(BlockPool &block_pool, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Definition: file.cpp:22
High-performance smart pointer used as a wrapping reference counting pointer.
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:131
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:452
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
friend class KeepFileBlockSource
for access to blocks_ and num_items_sum_
Definition: file.hpp:281
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
File * file_
file to consume blocks from (ptr to make moving easier)
Definition: file.hpp:449
KeepFileBlockSource(const File &file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_, size_t first_block=0, size_t first_item=keep_first_item)
Start reading a File.
Definition: file.cpp:107
std::string ReadComplete() const
Definition: file.cpp:87
std::deque< size_t > num_items_sum_
Definition: file.hpp:267
A BlockSource to read and simultaneously consume Blocks from a File.
Definition: file.hpp:414
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:394
size_t num_blocks() const
Return the number of blocks.
Definition: file.hpp:177
~File()
write out stats
Definition: file.cpp:26
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:455
size_t size_bytes_
Total size of this file in bytes. Sum of all block sizes.
Definition: file.hpp:270
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
const Block & block(size_t i) const
Return reference to a block.
Definition: file.hpp:191
common::JsonLogger & logger()
Returns BlockPool.logger_.
Definition: block_sink.hpp:66