// NOTE: this listing was recovered from a Doxygen-generated page for
// "Thrill 0.1 / block_pool.cpp"; the page navigation text has been removed.
1 /*******************************************************************************
2  * thrill/data/block_pool.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
#include <thrill/common/logger.hpp>
#include <thrill/common/math.hpp>
#include <thrill/data/block.hpp>
#include <thrill/data/block_pool.hpp>
#include <thrill/data/byte_block.hpp>
#include <thrill/mem/aligned_allocator.hpp>
#include <thrill/mem/pool.hpp>

#include <foxxll/io/file.hpp>
#include <foxxll/io/iostats.hpp>
#include <foxxll/mng/block_manager.hpp>
#include <tlx/die.hpp>
#include <tlx/math/is_power_of_two.hpp>
#include <tlx/string/join.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
34 
35 namespace thrill {
36 namespace data {
37 
//! debug block life cycle output: create, destroy
static constexpr bool debug_blc = false;

//! debug block memory alloc and dealloc
static constexpr bool debug_alloc = false;

//! debug block pinning:
static constexpr bool debug_pin = false;

//! debug memory requests
static constexpr bool debug_mem = false;

//! debug block eviction: evict, write complete, read complete
static constexpr bool debug_em = false;
52 
53 /******************************************************************************/
54 // std::new_handler() which gets called when malloc() returns nullptr
55 
56 static std::recursive_mutex s_new_mutex;
57 static std::vector<BlockPool*> s_blockpools;
58 
59 static std::atomic<bool> in_new_handler {
60  false
61 };
62 
63 static void OurNewHandler() {
64  std::unique_lock<std::recursive_mutex> lock(s_new_mutex);
65  if (in_new_handler) {
66  printf("new handler called recursively! fixup using mem::Pool!\n");
67  abort();
68  }
69 
70  static bool s_notify_new_handler = false;
71  if (!s_notify_new_handler) {
72  fprintf(stderr, "Thrill: new_handler called! Program is out of C++ heap memory, trying to\n");
73  fprintf(stderr, "Thrill: swap out Blocks to external memory. Check your program's memory usage.\n");
74  in_new_handler = true;
75  }
76 
77  static size_t s_iter = 0;
79 
80  // first try to find a handle to a currently being written block.
81  for (size_t i = 0; i < s_blockpools.size(); ++i) {
82  req = s_blockpools[s_iter]->GetAnyWriting();
83  ++s_iter %= s_blockpools.size();
84  if (req) break;
85  }
86 
87  if (!req) {
88  // if no writing active, evict a block
89  for (size_t i = 0; i < s_blockpools.size(); ++i) {
90  req = s_blockpools[s_iter]->EvictBlockLRU();
91  ++s_iter %= s_blockpools.size();
92  if (req) break;
93  }
94  }
95 
96  if (req) {
97  req->wait();
98  in_new_handler = false;
99  }
100  else {
101  printf("new handler found no ByteBlock to evict.\n");
102  for (size_t i = 0; i < s_blockpools.size(); ++i) {
103  LOG1 << "BlockPool[" << i << "]"
104  << " total_blocks=" << s_blockpools[i]->total_blocks()
105  << " total_bytes=" << s_blockpools[i]->total_bytes()
106  << " pinned_blocks=" << s_blockpools[i]->pinned_blocks()
107  << " writing_blocks=" << s_blockpools[i]->writing_blocks()
108  << " swapped_blocks=" << s_blockpools[i]->swapped_blocks()
109  << " reading_blocks=" << s_blockpools[i]->reading_blocks();
110  }
112  in_new_handler = false;
113 
114  lock.unlock();
115  std::this_thread::sleep_for(std::chrono::milliseconds(100));
116  }
117 }
118 
119 /******************************************************************************/
120 // BlockPool::PinCount
121 
122 struct BlockPool::Counter {
123  //! current counter value
124  size_t value = 0;
125  //! maximum counter value since last stats read
126  size_t hmax = 0;
127 
128  operator size_t () const { return value; }
129 
130  Counter& operator += (size_t v) {
131  value += v;
132  hmax = std::max(hmax, value);
133  return *this;
134  }
135  Counter& operator -= (size_t v) {
136  value -= v;
137  return *this;
138  }
139 
140  //! get last held max value and update to current
141  size_t hmax_update() {
142  size_t m = hmax;
143  hmax = value;
144  return m;
145  }
146 };
147 
148 struct BlockPool::PinCount {
149  //! current total number of pins, where each thread pin counts
150  //! individually.
151  size_t total_pins_ = 0;
152 
153  //! total number of bytes pinned.
154  Counter total_pinned_bytes_;
155 
156  //! maximum number of total pins
157  size_t max_pins = 0;
158 
159  //! maximum number of pinned bytes
160  size_t max_pinned_bytes = 0;
161 
162  //! number of pinned blocks per local worker id - this is used to count
163  //! the amount of memory locked per thread.
164  std::vector<size_t> pin_count_;
165 
166  //! number of bytes pinned per local worker id.
167  std::vector<size_t> pinned_bytes_;
168 
169  //! ctor: initializes vectors to correct size.
170  explicit PinCount(size_t workers_per_host);
171 
172  //! increment pin counter for thread_id by given size in bytes
173  void Increment(size_t local_worker_id, size_t size);
174 
175  //! decrement pin counter for thread_id by given size in bytes
176  void Decrement(size_t local_worker_id, size_t size);
177 
178  //! assert that it is zero.
179  void AssertZero() const;
180 };
181 
182 BlockPool::PinCount::PinCount(size_t workers_per_host)
183  : pin_count_(workers_per_host),
184  pinned_bytes_(workers_per_host) { }
185 
186 void BlockPool::PinCount::Increment(size_t local_worker_id, size_t size) {
187  ++pin_count_[local_worker_id];
188  pinned_bytes_[local_worker_id] += size;
189  ++total_pins_;
190  total_pinned_bytes_ += size;
191  max_pins = std::max(max_pins, total_pins_);
192  max_pinned_bytes = std::max(max_pinned_bytes, total_pinned_bytes_.value);
193 }
194 
195 void BlockPool::PinCount::Decrement(size_t local_worker_id, size_t size) {
196  die_unless(pin_count_[local_worker_id] > 0);
197  die_unless(pinned_bytes_[local_worker_id] >= size);
198  die_unless(total_pins_ > 0);
199  die_unless(total_pinned_bytes_ >= size);
200 
201  --pin_count_[local_worker_id];
202  pinned_bytes_[local_worker_id] -= size;
203  --total_pins_;
204  total_pinned_bytes_ -= size;
205 }
206 
207 void BlockPool::PinCount::AssertZero() const {
208  die_unless(total_pins_ == 0);
209  die_unless(total_pinned_bytes_ == 0);
210  for (const size_t& pc : pin_count_)
211  die_unless(pc == 0);
212  for (const size_t& pb : pinned_bytes_)
213  die_unless(pb == 0);
214 }
215 
216 std::ostream& operator << (std::ostream& os, const BlockPool::PinCount& p) {
217  os << " total_pins_=" << p.total_pins_
218  << " total_pinned_bytes_=" << p.total_pinned_bytes_
219  << " pin_count_=[" << tlx::join(',', p.pin_count_) << "]"
220  << " pinned_bytes_=[" << tlx::join(',', p.pinned_bytes_) << "]"
221  << " max_pin=" << p.max_pins
222  << " max_pinned_bytes=" << p.max_pinned_bytes;
223  return os;
224 }
225 
226 /******************************************************************************/
227 // BlockPool::Data
228 
229 //! type of set of ByteBlocks currently begin written to EM.
230 using WritingMap = std::unordered_map<
232  std::hash<ByteBlock*>, std::equal_to<>,
234 
235 //! type of set of ByteBlocks currently begin read from EM.
236 using ReadingMap = std::unordered_map<
237  ByteBlock*, PinRequestPtr,
238  std::hash<ByteBlock*>, std::equal_to<>,
240  std::pair<ByteBlock* const, PinRequestPtr> > >;
241 
242 class BlockPool::Data
243 {
244 public:
245  //! For waiting on hard memory limit
246  std::condition_variable cv_memory_change_;
247 
248  //! Soft limit for the block pool, blocks will be written to disk if this
249  //! limit is reached. 0 for no limit.
250  size_t soft_ram_limit_;
251 
252  //! Hard limit for the block pool, memory requests will block if this limit
253  //! is reached. 0 for no limit.
254  size_t hard_ram_limit_;
255 
256  //! print a message on the first block evicted to external memory
257  bool notify_em_used_ = false;
258 
259  //! list of all blocks that are _in_memory_ but are _not_ pinned.
261  ByteBlock*, mem::GPoolAllocator<ByteBlock*> > unpinned_blocks_;
262 
263  //! set of ByteBlocks currently begin written to EM.
264  WritingMap writing_;
265 
266  //! set of ByteBlocks currently begin read from EM.
267  ReadingMap reading_;
268 
269  //! set of ByteBlock currently in EM.
270  std::unordered_set<
271  ByteBlock*, std::hash<ByteBlock*>, std::equal_to<>,
273 
274  //! I/O layer stats when BlockPool was created.
275  foxxll::stats_data io_stats_first_;
276 
277  //! I/O layer stats of previous profile tick
278  foxxll::stats_data io_stats_prev_;
279 
280  //! reference to io block manager
282 
283  //! Allocator for ByteBlocks such that they are aligned for faster
284  //! I/O. Allocations are counted via mem_manager_.
286 
287  //! next unique File id
288  std::atomic<size_t> next_file_id_ { 0 };
289 
290  //! number of unpinned bytes
291  Counter unpinned_bytes_;
292 
293  //! pin counter class
294  PinCount pin_count_;
295 
296  //! number of bytes currently begin requested from RAM.
297  size_t requested_bytes_ = 0;
298 
299  //! number of bytes currently being written to EM.
300  Counter writing_bytes_;
301 
302  //! total number of bytes in swapped blocks
303  Counter swapped_bytes_;
304 
305  //! number of bytes currently being read from to EM.
306  Counter reading_bytes_;
307 
308  //! total number of ByteBlocks allocated
309  size_t total_byte_blocks_ = 0;
310 
311  //! condition variable to wait on for ByteBlock deallocation
312  std::condition_variable cv_total_byte_blocks_;
313 
314  //! total number of bytes in all ByteBlocks (in memory or swapped)
315  Counter total_bytes_;
316 
317  //! maximum number of bytes in all ByteBlocks (in memory or swapped)
318  size_t max_total_bytes_ = 0;
319 
320  //! total number of bytes used in RAM by pinned and unpinned blocks, and
321  //! also additionally reserved memory via BlockPoolMemoryHolder.
322  Counter total_ram_bytes_;
323 
324  //! last time statistics where outputted
325  std::chrono::steady_clock::time_point tp_last_
326  = std::chrono::steady_clock::now();
327 
328 public:
329  Data(BlockPool& block_pool,
330  size_t soft_ram_limit, size_t hard_ram_limit,
331  size_t workers_per_host)
332  : soft_ram_limit_(soft_ram_limit),
333  hard_ram_limit_(hard_ram_limit),
335  aligned_alloc_(mem::Allocator<char>(block_pool.mem_manager_)),
336  pin_count_(workers_per_host) { }
337 
338  //! Updates the memory manager for internal memory. If the hard limit is
339  //! reached, the call is blocked intil memory is free'd
340  void IntRequestInternalMemory(std::unique_lock<std::mutex>& lock, size_t size);
341 
342  //! Updates the memory manager for the internal memory, wakes up waiting
343  //! BlockPool::RequestInternalMemory calls
344  void IntReleaseInternalMemory(size_t size);
345 
346  //! Unpins a block. If all pins are removed, the block might be swapped.
347  //! Returns immediately. Actual unpinning is async.
348  void IntUnpinBlock(
349  BlockPool& bp, ByteBlock* block_ptr, size_t local_worker_id);
350 
351  //! Evict a block from the lru list into external memory
352  foxxll::request_ptr IntEvictBlockLRU();
353 
354  //! Evict a block into external memory. The block must be unpinned and not
355  //! swapped.
356  foxxll::request_ptr IntEvictBlock(ByteBlock* block_ptr);
357 
358  //! \name Block Statistics
359  //! \{
360 
361  //! Total number of allocated blocks of this block pool
362  size_t int_total_blocks() noexcept;
363 
364  //! Total number of bytes allocated in blocks of this block pool
365  size_t int_total_bytes() noexcept;
366 
367  //! \}
368 };
369 
370 /******************************************************************************/
371 // BlockPool
372 
373 BlockPool::BlockPool(size_t workers_per_host)
374  : BlockPool(0, 0, nullptr, nullptr, workers_per_host) { }
375 
376 BlockPool::BlockPool(size_t soft_ram_limit, size_t hard_ram_limit,
377  common::JsonLogger* logger, mem::Manager* mem_manager,
378  size_t workers_per_host)
379  : logger_(logger),
380  mem_manager_(mem_manager, "BlockPool"),
381  workers_per_host_(workers_per_host),
382  d_(std::make_unique<Data>(
383  *this, soft_ram_limit, hard_ram_limit, workers_per_host)) {
384 
385  die_unless(hard_ram_limit >= soft_ram_limit);
386  {
387  std::unique_lock<std::recursive_mutex> lock(s_new_mutex);
388  // register BlockPool as method of OurNewHandler to free memory.
389  s_blockpools.reserve(32);
390  s_blockpools.push_back(this);
391 
392  std::set_new_handler(OurNewHandler);
393  }
394 
395  d_->io_stats_first_ = d_->io_stats_prev_ =
397 
398  logger_ << "class" << "BlockPool"
399  << "event" << "create"
400  << "soft_ram_limit" << soft_ram_limit
401  << "hard_ram_limit" << hard_ram_limit;
402 }
403 
405  std::unique_lock<std::mutex> lock(mutex_);
406 
407  // check that not writing any block.
408  while (d_->writing_.begin() != d_->writing_.end()) {
409 
410  ByteBlock* block_ptr = d_->writing_.begin()->first;
411  foxxll::request_ptr req = d_->writing_.begin()->second;
412 
413  LOGC(debug_em)
414  << "BlockPool::~BlockPool() block=" << block_ptr
415  << " is currently begin written to external memory, canceling.";
416 
417  lock.unlock();
418  // cancel I/O request
419  if (!req->cancel()) {
420 
421  LOGC(debug_em)
422  << "BlockPool::~BlockPool() block=" << block_ptr
423  << " is currently begin written to external memory,"
424  << " cancel failed, waiting.";
425 
426  // must still wait for cancellation to complete and the I/O handler.
427  req->wait();
428  }
429  lock.lock();
430 
431  LOGC(debug_em)
432  << "BlockPool::PinBlock block=" << block_ptr
433  << " is currently begin written to external memory,"
434  << " cancel/wait done.";
435  }
436 
437  die_unless(d_->writing_bytes_ == 0);
438 
439  // check that not reading any block.
440  while (d_->reading_.begin() != d_->reading_.end()) {
441 
442  ByteBlock* block_ptr = d_->reading_.begin()->first;
443  PinRequestPtr read = d_->reading_.begin()->second;
444 
445  LOGC(debug_em)
446  << "BlockPool::~BlockPool() block=" << block_ptr
447  << " is currently begin read from external memory, waiting.";
448 
449  lock.unlock();
450  // wait for I/O request for completion and the I/O handler.
451  read->req_->wait();
452  lock.lock();
453  }
454 
455  die_unless(d_->reading_bytes_ == 0);
456 
457  // wait for deletion of last ByteBlocks. this may actually be needed, when
458  // the I/O handlers have been finished, and the corresponding references are
459  // freed, but DestroyBlock() could not be called yet.
460  while (d_->total_byte_blocks_ != 0)
461  d_->cv_total_byte_blocks_.wait(lock);
462 
463  d_->pin_count_.AssertZero();
464  die_unequal(d_->total_ram_bytes_, 0u);
465  die_unequal(d_->total_bytes_, 0u);
466  die_unequal(d_->unpinned_blocks_.size(), 0u);
467 
468  LOGC(debug_pin)
469  << "~BlockPool()"
470  << " max_pin=" << d_->pin_count_.max_pins
471  << " max_pinned_bytes=" << d_->pin_count_.max_pinned_bytes;
472 
473  logger_ << "class" << "BlockPool"
474  << "event" << "destroy"
475  << "max_pins" << d_->pin_count_.max_pins
476  << "max_pinned_bytes" << d_->pin_count_.max_pinned_bytes;
477 
478  std::unique_lock<std::recursive_mutex> s_new_lock(s_new_mutex);
479  s_blockpools.erase(
480  std::find(s_blockpools.begin(), s_blockpools.end(), this));
481 }
482 
484 BlockPool::AllocateByteBlock(size_t size, size_t local_worker_id) {
485  assert(local_worker_id < workers_per_host_);
486  std::unique_lock<std::mutex> lock(mutex_);
487 
488  if (!(size % THRILL_DEFAULT_ALIGN == 0 && tlx::is_power_of_two(size))
489  // make exception to block_size constraint for test programs, which use
490  // irregular block sizes to check all corner cases
491  && d_->hard_ram_limit_ != 0) {
492  die("BlockPool: requested unaligned block_size=" << size << "." <<
493  "ByteBlocks must be >= " << THRILL_DEFAULT_ALIGN << " and a power of two.");
494  }
495 
496  d_->IntRequestInternalMemory(lock, size);
497 
498  // allocate block memory. -- unlock mutex for that time, since it may
499  // require block eviction.
500  lock.unlock();
501  Byte* data = d_->aligned_alloc_.allocate(size);
502  LOGC(debug_alloc)
503  << "ByteBlock aligned_alloc: " << (void*)data << " size " << size;
504  lock.lock();
505 
506  // create tlx::CountingPtr, no need for special make_shared()-equivalent
507  PinnedByteBlockPtr block_ptr(
508  mem::GPool().make<ByteBlock>(this, data, size), local_worker_id);
509  ++d_->total_byte_blocks_;
510  d_->total_bytes_ += size;
511  d_->max_total_bytes_ = std::max(d_->max_total_bytes_, d_->total_bytes_.value);
512  IntIncBlockPinCount(block_ptr.get(), local_worker_id);
513 
514  d_->pin_count_.Increment(local_worker_id, size);
515 
516  LOGC(debug_blc)
517  << "BlockPool::AllocateBlock()"
518  << " ptr=" << block_ptr.get()
519  << " size=" << size
520  << " local_worker_id=" << local_worker_id
521  << " total_blocks()=" << d_->int_total_blocks()
522  << " total_bytes()=" << d_->int_total_bytes()
523  << d_->pin_count_;
524 
525  return block_ptr;
526 }
527 
529  const foxxll::file_ptr& file, uint64_t offset, size_t size) {
530  std::unique_lock<std::mutex> lock(mutex_);
531  // create tlx::CountingPtr, no need for special make_shared()-equivalent
532  ByteBlockPtr block_ptr(
533  mem::GPool().make<ByteBlock>(this, file, offset, size));
534  ++d_->total_byte_blocks_;
535  d_->max_total_bytes_ = std::max(d_->max_total_bytes_, d_->total_bytes_.value);
536  d_->total_bytes_ += size;
537 
538  LOGC(debug_blc)
539  << "BlockPool::MapExternalBlock()"
540  << " ptr=" << block_ptr.get()
541  << " offset=" << offset
542  << " size=" << size;
543 
544  return block_ptr;
545 }
546 
547 //! Pins a block by swapping it in if required.
548 PinRequestPtr BlockPool::PinBlock(const Block& block, size_t local_worker_id) {
549  assert(local_worker_id < workers_per_host_);
550  std::unique_lock<std::mutex> lock(mutex_);
551 
552  ByteBlock* block_ptr = block.byte_block().get();
553 
554  if (block_ptr->pin_count_[local_worker_id] > 0) {
555  // We may get a Block who's underlying is already pinned, since
556  // PinnedBlock become Blocks when transfered between Files or delivered
557  // via GetItemRange() or Scatter().
558 
559  die_unless(!d_->unpinned_blocks_.exists(block_ptr));
560  die_unless(d_->reading_.find(block_ptr) == d_->reading_.end());
561 
562  LOGC(debug_pin)
563  << "BlockPool::PinBlock block=" << &block
564  << " already pinned by thread";
565 
566  IntIncBlockPinCount(block_ptr, local_worker_id);
567 
568  return PinRequestPtr(mem::GPool().make<PinRequest>(
569  this, PinnedBlock(block, local_worker_id)));
570  }
571 
572  if (block_ptr->total_pins_ > 0) {
573  // This block was already pinned by another thread, hence we only need
574  // to get a pin for the new thread.
575 
576  die_unless(!d_->unpinned_blocks_.exists(block_ptr));
577  die_unless(d_->reading_.find(block_ptr) == d_->reading_.end());
578 
579  LOGC(debug_pin)
580  << "BlockPool::PinBlock block=" << block
581  << " already pinned by another thread"
582  << d_->pin_count_;
583 
584  IntIncBlockPinCount(block_ptr, local_worker_id);
585  d_->pin_count_.Increment(local_worker_id, block_ptr->size());
586 
587  return PinRequestPtr(mem::GPool().make<PinRequest>(
588  this, PinnedBlock(block, local_worker_id)));
589  }
590 
591  // check that not writing the block.
592  WritingMap::iterator write_it;
593  while ((write_it = d_->writing_.find(block_ptr)) != d_->writing_.end()) {
594 
595  LOGC(debug_em)
596  << "BlockPool::PinBlock() block=" << block_ptr
597  << " is currently begin written to external memory, canceling.";
598 
599  die_unless(!block_ptr->ext_file_);
600 
601  // get reference count to request, since complete handler removes it
602  // from the map.
603  foxxll::request_ptr req = write_it->second;
604  lock.unlock();
605  // cancel I/O request
606  if (!req->cancel()) {
607 
608  LOGC(debug_em)
609  << "BlockPool::PinBlock() block=" << block_ptr
610  << " is currently begin written to external memory, "
611  << "cancel failed, waiting.";
612 
613  // must still wait for cancellation to complete and the I/O
614  // handler.
615  req->wait();
616  }
617  lock.lock();
618 
619  LOGC(debug_em)
620  << "BlockPool::PinBlock() block=" << block_ptr
621  << " is currently begin written to external memory, "
622  << "cancel/wait done.";
623 
624  // recheck whether block is being written, it may have been evicting
625  // the unlocked time.
626  }
627 
628  // check if block is being loaded. in this case, just deliver the
629  // shared_future.
630  ReadingMap::iterator read_it = d_->reading_.find(block_ptr);
631  if (read_it != d_->reading_.end())
632  return read_it->second;
633 
634  if (block_ptr->in_memory())
635  {
636  // unpinned block in memory, no need to load from EM.
637 
638  // remove from unpinned list
639  die_unless(d_->unpinned_blocks_.exists(block_ptr));
640  d_->unpinned_blocks_.erase(block_ptr);
641  d_->unpinned_bytes_ -= block_ptr->size();
642 
643  IntIncBlockPinCount(block_ptr, local_worker_id);
644  d_->pin_count_.Increment(local_worker_id, block_ptr->size());
645 
646  LOGC(debug_pin)
647  << "BlockPool::PinBlock block=" << &block
648  << " pinned from internal memory"
649  << d_->pin_count_;
650 
651  return PinRequestPtr(mem::GPool().make<PinRequest>(
652  this, PinnedBlock(block, local_worker_id)));
653  }
654 
655  // else need to initiate an async read to get the data.
656 
657  die_unless(block_ptr->em_bid_.storage);
658 
659  // maybe blocking call until memory is available, this also swaps out other
660  // blocks.
661  d_->IntRequestInternalMemory(lock, block_ptr->size());
662 
663  // the requested memory is already counted as a pin.
664  d_->pin_count_.Increment(local_worker_id, block_ptr->size());
665 
666  // initiate reading from EM -- already create PinnedBlock, which will hold
667  // the read data
668  PinRequestPtr read(
669  mem::GPool().make<PinRequest>(
670  this, PinnedBlock(block, local_worker_id), /* ready */ false));
671  d_->reading_[block_ptr] = read;
672 
673  // allocate block memory.
674  lock.unlock();
675  Byte* data = read->byte_block()->data_ =
676  d_->aligned_alloc_.allocate(block_ptr->size());
677  lock.lock();
678 
679  if (!block_ptr->ext_file_) {
680  d_->swapped_.erase(block_ptr);
681  d_->swapped_bytes_ -= block_ptr->size();
682  }
683 
684  LOGC(debug_em)
685  << "BlockPool::PinBlock block=" << block
686  << " requested from external memory"
687  << d_->pin_count_;
688 
689  // issue I/O request, hold the reference to the request in the hashmap
690  read->req_ =
691  block_ptr->em_bid_.storage->aread(
692  // parameters for the read
693  data, block_ptr->em_bid_.offset, block_ptr->size(),
694  // construct an immediate CompletionHandler callback
695  foxxll::completion_handler::make<
697 
698  d_->reading_bytes_ += block_ptr->size();
699 
700  return read;
701 }
702 
703 std::pair<size_t, size_t> BlockPool::MaxMergeDegreePrefetch(size_t num_files) {
704  size_t avail_bytes = hard_ram_limit() / workers_per_host_ / 2;
705  size_t avail_blocks = avail_bytes / default_block_size;
706 
707  if (num_files >= avail_blocks) {
708  // more files than blocks available -> partial merge of avail_bytes
709  // Files with prefetch = 0, which is at most one block per File.
710  return std::make_pair(avail_blocks, 0u);
711  }
712  else {
713  // less files than available Blocks -> split prefetch size equally
714  // among Files.
715  return std::make_pair(num_files, avail_bytes / num_files);
716  }
717 }
718 
719 void PinRequest::OnComplete(foxxll::request* req, bool success) {
720  return block_pool_->OnReadComplete(this, req, success);
721 }
722 
724  PinRequest* read, foxxll::request* req, bool success) {
725  std::unique_lock<std::mutex> lock(mutex_);
726 
727  ByteBlock* block_ptr = read->block_.byte_block().get();
728  size_t block_size = block_ptr->size();
729 
730  LOGC(debug_em)
731  << "OnReadComplete():"
732  << " req " << req << " block " << *block_ptr
733  << " size " << block_size << " done,"
734  << " from " << block_ptr->em_bid_ << " success = " << success;
735  req->check_errors();
736 
737  if (!success)
738  {
739  // request was canceled. this is not an I/O error, but intentional,
740  // e.g. because the Block was deleted.
741 
742  if (!block_ptr->ext_file_) {
743  d_->swapped_.insert(block_ptr);
744  d_->swapped_bytes_ += block_size;
745  }
746 
747  // release memory
748  sLOGC(debug_alloc)
749  << "ByteBlock deallocate"
750  << (void*)read->byte_block()->data_ << "size" << block_size;
751  d_->aligned_alloc_.deallocate(read->byte_block()->data_, block_size);
752 
753  d_->IntReleaseInternalMemory(block_size);
754 
755  // the requested memory was already counted as a pin.
756  d_->pin_count_.Decrement(read->block_.local_worker_id_, block_size);
757 
758  // set delivered PinnedBlock as invalid.
759  read->byte_block().reset();
760  }
761  else // success
762  {
763  // set pin on ByteBlock
764  IntIncBlockPinCount(block_ptr, read->block_.local_worker_id_);
765 
766  if (!block_ptr->ext_file_) {
767  d_->bm_->delete_block(block_ptr->em_bid_);
768  block_ptr->em_bid_ = foxxll::BID<0>();
769  }
770  }
771 
772  read->ready_ = true;
773  d_->reading_bytes_ -= block_size;
774  cv_read_complete_.notify_all();
775 
776  // first released the foxxll::request's file reference because it is
777  // possible that the PinRequest contains the last reference to this
778  // ByteBlock. In that case the ByteBlock's foxxll::file_ptr is released in
779  // the step below, when the PinRequestPtr is deleted. This triggers an
780  // error, because the current foxxll::request is still active on the
781  // foxxll::file_ptr.
782  req->release_file_reference();
783 
784  // remove the PinRequest from the hash map. The problem here is that the
785  // PinRequestPtr may have been discarded (the Pin wasn't needed after
786  // all). In that case, deletion of PinRequest will call Unpin, which creates
787  // a deadlock on the mutex_. Hence, we first move the PinRequest out of the
788  // map, then unlock, and delete it. -tb
789  auto it = d_->reading_.find(block_ptr);
790  die_unless(it != d_->reading_.end());
791  PinRequestPtr holder = std::move(it->second);
792  d_->reading_.erase(it);
793  lock.unlock();
794 }
795 
796 void BlockPool::IncBlockPinCount(ByteBlock* block_ptr, size_t local_worker_id) {
797  std::unique_lock<std::mutex> lock(mutex_);
798  assert(local_worker_id < workers_per_host_);
799  die_unless(block_ptr->pin_count_[local_worker_id] > 0);
800  return IntIncBlockPinCount(block_ptr, local_worker_id);
801 }
802 
803 void BlockPool::IntIncBlockPinCount(ByteBlock* block_ptr, size_t local_worker_id) {
804  assert(local_worker_id < workers_per_host_);
805 
806  ++block_ptr->pin_count_[local_worker_id];
807  ++block_ptr->total_pins_;
808 
809  LOGC(debug_pin)
810  << "BlockPool::IncBlockPinCount()"
811  << " byte_block=" << block_ptr
812  << " ++block.pin_count[" << local_worker_id << "]="
813  << block_ptr->pin_count_[local_worker_id]
814  << " ++block.total_pins_=" << block_ptr->total_pins_
815  << d_->pin_count_;
816 }
817 
818 void BlockPool::DecBlockPinCount(ByteBlock* block_ptr, size_t local_worker_id) {
819  std::unique_lock<std::mutex> lock(mutex_);
820 
821  assert(local_worker_id < workers_per_host_);
822  die_unless(block_ptr->pin_count_[local_worker_id] > 0);
823  die_unless(block_ptr->total_pins_ > 0);
824 
825  size_t p = --block_ptr->pin_count_[local_worker_id];
826  size_t tp = --block_ptr->total_pins_;
827 
828  LOGC(debug_pin)
829  << "BlockPool::DecBlockPinCount()"
830  << " byte_block=" << block_ptr
831  << " --block.pin_count[" << local_worker_id << "]=" << p
832  << " --block.total_pins_=" << tp
833  << " local_worker_id=" << local_worker_id;
834 
835  if (p == 0)
836  d_->IntUnpinBlock(*this, block_ptr, local_worker_id);
837 }
838 
839 void BlockPool::Data::IntUnpinBlock(
840  BlockPool& bp, ByteBlock* block_ptr, size_t local_worker_id) {
841  die_unless(local_worker_id < bp.workers_per_host_);
842 
843  // decrease per-thread total pin count (memory locked by thread)
844  die_unless(block_ptr->pin_count(local_worker_id) == 0);
845 
846  pin_count_.Decrement(local_worker_id, block_ptr->size());
847 
848  if (block_ptr->total_pins_ != 0) {
849  LOGC(debug_pin)
850  << "BlockPool::IntUnpinBlock()"
851  << " --block.total_pins_=" << block_ptr->total_pins_;
852  return;
853  }
854 
855  // if all per-thread pins are zero, allow this Block to be swapped out.
856  die_unless(!unpinned_blocks_.exists(block_ptr));
857  unpinned_blocks_.put(block_ptr);
858  unpinned_bytes_ += block_ptr->size();
859 
860  LOGC(debug_pin)
861  << "BlockPool::IntUnpinBlock()"
862  << " byte_block=" << block_ptr
863  << " --total_pins_=" << block_ptr->total_pins_
864  << " allow swap out.";
865 }
866 
867 size_t BlockPool::total_blocks() noexcept {
868  std::unique_lock<std::mutex> lock(mutex_);
869  return d_->int_total_blocks();
870 }
871 
872 size_t BlockPool::Data::int_total_blocks() noexcept {
873 
874  LOG << "BlockPool::total_blocks()"
875  << " pinned_blocks_=" << pin_count_.total_pins_
876  << " unpinned_blocks_=" << unpinned_blocks_.size()
877  << " writing_.size()=" << writing_.size()
878  << " swapped_.size()=" << swapped_.size()
879  << " reading_.size()=" << reading_.size();
880 
881  return pin_count_.total_pins_
882  + unpinned_blocks_.size() + writing_.size()
883  + swapped_.size() + reading_.size();
884 }
885 
886 size_t BlockPool::hard_ram_limit() noexcept {
887  std::unique_lock<std::mutex> lock(mutex_);
888  return d_->hard_ram_limit_;
889 }
890 
891 size_t BlockPool::total_bytes() noexcept {
892  std::unique_lock<std::mutex> lock(mutex_);
893  return d_->int_total_bytes();
894 }
895 
896 size_t BlockPool::max_total_bytes() noexcept {
897  std::unique_lock<std::mutex> lock(mutex_);
898  return d_->max_total_bytes_;
899 }
900 
901 size_t BlockPool::Data::int_total_bytes() noexcept {
902  LOG << "BlockPool::total_bytes()"
903  << " pinned_bytes_=" << pin_count_.total_pinned_bytes_
904  << " unpinned_bytes_=" << unpinned_bytes_
905  << " writing_bytes_=" << writing_bytes_
906  << " swapped_bytes_=" << swapped_bytes_
907  << " reading_bytes_=" << reading_bytes_;
908 
909  return pin_count_.total_pinned_bytes_
910  + unpinned_bytes_ + writing_bytes_
911  + swapped_bytes_ + reading_bytes_;
912 }
913 
914 size_t BlockPool::pinned_blocks() noexcept {
915  std::unique_lock<std::mutex> lock(mutex_);
916  return d_->pin_count_.total_pins_;
917 }
918 
919 size_t BlockPool::unpinned_blocks() noexcept {
920  std::unique_lock<std::mutex> lock(mutex_);
921  return d_->unpinned_blocks_.size();
922 }
923 
924 size_t BlockPool::writing_blocks() noexcept {
925  std::unique_lock<std::mutex> lock(mutex_);
926  return d_->writing_.size();
927 }
928 
929 size_t BlockPool::swapped_blocks() noexcept {
930  std::unique_lock<std::mutex> lock(mutex_);
931  return d_->swapped_.size();
932 }
933 
934 size_t BlockPool::reading_blocks() noexcept {
935  std::unique_lock<std::mutex> lock(mutex_);
936  return d_->reading_.size();
937 }
938 
//! Destroys a ByteBlock and deallocates all of its resources: cancels any
//! in-flight write/read I/O, frees in-memory data via aligned_alloc_, and/or
//! deletes the block's external-memory region. Updates all byte/block
//! accounting counters and notifies waiters on cv_total_byte_blocks_.
void BlockPool::DestroyBlock(ByteBlock* block_ptr) {
    // this method is called by ByteBlockPtr's deleter when the reference
    // counter reaches zero to deallocate the block.
    std::unique_lock<std::mutex> lock(mutex_);

    LOGC(debug_blc)
        << "BlockPool::DestroyBlock() block_ptr=" << block_ptr
        << " byte_block=" << *block_ptr;

    // pinned blocks cannot be destroyed since they are always unpinned first
    die_unless(block_ptr->total_pins_ == 0);

    // delete pin_count_ -> mark block as being deleted
    block_ptr->pin_count_.clear();

    // Cancel any outstanding asynchronous I/O on the block before touching
    // its memory. NOTE(review): `continue` inside a do { } while (0) loop
    // jumps to the (false) condition and therefore EXITS the loop -- it does
    // NOT perform the "recheck" described in the comments below. Confirm
    // whether a single cancellation pass is intended here.
    do {
        if (block_ptr->in_memory())
        {
            // block was evicted, may still be writing to EM.
            WritingMap::iterator it = d_->writing_.find(block_ptr);
            if (it != d_->writing_.end()) {
                // get reference count to request, since complete handler
                // removes it from the map.
                foxxll::request_ptr req = it->second;

                LOGC(debug_em)
                    << "DestroyBlock()"
                    << " canceling write I/O request " << req
                    << " for block " << *block_ptr;

                // drop the pool lock while waiting on I/O; the completion
                // handler (OnWriteComplete) also takes mutex_.
                lock.unlock();
                // cancel I/O request
                if (!req->cancel()) {
                    // must still wait for cancellation to complete and the I/O
                    // handler.
                    req->wait();
                }
                lock.lock();

                // recheck whether block is being written, it may have been
                // evicting the unlocked time.
                continue;
            }
        }
        else
        {
            // block was being pinned. cancel read operation
            ReadingMap::iterator it = d_->reading_.find(block_ptr);
            if (it != d_->reading_.end()) {
                // get reference count to request, since complete handler
                // removes it from the map.
                foxxll::request_ptr req = it->second->req_;

                LOGC(debug_em)
                    << "DestroyBlock()"
                    << " canceling read I/O request " << req
                    << " for block " << *block_ptr;

                // drop the pool lock while waiting on I/O; the completion
                // handler (OnReadComplete) also takes mutex_.
                lock.unlock();
                // cancel I/O request
                if (!req->cancel()) {
                    // must still wait for cancellation to complete and the I/O
                    // handler.
                    req->wait();
                }
                lock.lock();

                // recheck whether block is being read, it may have been
                // evicting again in the unlocked time.
                continue;
            }
        }
    }
    while (0); // NOLINT

    // Four disjoint cases: {external file, anonymous} x {in memory, in EM}.
    if (block_ptr->ext_file_ && block_ptr->in_memory())
    {
        LOGC(debug_blc)
            << "BlockPool::DestroyBlock() block_ptr=" << block_ptr
            << " external block, in memory: release memory.";

        die_unless(d_->unpinned_blocks_.exists(block_ptr));
        d_->unpinned_blocks_.erase(block_ptr);
        d_->unpinned_bytes_ -= block_ptr->size();

        // release memory
        sLOGC(debug_alloc)
            << "ByteBlock deallocate"
            << (void*)block_ptr->data_ << "size" << block_ptr->size();
        d_->aligned_alloc_.deallocate(block_ptr->data_, block_ptr->size());
        block_ptr->data_ = nullptr;

        d_->IntReleaseInternalMemory(block_ptr->size());
    }
    else if (block_ptr->ext_file_)
    {
        LOGC(debug_blc)
            << "BlockPool::DestroyBlock() block_ptr=" << block_ptr
            << " external block, but not in memory: nothing to do, thus just"
            << " delete the reference";
    }
    else if (block_ptr->in_memory())
    {
        LOGC(debug_blc)
            << "BlockPool::DestroyBlock() block_ptr=" << block_ptr
            << " unpinned block in memory, remove from list";

        // may not be in unpinned list, e.g. if its write was just canceled
        if (d_->unpinned_blocks_.exists(block_ptr)) {
            d_->unpinned_blocks_.erase(block_ptr);
            d_->unpinned_bytes_ -= block_ptr->size();
        }

        // release memory
        sLOGC(debug_alloc)
            << "ByteBlock deallocate"
            << (void*)block_ptr->data_ << "size" << block_ptr->size();
        d_->aligned_alloc_.deallocate(block_ptr->data_, block_ptr->size());
        block_ptr->data_ = nullptr;

        d_->IntReleaseInternalMemory(block_ptr->size());
    }
    else
    {
        LOGC(debug_blc)
            << "BlockPool::DestroyBlock() block_ptr=" << block_ptr
            << " block in external memory, delete block";

        auto it = d_->swapped_.find(block_ptr);
        die_unless(it != d_->swapped_.end());

        d_->swapped_.erase(it);
        d_->swapped_bytes_ -= block_ptr->size();

        // free the block's external-memory region
        d_->bm_->delete_block(block_ptr->em_bid_);
        block_ptr->em_bid_ = foxxll::BID<0>();
    }

    // global accounting and wake-up of threads waiting for block capacity
    assert(d_->total_byte_blocks_ > 0);
    assert(d_->total_bytes_ >= block_ptr->size());
    --d_->total_byte_blocks_;
    d_->total_bytes_ -= block_ptr->size();
    d_->cv_total_byte_blocks_.notify_all();
}
1082 
1084  std::unique_lock<std::mutex> lock(mutex_);
1085  return d_->IntRequestInternalMemory(lock, size);
1086 }
1087 
//! Reserve size bytes of internal RAM, evicting unpinned blocks to external
//! memory as needed. First evicts down to the soft limit, then waits (in one
//! second intervals on cv_memory_change_) until the hard limit permits the
//! allocation. Aborts the process after ~60 seconds with no write progress.
//! The caller's lock on mutex_ is passed in so the condition variable can
//! release it while waiting.
void BlockPool::Data::IntRequestInternalMemory(
    std::unique_lock<std::mutex>& lock, size_t size) {

    requested_bytes_ += size;

    LOGC(debug_mem)
        << "BlockPool::RequestInternalMemory()"
        << " size=" << size
        << " total_ram_bytes_=" << total_ram_bytes_
        << " writing_bytes_=" << writing_bytes_
        << " requested_bytes_=" << requested_bytes_
        << " soft_ram_limit_=" << soft_ram_limit_
        << " hard_ram_limit_=" << hard_ram_limit_
        << pin_count_
        << " unpinned_blocks_.size()=" << unpinned_blocks_.size()
        << " swapped_.size()=" << swapped_.size();

    // soft limit: start asynchronous evictions while usage (less bytes
    // already on their way out via writing_bytes_) exceeds the soft limit.
    while (soft_ram_limit_ != 0 &&
           unpinned_blocks_.size() &&
           total_ram_bytes_ + requested_bytes_ > soft_ram_limit_ + writing_bytes_)
    {
        // evict blocks: schedule async writing which increases writing_bytes_.
        IntEvictBlockLRU();
    }

    // wait up to 60 seconds for other threads to free up memory or pins
    static constexpr size_t max_retry = 60;
    size_t retry = max_retry;
    size_t last_writing_bytes = 0;

    // wait for memory change due to blocks begin written and deallocated.
    while (hard_ram_limit_ != 0 && total_ram_bytes_ + size > hard_ram_limit_)
    {
        while (hard_ram_limit_ != 0 &&
               unpinned_blocks_.size() &&
               total_ram_bytes_ + requested_bytes_ > hard_ram_limit_ + writing_bytes_)
        {
            // evict blocks: schedule async writing which increases writing_bytes_.
            IntEvictBlockLRU();
        }

        // releases mutex_ while waiting; woken by IntReleaseInternalMemory()
        cv_memory_change_.wait_for(lock, std::chrono::seconds(1));

        LOGC(debug_mem)
            << "BlockPool::RequestInternalMemory() waiting for memory"
            << " total_ram_bytes_=" << total_ram_bytes_
            << " writing_bytes_=" << writing_bytes_
            << " requested_bytes_=" << requested_bytes_
            << " soft_ram_limit_=" << soft_ram_limit_
            << " hard_ram_limit_=" << hard_ram_limit_
            << pin_count_
            << " unpinned_blocks_.size()=" << unpinned_blocks_.size()
            << " swapped_.size()=" << swapped_.size();

        // deadlock detection: nothing is being written out and we are still
        // over the hard limit -> everything left is pinned.
        if (writing_bytes_ == 0 &&
            total_ram_bytes_ + requested_bytes_ > hard_ram_limit_) {

            LOG1 << "abort() due to out-of-pinned-memory ???"
                 << " total_ram_bytes_=" << total_ram_bytes_
                 << " writing_bytes_=" << writing_bytes_
                 << " requested_bytes_=" << requested_bytes_
                 << " soft_ram_limit_=" << soft_ram_limit_
                 << " hard_ram_limit_=" << hard_ram_limit_
                 << pin_count_
                 << " unpinned_blocks_.size()=" << unpinned_blocks_.size()
                 << " swapped_.size()=" << swapped_.size();

            // abort only after max_retry consecutive rounds without progress
            if (writing_bytes_ == last_writing_bytes) {
                if (--retry == 0)
                    abort();
            }
            else {
                last_writing_bytes = writing_bytes_;
                retry = max_retry;
            }
        }
    }

    // success: convert the reservation into actually-used RAM accounting
    requested_bytes_ -= size;
    total_ram_bytes_ += size;
}
1169 
//! Advise the pool that size bytes of RAM will soon be requested; proactively
//! evicts unpinned blocks so the upcoming request does not stall.
void BlockPool::AdviseFree(size_t size) {
    std::unique_lock<std::mutex> lock(mutex_);

    LOGC(debug_mem)
        << "BlockPool::AdviseFree() advice to free memory"
        << " size=" << size
        << " total_ram_bytes_=" << d_->total_ram_bytes_
        << " writing_bytes_=" << d_->writing_bytes_
        << " requested_bytes_=" << d_->requested_bytes_
        << " soft_ram_limit_=" << d_->soft_ram_limit_
        << " hard_ram_limit_=" << d_->hard_ram_limit_
        << d_->pin_count_
        << " unpinned_blocks_.size()=" << d_->unpinned_blocks_.size()
        << " swapped_.size()=" << d_->swapped_.size();

    // NOTE(review): the guard checks soft_ram_limit_ != 0 but the threshold
    // compares against hard_ram_limit_ -- unlike IntRequestInternalMemory,
    // which pairs each limit with itself. Confirm this mix is intentional
    // and not a soft/hard typo.
    while (d_->soft_ram_limit_ != 0 && d_->unpinned_blocks_.size() &&
           d_->total_ram_bytes_ + d_->requested_bytes_ + size > d_->hard_ram_limit_ + d_->writing_bytes_)
    {
        // evict blocks: schedule async writing which increases writing_bytes_.
        d_->IntEvictBlockLRU();
    }
}
1193  std::unique_lock<std::mutex> lock(mutex_);
1194  return d_->IntReleaseInternalMemory(size);
1195 }
1196 
1197 void BlockPool::Data::IntReleaseInternalMemory(size_t size) {
1198 
1199  LOGC(debug_mem)
1200  << "BlockPool::IntReleaseInternalMemory()"
1201  << " size=" << size
1202  << " total_ram_bytes_=" << total_ram_bytes_;
1203 
1204  die_unless(total_ram_bytes_ >= size);
1205  total_ram_bytes_ -= size;
1206 
1207  cv_memory_change_.notify_all();
1208 }
1209 
1210 void BlockPool::EvictBlock(ByteBlock* block_ptr) {
1211  std::unique_lock<std::mutex> lock(mutex_);
1212 
1213  die_unless(block_ptr->in_memory());
1214 
1215  die_unless(d_->unpinned_blocks_.exists(block_ptr));
1216  d_->unpinned_blocks_.erase(block_ptr);
1217  d_->unpinned_bytes_ -= block_ptr->size();
1218 
1219  d_->IntEvictBlock(block_ptr);
1220 }
1221 
1222 foxxll::request_ptr BlockPool::GetAnyWriting() {
1223  std::unique_lock<std::mutex> lock(mutex_);
1224  if (!d_->writing_.size()) return foxxll::request_ptr();
1225  return d_->writing_.begin()->second;
1226 }
1227 
1228 foxxll::request_ptr BlockPool::EvictBlockLRU() {
1229  std::unique_lock<std::mutex> lock(mutex_);
1230  return d_->IntEvictBlockLRU();
1231 }
1232 
1233 foxxll::request_ptr BlockPool::Data::IntEvictBlockLRU() {
1234 
1235  if (!unpinned_blocks_.size()) return foxxll::request_ptr();
1236 
1237  ByteBlock* block_ptr = unpinned_blocks_.pop();
1238  die_unless(block_ptr);
1239  unpinned_bytes_ -= block_ptr->size();
1240 
1241  return IntEvictBlock(block_ptr);
1242 }
1243 
//! Evict one ByteBlock from RAM. Blocks backed by an external file simply
//! release their memory; anonymous blocks are written asynchronously to a
//! newly allocated external-memory region. Returns the pending write request,
//! or an empty request_ptr if no write was needed.
foxxll::request_ptr BlockPool::Data::IntEvictBlock(ByteBlock* block_ptr) {

    // die_unless(block_ptr->block_pool_ == this);

    if (block_ptr->ext_file_) {
        // if in external file -> free memory without writing

        LOGC(debug_em)
            << "EvictBlock(): " << block_ptr << " - " << *block_ptr
            << " from ext_file " << block_ptr->ext_file_;

        // release memory
        sLOGC(debug_alloc)
            << "ByteBlock deallocate"
            << (void*)block_ptr->data_ << "size" << block_ptr->size();
        aligned_alloc_.deallocate(block_ptr->data_, block_ptr->size());
        block_ptr->data_ = nullptr;

        IntReleaseInternalMemory(block_ptr->size());
        return foxxll::request_ptr();
    }

    // one-time warning on first use of external memory
    if (!notify_em_used_) {
        std::cerr << "Thrill: evicting first Block to external memory. "
            "Be aware, that unexpected" << std::endl;
        std::cerr << "Thrill: use of external memory may lead to "
            "disappointingly slow performance." << std::endl;
        notify_em_used_ = true;
    }

    // the block must not already have an EM region assigned
    die_unless(block_ptr->em_bid_.storage == nullptr);

    // allocate EM block
    block_ptr->em_bid_.size = block_ptr->size();
    bm_->new_block(foxxll::fully_random(), block_ptr->em_bid_);

    LOGC(debug_em)
        << "EvictBlock(): " << block_ptr << " - " << *block_ptr
        << " to em_bid " << block_ptr->em_bid_;

    writing_bytes_ += block_ptr->size();

    // initiate writing to EM.
    foxxll::request_ptr req =
        block_ptr->em_bid_.storage->awrite(
            block_ptr->data_, block_ptr->em_bid_.offset, block_ptr->size(),
            // construct an immediate CompletionHandler callback
            foxxll::completion_handler::make<
                ByteBlock, &ByteBlock::OnWriteComplete>(block_ptr));

    // remember the request in writing_ so it can be found/canceled later
    return (writing_[block_ptr] = std::move(req));
}
1296 
1298  ByteBlock* block_ptr, foxxll::request* req, bool success) {
1299  std::unique_lock<std::mutex> lock(mutex_);
1300 
1301  LOGC(debug_em)
1302  << "OnWriteComplete(): request " << req << " done,"
1303  << " block " << *block_ptr << " to " << block_ptr->em_bid_
1304  << " success = " << success;
1305  req->check_errors();
1306 
1307  die_unless(!block_ptr->ext_file_);
1308  die_unequal(d_->writing_.erase(block_ptr), 1u);
1309  d_->writing_bytes_ -= block_ptr->size();
1310 
1311  if (!success)
1312  {
1313  // request was canceled. this is not an I/O error, but intentional,
1314  // e.g. because the block was deleted or if it was re-pinned while being
1315  // written to disk.
1316 
1317  if (!block_ptr->is_deleted()) {
1318  die_unless(!d_->unpinned_blocks_.exists(block_ptr));
1319  d_->unpinned_blocks_.put(block_ptr);
1320  d_->unpinned_bytes_ += block_ptr->size();
1321  }
1322 
1323  d_->bm_->delete_block(block_ptr->em_bid_);
1324  block_ptr->em_bid_ = foxxll::BID<0>();
1325  }
1326  else
1327  {
1328  // success
1329 
1330  d_->swapped_.insert(block_ptr);
1331  d_->swapped_bytes_ += block_ptr->size();
1332 
1333  // release memory
1334  sLOGC(debug_alloc)
1335  << "ByteBlock deallocate"
1336  << (void*)block_ptr->data_ << "size" << block_ptr->size();
1337  d_->aligned_alloc_.deallocate(block_ptr->data_, block_ptr->size());
1338  block_ptr->data_ = nullptr;
1339 
1340  d_->IntReleaseInternalMemory(block_ptr->size());
1341  }
1342 }
1343 
1344 void BlockPool::RunTask(const std::chrono::steady_clock::time_point& tp) {
1345  std::unique_lock<std::mutex> lock(mutex_);
1346 
1348  foxxll::stats_data stf = stnow - d_->io_stats_first_;
1349  foxxll::stats_data stp = stnow - d_->io_stats_prev_;
1350  d_->io_stats_prev_ = stnow;
1351 
1352  double elapsed = static_cast<double>(
1353  std::chrono::duration_cast<std::chrono::microseconds>(
1354  tp - d_->tp_last_).count()) / 1e6;
1355  d_->tp_last_ = tp;
1356 
1357  // LOG0 << stp;
1358  // LOG0 << stf;
1359 
1360  size_t unpinned_bytes = d_->unpinned_bytes_.hmax_update();
1361  size_t writing_bytes = d_->writing_bytes_.hmax_update();
1362  size_t reading_bytes = d_->reading_bytes_.hmax_update();
1363  size_t pinned_bytes = d_->pin_count_.total_pinned_bytes_.hmax_update();
1364 
1365  logger_ << "class" << "BlockPool"
1366  << "event" << "profile"
1367  << "total_blocks" << d_->int_total_blocks()
1368  << "total_bytes" << d_->total_bytes_.hmax_update()
1369  << "max_total_bytes" << d_->max_total_bytes_
1370  << "total_ram_bytes" << d_->total_ram_bytes_.hmax_update()
1371  << "ram_bytes"
1372  << (unpinned_bytes + pinned_bytes + writing_bytes + reading_bytes)
1373  << "pinned_blocks" << d_->pin_count_.total_pins_
1374  << "pinned_bytes" << pinned_bytes
1375  << "unpinned_blocks" << d_->unpinned_blocks_.size()
1376  << "unpinned_bytes" << unpinned_bytes
1377  << "swapped_blocks" << d_->swapped_.size()
1378  << "swapped_bytes" << d_->swapped_bytes_.hmax_update()
1379  << "max_pinned_blocks" << d_->pin_count_.max_pins
1380  << "max_pinned_bytes" << d_->pin_count_.max_pinned_bytes
1381  << "writing_blocks" << d_->writing_.size()
1382  << "writing_bytes" << writing_bytes
1383  << "reading_blocks" << d_->reading_.size()
1384  << "reading_bytes" << reading_bytes
1385  << "rd_ops_total" << stf.get_read_count()
1386  << "rd_bytes_total" << stf.get_read_bytes()
1387  << "wr_ops_total" << stf.get_write_count()
1388  << "wr_bytes_total" << stf.get_write_bytes()
1389  << "rd_ops" << stp.get_read_count()
1390  << "rd_bytes" << stp.get_read_bytes()
1391  << "rd_speed" << static_cast<double>(stp.get_read_bytes()) / elapsed
1392  << "wr_ops" << stp.get_write_count()
1393  << "wr_bytes" << stp.get_write_bytes()
1394  << "wr_speed" << static_cast<double>(stp.get_write_bytes()) / elapsed
1395  << "disk_allocation" << d_->bm_->current_allocation();
1396 }
1397 
1399  return ++d_->next_file_id_;
1400 }
1401 
1402 } // namespace data
1403 } // namespace thrill
1404 
1405 /******************************************************************************/
static constexpr bool debug_mem
debug memory requests
Definition: block_pool.cpp:48
std::unordered_map< ByteBlock *, PinRequestPtr, std::hash< ByteBlock * >, std::equal_to<>, mem::GPoolAllocator< std::pair< ByteBlock *const, PinRequestPtr > > > ReadingMap
type of set of ByteBlocks currently begin read from EM.
Definition: block_pool.cpp:240
~BlockPool()
Checks that all blocks were freed.
Definition: block_pool.cpp:404
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
void ReleaseInternalMemory(size_t size)
external_size_type offset
offset within the file of the block (uint64_t)
Definition: bid.hpp:118
static constexpr bool debug_pin
debug block pinning:
Definition: block_pool.cpp:45
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t total_blocks() noexcept
Total number of allocated blocks of this block pool.
Definition: block_pool.cpp:867
friend class PinRequest
for calling OnReadComplete and access to mutex and cvs
Definition: block_pool.hpp:224
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:896
#define die_unless(X)
Definition: die.hpp:27
std::vector< size_t, mem::GPoolAllocator< size_t > > pin_count_
counts the number of pins in this block per thread_id.
Definition: byte_block.hpp:125
void AdviseFree(size_t size)
size_t size() const
the block size
Definition: byte_block.hpp:84
bool is_deleted() const
true if being deleted
Definition: byte_block.hpp:103
This is an expected O(1) LRU cache which contains a set of key-only elements.
Definition: lru_cache.hpp:42
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
Definition: block_pool.cpp:703
void OnWriteComplete(foxxll::request *req, bool success)
forwarded to block_pool_
Definition: byte_block.cpp:75
void OnWriteComplete(ByteBlock *block_ptr, foxxll::request *req, bool success)
callback for async write of blocks during eviction
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
static void OurNewHandler()
Definition: block_pool.cpp:63
void DecBlockPinCount(ByteBlock *block_ptr, size_t local_worker_id)
Decrement a ByteBlock&#39;s pin count and possibly unpin it.
Definition: block_pool.cpp:818
Specialization of block identifier class (BID) for variable size block size.
Definition: bid.hpp:112
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:31
virtual request_ptr awrite(void *buffer, offset_type pos, size_type bytes, const completion_handler &on_complete=completion_handler())=0
size_t pinned_blocks() noexcept
Total number of pinned blocks of this block pool.
Definition: block_pool.cpp:914
static bool is_power_of_two(int i)
does what it says: true if i is a power of two
external_size_type get_read_bytes() const
Definition: iostats.cpp:527
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
void reset()
release contained pointer, frees object if this is the last reference.
static std::vector< BlockPool * > s_blockpools
Definition: block_pool.cpp:57
static constexpr bool debug_blc
debug block life cycle output: create, destroy
Definition: block_pool.cpp:39
BlockPool(size_t workers_per_host=1)
Creates a simple BlockPool for tests: allows only one thread, enforces no memory limitations, never swaps to disk.
Definition: block_pool.cpp:373
A non-pinned counting pointer to a ByteBlock.
Definition: byte_block.hpp:176
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
size_t local_worker_id_
thread id of holder of pin
Definition: block.hpp:324
virtual request_ptr aread(void *buffer, offset_type pos, size_type bytes, const completion_handler &on_complete=completion_handler())=0
mem::Manager mem_manager_
local Manager counting only ByteBlock allocations in internal memory.
Definition: block_pool.hpp:191
PinRequestPtr PinBlock(const Block &block, size_t local_worker_id)
Pins a block by swapping it in if required.
Definition: block_pool.cpp:548
void DestroyBlock(ByteBlock *block_ptr)
Destroys the block. Called by ByteBlockPtr&#39;s deleter.
Definition: block_pool.cpp:939
#define THRILL_DEFAULT_ALIGN
size_t workers_per_host_
number of workers per host
Definition: block_pool.hpp:194
void IncBlockPinCount(ByteBlock *block_ptr, size_t local_worker_id)
Increment a ByteBlock&#39;s pin count, requires the pin count to be > 0.
Definition: block_pool.cpp:796
common::JsonLogger logger_
reference to HostContext&#39;s logger or a null sink
Definition: block_pool.hpp:188
foxxll::BID< 0 > em_bid_
Definition: byte_block.hpp:133
void malloc_tracker_print_status()
user function which prints current and peak allocation to stderr
void OnComplete(foxxll::request *req, bool success)
calls BlockPool::OnReadComplete used to tlx::delegate
Definition: block_pool.cpp:719
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Definition: block_pool.cpp:484
Type * get() const noexcept
return the enclosed pointer.
std::condition_variable cv_read_complete_
Definition: block_pool.hpp:185
static constexpr bool debug_em
debug block eviction: evict, write complete, read complete
Definition: block_pool.cpp:51
uint_pair & operator+=(const uint_pair &b)
addition operator (uses 64-bit arithmetic)
Definition: uint_types.hpp:166
foxxll::file_ptr ext_file_
Definition: byte_block.hpp:137
static constexpr bool debug_alloc
debug block memory alloc and dealloc
Definition: block_pool.cpp:42
unsigned get_write_count() const
Definition: iostats.cpp:512
void OnReadComplete(PinRequest *read, foxxll::request *req, bool success)
callback for async read of blocks for pin requests
Definition: block_pool.cpp:723
Pool & GPool()
singleton instance of global pool for I/O data structures
Definition: pool.cpp:26
external_size_type get_write_bytes() const
Definition: iostats.cpp:543
std::unordered_map< ByteBlock *, foxxll::request_ptr, std::hash< ByteBlock * >, std::equal_to<>, mem::GPoolAllocator< std::pair< ByteBlock *const, foxxll::request_ptr > > > WritingMap
type of set of ByteBlocks currently begin written to EM.
Definition: block_pool.cpp:233
ByteBlockPtr & byte_block()
Definition: block.hpp:340
uint8_t Byte
type of underlying memory area
Definition: byte_block.hpp:37
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
void check_errors()
Rises an exception if there were error with the I/O.
Definition: request.hpp:114
size_t workers_per_host() const
return number of workers per host
Definition: block_pool.hpp:82
#define die_unequal(X, Y)
Definition: die.hpp:50
A ByteBlock is the basic storage units of containers like File, BlockQueue, etc.
Definition: byte_block.hpp:51
int value
Definition: gen_data.py:41
static std::atomic< bool > in_new_handler
Definition: block_pool.cpp:59
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
Definition: allocator.hpp:212
size_t swapped_blocks() noexcept
Total number of swapped blocks.
Definition: block_pool.cpp:929
size_t writing_blocks() noexcept
Total number of blocks currently begin written.
Definition: block_pool.cpp:924
foxxll::request_ptr EvictBlockLRU()
static instance_pointer get_instance()
return instance or create base instance if empty
Definition: singleton.hpp:41
const ByteBlockPtr & byte_block() const
access to byte_block_
Definition: block.hpp:226
unsigned get_read_count() const
Definition: iostats.cpp:497
bool in_memory() const
true if block resides in memory
Definition: byte_block.hpp:98
size_t total_bytes() noexcept
Total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:891
size_t pin_count(size_t local_worker_id) const
return current pin count
Definition: byte_block.hpp:90
file * storage
pointer to the file of the block
Definition: bid.hpp:116
common::JsonLogger & logger()
Returns logger_.
Definition: block_pool.hpp:85
High-performance smart pointer used as a wrapping reference counting pointer.
friend std::ostream & operator<<(std::ostream &os, const PinCount &p)
make ostream-able
Definition: block_pool.cpp:216
size_t reading_blocks() noexcept
Total number of blocks currently begin read from EM.
Definition: block_pool.cpp:934
void release_file_reference()
Definition: request.cpp:95
Block manager class.
foxxll::request_ptr GetAnyWriting()
Return any currently being written block (for waiting on completion)
static std::recursive_mutex s_new_mutex
Definition: block_pool.cpp:56
Object shared by allocators and other classes to track memory allocations.
Definition: manager.hpp:28
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t next_file_id()
return next unique File id
std::atomic< bool > ready_
indication that the PinnedBlocks ready
Definition: block.hpp:358
Request object encapsulating basic properties like file and offset.
Definition: request.hpp:49
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
tlx::CountingPtr< PinRequest, mem::GPoolDeleter< PinRequest > > PinRequestPtr
Definition: block.hpp:34
size_t size
size of the block in bytes
Definition: bid.hpp:120
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
size_t unpinned_blocks() noexcept
Total number of unpinned blocks in memory of this block pool.
Definition: block_pool.cpp:919
void IntIncBlockPinCount(ByteBlock *block_ptr, size_t local_worker_id)
Increment a ByteBlock&#39;s pin count - without locking the mutex.
Definition: block_pool.cpp:803
void EvictBlock(ByteBlock *block_ptr)
PinnedBlock block_
Definition: block.hpp:353
friend class ByteBlock
for calling OnWriteComplete
Definition: block_pool.hpp:221
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
void RunTask(const std::chrono::steady_clock::time_point &tp) final
method called by ProfileThread.
std::mutex mutex_
locked before internal state is changed
Definition: block_pool.hpp:181
std::unique_ptr< Data > d_
pimpl data structure
Definition: block_pool.hpp:203
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
const ByteBlockPtr & byte_block() const
access to byte_block_
Definition: block.hpp:79
void RequestInternalMemory(size_t size)
ByteBlockPtr MapExternalBlock(const foxxll::file_ptr &file, uint64_t offset, size_t size)
Definition: block_pool.cpp:528