Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
file.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/file.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <thrill/data/file.hpp>
12 
13 #include <deque>
14 #include <string>
15 
16 namespace thrill {
17 namespace data {
18 
19 /******************************************************************************/
20 // File
21 
22 File::File(BlockPool& block_pool, size_t local_worker_id, size_t dia_id)
23  : BlockSink(block_pool, local_worker_id),
24  id_(block_pool.next_file_id()), dia_id_(dia_id) { }
25 
27  // assert(dia_id_ != 0);
28  if (reference_count() != 0) {
29  die("File[" << this << "]::~File() called "
30  "but " << reference_count() << " File::Writer "
31  "handles are still open.");
32  }
33  logger()
34  << "class" << "File"
35  << "event" << "close"
36  << "id" << id_
37  << "dia_id" << dia_id_
38  << "items" << stats_items_
39  << "bytes" << stats_bytes_;
40 }
41 
42 File File::Copy() const {
44  f.blocks_ = blocks_;
49  return f;
50 }
51 
52 void File::Close() {
53  // 2016-02-04: Files are never closed, one can always append. This is
54  // current used by the ReduceTables -tb.
55 }
56 
57 void File::Clear() {
58  std::deque<Block>().swap(blocks_);
59  std::deque<size_t>().swap(num_items_sum_);
60  size_bytes_ = 0;
61 }
62 
63 File::Writer File::GetWriter(size_t block_size) {
64  return Writer(
66 }
67 
68 File::KeepReader File::GetKeepReader(size_t prefetch_size) const {
69  return KeepReader(
70  KeepFileBlockSource(*this, local_worker_id_, prefetch_size));
71 }
72 
74  return ConsumeReader(
75  ConsumeFileBlockSource(this, local_worker_id_, prefetch_size));
76 }
77 
78 File::Reader File::GetReader(bool consume, size_t prefetch_size) {
79  if (consume)
80  return ConstructDynBlockReader<ConsumeFileBlockSource>(
81  this, local_worker_id_, prefetch_size);
82  else
83  return ConstructDynBlockReader<KeepFileBlockSource>(
84  *this, local_worker_id_, prefetch_size);
85 }
86 
88  std::string output;
89  for (const Block& b : blocks_)
90  output += b.PinWait(0).ToString();
91  return output;
92 }
93 
94 std::ostream& operator << (std::ostream& os, const File& f) {
95  os << "[File " << std::hex << &f << std::dec
96  << " refs=" << f.reference_count()
97  << " Blocks=[";
98  size_t i = 0;
99  for (const Block& b : f.blocks_)
100  os << "\n " << i++ << " " << b;
101  return os << "]]";
102 }
103 
104 /******************************************************************************/
105 // KeepFileBlockSource
106 
108  const File& file, size_t local_worker_id,
109  size_t prefetch_size,
110  size_t first_block, size_t first_item)
111  : file_(file), local_worker_id_(local_worker_id),
112  prefetch_size_(prefetch_size),
113  fetching_bytes_(0),
114  first_block_(first_block), current_block_(first_block),
115  first_item_(first_item) { }
116 
117 void KeepFileBlockSource::Prefetch(size_t prefetch_size) {
118  if (prefetch_size >= prefetch_size_) {
119  prefetch_size_ = prefetch_size;
120  // prefetch #desired bytes
121  while (fetching_bytes_ < prefetch_size_ &&
123  {
124  Block b = NextUnpinnedBlock();
125  fetching_bytes_ += b.size();
126  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
127  }
128  }
129  else if (prefetch_size < prefetch_size_) {
130  prefetch_size_ = prefetch_size;
131  // cannot discard prefetched Blocks
132  }
133 }
134 
136 
137  if (current_block_ >= file_.num_blocks() && fetching_blocks_.empty())
138  return PinnedBlock();
139 
140  if (prefetch_size_ == 0)
141  {
142  // operate without prefetching
144  }
145  else
146  {
147  // prefetch #desired bytes
148  while (fetching_bytes_ < prefetch_size_ &&
150  {
151  Block b = NextUnpinnedBlock();
152  fetching_bytes_ += b.size();
153  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
154  }
155 
156  // this might block if the prefetching is not finished
157  PinnedBlock b = fetching_blocks_.front()->Wait();
158  fetching_bytes_ -= b.size();
159  fetching_blocks_.pop_front();
160  return b;
161  }
162 }
163 
164 //! Determine current unpinned Block to deliver via NextBlock()
166  if (current_block_ == first_block_) {
167  // construct first block differently, in case we want to shorten it.
171  return b;
172  }
173  else {
174  return file_.block(current_block_++);
175  }
176 }
177 
178 /******************************************************************************/
179 // ConsumeFileBlockSource
180 
182  File* file, size_t local_worker_id, size_t prefetch_size)
183  : file_(file), local_worker_id_(local_worker_id),
184  prefetch_size_(prefetch_size),
185  fetching_bytes_(0) {
187 }
188 
190  : file_(s.file_), local_worker_id_(s.local_worker_id_),
191  prefetch_size_(s.prefetch_size_),
192  fetching_blocks_(std::move(s.fetching_blocks_)),
193  fetching_bytes_(s.fetching_bytes_) {
194  s.file_ = nullptr;
195 }
196 
197 void ConsumeFileBlockSource::Prefetch(size_t prefetch_size) {
198  if (prefetch_size >= prefetch_size_) {
199  prefetch_size_ = prefetch_size;
200  // prefetch #desired bytes
201  while (fetching_bytes_ < prefetch_size_ && !file_->blocks_.empty()) {
202  Block& b = file_->blocks_.front();
203  fetching_bytes_ += b.size();
204  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
205  file_->blocks_.pop_front();
206  }
207  }
208  else if (prefetch_size < prefetch_size_) {
209  prefetch_size_ = prefetch_size;
210  // cannot discard prefetched Blocks
211  }
212 }
213 
215  assert(file_);
216  if (file_->blocks_.empty() && fetching_blocks_.empty())
217  return PinnedBlock();
218 
219  // operate without prefetching
220  if (prefetch_size_ == 0) {
221  PinRequestPtr f = file_->blocks_.front().Pin(local_worker_id_);
222  file_->blocks_.pop_front();
223  return f->Wait();
224  }
225 
226  // prefetch #desired bytes
227  while (fetching_bytes_ < prefetch_size_ && !file_->blocks_.empty()) {
228  Block& b = file_->blocks_.front();
229  fetching_bytes_ += b.size();
230  fetching_blocks_.emplace_back(b.Pin(local_worker_id_));
231  file_->blocks_.pop_front();
232  }
233 
234  // this might block if the prefetching is not finished
235  PinnedBlock b = fetching_blocks_.front()->Wait();
236  fetching_bytes_ -= b.size();
237  fetching_blocks_.pop_front();
238  return b;
239 }
240 
242  if (file_ != nullptr)
243  file_->Clear();
244 }
245 
246 } // namespace data
247 } // namespace thrill
248 
249 /******************************************************************************/
BlockReader< KeepFileBlockSource > KeepReader
Definition: file.hpp:61
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:377
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:197
PinRequestPtr Pin(size_t local_worker_id) const
Definition: block.cpp:39
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
std::deque< Block > blocks_
container holding Blocks and thus shared pointers to all byte blocks.
Definition: file.hpp:262
static constexpr size_t keep_first_item
sentinel value for not changing the first_item item
Definition: file.hpp:371
size_t id_
unique file id
Definition: file.hpp:256
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:446
size_t current_block_
index of current block.
Definition: file.hpp:392
PinnedBlock NextBlock()
Get the next block of file.
Definition: file.cpp:214
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:443
size_t dia_id_
optionally associated DIANode id
Definition: file.hpp:259
ConsumeFileBlockSource(File *file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_)
Definition: file.cpp:181
const Block & block(size_t i) const
Return reference to a block.
Definition: file.hpp:191
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:245
BlockWriter< FileBlockSink > Writer
Definition: file.hpp:59
std::string ReadComplete() const
Definition: file.cpp:87
void set_begin(size_t i)
accessor to begin_
Definition: block.hpp:94
~ConsumeFileBlockSource()
Consume unread blocks and reset File to zero items.
Definition: file.cpp:241
size_t first_item_
offset of first item in first block read
Definition: file.hpp:395
BlockSink which interfaces to a File.
Definition: file.hpp:290
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
friend class ConsumeFileBlockSource
Definition: file.hpp:282
size_t first_block_
number of the first block
Definition: file.hpp:389
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:383
size_t stats_bytes_
Definition: file.hpp:274
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
BlockReader< ConsumeFileBlockSource > ConsumeReader
Definition: file.hpp:62
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
void Close() final
Closes the sink. Must not be called multiple times.
Definition: file.cpp:52
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
Definition: block.cpp:35
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t reference_count() const noexcept
Return the number of references to this object (for debugging)
size_t stats_items_
Definition: file.hpp:278
size_t num_blocks() const
Return the number of blocks.
Definition: file.hpp:177
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:380
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:100
const File & file_
file to read blocks from
Definition: file.hpp:374
File(BlockPool &block_pool, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Definition: file.cpp:22
High-performance smart pointer used as a wrapping reference counting pointer.
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:117
Block NextUnpinnedBlock()
Determine current unpinned Block to deliver via NextBlock()
Definition: file.cpp:165
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:437
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
friend class KeepFileBlockSource
for access to blocks_ and num_items_sum_
Definition: file.hpp:281
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
File * file_
file to consume blocks from (ptr to make moving easier)
Definition: file.hpp:434
KeepFileBlockSource(const File &file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_, size_t first_block=0, size_t first_item=keep_first_item)
Start reading a File.
Definition: file.cpp:107
std::deque< size_t > num_items_sum_
Definition: file.hpp:267
A BlockSource to read and simultaneously consume Blocks from a File.
Definition: file.hpp:406
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:386
~File()
write out stats to the logger (dies if File::Writer handles are still open)
Definition: file.cpp:26
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:440
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
Definition: file.cpp:68
size_t size_bytes_
Total size of this file in bytes. Sum of all block sizes.
Definition: file.hpp:270
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
common::JsonLogger & logger()
Returns BlockPool.logger_.
Definition: block_sink.hpp:66
std::ostream & operator<<(std::ostream &os, const Block &b)
Definition: block.cpp:22