30 #include <unordered_map> 31 #include <unordered_set> 64 std::unique_lock<std::recursive_mutex> lock(s_new_mutex);
66 printf(
"new handler called recursively! fixup using mem::Pool!\n");
70 static bool s_notify_new_handler =
false;
71 if (!s_notify_new_handler) {
72 fprintf(stderr,
"Thrill: new_handler called! Program is out of C++ heap memory, trying to\n");
73 fprintf(stderr,
"Thrill: swap out Blocks to external memory. Check your program's memory usage.\n");
77 static size_t s_iter = 0;
81 for (
size_t i = 0; i < s_blockpools.size(); ++i) {
82 req = s_blockpools[s_iter]->GetAnyWriting();
83 ++s_iter %= s_blockpools.size();
89 for (
size_t i = 0; i < s_blockpools.size(); ++i) {
90 req = s_blockpools[s_iter]->EvictBlockLRU();
91 ++s_iter %= s_blockpools.size();
101 printf(
"new handler found no ByteBlock to evict.\n");
102 for (
size_t i = 0; i < s_blockpools.size(); ++i) {
103 LOG1 <<
"BlockPool[" << i <<
"]" 104 <<
" total_blocks=" << s_blockpools[i]->total_blocks()
105 <<
" total_bytes=" << s_blockpools[i]->total_bytes()
106 <<
" pinned_blocks=" << s_blockpools[i]->pinned_blocks()
107 <<
" writing_blocks=" << s_blockpools[i]->writing_blocks()
108 <<
" swapped_blocks=" << s_blockpools[i]->swapped_blocks()
109 <<
" reading_blocks=" << s_blockpools[i]->reading_blocks();
115 std::this_thread::sleep_for(std::chrono::milliseconds(100));
122 struct BlockPool::Counter {
128 operator size_t ()
const {
return value; }
135 Counter& operator -= (
size_t v) {
141 size_t hmax_update() {
148 struct BlockPool::PinCount {
151 size_t total_pins_ = 0;
154 Counter total_pinned_bytes_;
160 size_t max_pinned_bytes = 0;
164 std::vector<size_t> pin_count_;
167 std::vector<size_t> pinned_bytes_;
173 void Increment(
size_t local_worker_id,
size_t size);
176 void Decrement(
size_t local_worker_id,
size_t size);
179 void AssertZero()
const;
183 : pin_count_(workers_per_host),
184 pinned_bytes_(workers_per_host) { }
186 void BlockPool::PinCount::Increment(
size_t local_worker_id,
size_t size) {
187 ++pin_count_[local_worker_id];
188 pinned_bytes_[local_worker_id] += size;
190 total_pinned_bytes_ += size;
191 max_pins =
std::max(max_pins, total_pins_);
192 max_pinned_bytes =
std::max(max_pinned_bytes, total_pinned_bytes_.value);
195 void BlockPool::PinCount::Decrement(
size_t local_worker_id,
size_t size) {
197 die_unless(pinned_bytes_[local_worker_id] >= size);
201 --pin_count_[local_worker_id];
202 pinned_bytes_[local_worker_id] -= size;
204 total_pinned_bytes_ -= size;
207 void BlockPool::PinCount::AssertZero()
const {
210 for (
const size_t& pc : pin_count_)
212 for (
const size_t& pb : pinned_bytes_)
216 std::ostream&
operator << (std::ostream& os,
const BlockPool::PinCount& p) {
217 os <<
" total_pins_=" << p.total_pins_
218 <<
" total_pinned_bytes_=" << p.total_pinned_bytes_
219 <<
" pin_count_=[" <<
tlx::join(
',', p.pin_count_) <<
"]" 220 <<
" pinned_bytes_=[" <<
tlx::join(
',', p.pinned_bytes_) <<
"]" 221 <<
" max_pin=" << p.max_pins
222 <<
" max_pinned_bytes=" << p.max_pinned_bytes;
232 std::hash<ByteBlock*>, std::equal_to<>,
238 std::hash<ByteBlock*>, std::equal_to<>,
240 std::pair<ByteBlock* const, PinRequestPtr> > >;
242 class BlockPool::Data
246 std::condition_variable cv_memory_change_;
250 size_t soft_ram_limit_;
254 size_t hard_ram_limit_;
257 bool notify_em_used_ =
false;
271 ByteBlock*, std::hash<ByteBlock*>, std::equal_to<>,
288 std::atomic<size_t> next_file_id_ { 0 };
291 Counter unpinned_bytes_;
297 size_t requested_bytes_ = 0;
300 Counter writing_bytes_;
303 Counter swapped_bytes_;
306 Counter reading_bytes_;
309 size_t total_byte_blocks_ = 0;
312 std::condition_variable cv_total_byte_blocks_;
315 Counter total_bytes_;
318 size_t max_total_bytes_ = 0;
322 Counter total_ram_bytes_;
325 std::chrono::steady_clock::time_point tp_last_
326 = std::chrono::steady_clock::now();
331 size_t workers_per_host)
332 : soft_ram_limit_(soft_ram_limit),
333 hard_ram_limit_(hard_ram_limit),
336 pin_count_(workers_per_host) { }
340 void IntRequestInternalMemory(std::unique_lock<std::mutex>& lock,
size_t size);
344 void IntReleaseInternalMemory(
size_t size);
349 BlockPool& bp, ByteBlock* block_ptr,
size_t local_worker_id);
352 foxxll::request_ptr IntEvictBlockLRU();
356 foxxll::request_ptr IntEvictBlock(ByteBlock* block_ptr);
362 size_t int_total_blocks() noexcept;
365 size_t int_total_bytes() noexcept;
374 :
BlockPool(0, 0, nullptr, nullptr, workers_per_host) { }
378 size_t workers_per_host)
383 *this, soft_ram_limit, hard_ram_limit, workers_per_host)) {
387 std::unique_lock<std::recursive_mutex> lock(s_new_mutex);
389 s_blockpools.reserve(32);
390 s_blockpools.push_back(
this);
395 d_->io_stats_first_ =
d_->io_stats_prev_ =
398 logger_ <<
"class" <<
"BlockPool" 399 <<
"event" <<
"create" 400 <<
"soft_ram_limit" << soft_ram_limit
405 std::unique_lock<std::mutex> lock(
mutex_);
408 while (
d_->writing_.begin() !=
d_->writing_.end()) {
410 ByteBlock* block_ptr =
d_->writing_.begin()->first;
411 foxxll::request_ptr req =
d_->writing_.begin()->second;
414 <<
"BlockPool::~BlockPool() block=" << block_ptr
415 <<
" is currently begin written to external memory, canceling.";
419 if (!req->cancel()) {
422 <<
"BlockPool::~BlockPool() block=" << block_ptr
423 <<
" is currently begin written to external memory," 424 <<
" cancel failed, waiting.";
432 <<
"BlockPool::PinBlock block=" << block_ptr
433 <<
" is currently begin written to external memory," 434 <<
" cancel/wait done.";
440 while (
d_->reading_.begin() !=
d_->reading_.end()) {
442 ByteBlock* block_ptr =
d_->reading_.begin()->first;
443 PinRequestPtr read =
d_->reading_.begin()->second;
446 <<
"BlockPool::~BlockPool() block=" << block_ptr
447 <<
" is currently begin read from external memory, waiting.";
460 while (
d_->total_byte_blocks_ != 0)
461 d_->cv_total_byte_blocks_.wait(lock);
463 d_->pin_count_.AssertZero();
470 <<
" max_pin=" <<
d_->pin_count_.max_pins
471 <<
" max_pinned_bytes=" <<
d_->pin_count_.max_pinned_bytes;
473 logger_ <<
"class" <<
"BlockPool" 474 <<
"event" <<
"destroy" 475 <<
"max_pins" <<
d_->pin_count_.max_pins
476 <<
"max_pinned_bytes" <<
d_->pin_count_.max_pinned_bytes;
478 std::unique_lock<std::recursive_mutex> s_new_lock(s_new_mutex);
480 std::find(s_blockpools.begin(), s_blockpools.end(),
this));
486 std::unique_lock<std::mutex> lock(
mutex_);
491 &&
d_->hard_ram_limit_ != 0) {
492 die(
"BlockPool: requested unaligned block_size=" << size <<
"." <<
496 d_->IntRequestInternalMemory(lock, size);
501 Byte* data =
d_->aligned_alloc_.allocate(size);
503 <<
"ByteBlock aligned_alloc: " << (
void*)data <<
" size " << size;
508 mem::GPool().make<ByteBlock>(
this, data, size), local_worker_id);
509 ++
d_->total_byte_blocks_;
510 d_->total_bytes_ += size;
511 d_->max_total_bytes_ =
std::max(
d_->max_total_bytes_,
d_->total_bytes_.value);
514 d_->pin_count_.Increment(local_worker_id, size);
517 <<
"BlockPool::AllocateBlock()" 518 <<
" ptr=" << block_ptr.
get()
520 <<
" local_worker_id=" << local_worker_id
521 <<
" total_blocks()=" <<
d_->int_total_blocks()
522 <<
" total_bytes()=" <<
d_->int_total_bytes()
530 std::unique_lock<std::mutex> lock(
mutex_);
533 mem::GPool().make<ByteBlock>(
this, file, offset, size));
534 ++
d_->total_byte_blocks_;
535 d_->max_total_bytes_ =
std::max(
d_->max_total_bytes_,
d_->total_bytes_.value);
536 d_->total_bytes_ += size;
539 <<
"BlockPool::MapExternalBlock()" 540 <<
" ptr=" << block_ptr.
get()
541 <<
" offset=" << offset
550 std::unique_lock<std::mutex> lock(
mutex_);
554 if (block_ptr->
pin_count_[local_worker_id] > 0) {
563 <<
"BlockPool::PinBlock block=" << &block
564 <<
" already pinned by thread";
580 <<
"BlockPool::PinBlock block=" << block
581 <<
" already pinned by another thread" 585 d_->pin_count_.Increment(local_worker_id, block_ptr->
size());
592 WritingMap::iterator write_it;
593 while ((write_it =
d_->writing_.find(block_ptr)) !=
d_->writing_.end()) {
596 <<
"BlockPool::PinBlock() block=" << block_ptr
597 <<
" is currently begin written to external memory, canceling.";
603 foxxll::request_ptr req = write_it->second;
606 if (!req->cancel()) {
609 <<
"BlockPool::PinBlock() block=" << block_ptr
610 <<
" is currently begin written to external memory, " 611 <<
"cancel failed, waiting.";
620 <<
"BlockPool::PinBlock() block=" << block_ptr
621 <<
" is currently begin written to external memory, " 622 <<
"cancel/wait done.";
630 ReadingMap::iterator read_it =
d_->reading_.find(block_ptr);
631 if (read_it !=
d_->reading_.end())
632 return read_it->second;
640 d_->unpinned_blocks_.erase(block_ptr);
641 d_->unpinned_bytes_ -= block_ptr->
size();
644 d_->pin_count_.Increment(local_worker_id, block_ptr->
size());
647 <<
"BlockPool::PinBlock block=" << &block
648 <<
" pinned from internal memory" 661 d_->IntRequestInternalMemory(lock, block_ptr->
size());
664 d_->pin_count_.Increment(local_worker_id, block_ptr->
size());
670 this,
PinnedBlock(block, local_worker_id),
false));
671 d_->reading_[block_ptr] = read;
675 Byte* data = read->byte_block()->data_ =
676 d_->aligned_alloc_.allocate(block_ptr->
size());
680 d_->swapped_.erase(block_ptr);
681 d_->swapped_bytes_ -= block_ptr->
size();
685 <<
"BlockPool::PinBlock block=" << block
686 <<
" requested from external memory" 695 foxxll::completion_handler::make<
698 d_->reading_bytes_ += block_ptr->
size();
707 if (num_files >= avail_blocks) {
710 return std::make_pair(avail_blocks, 0u);
715 return std::make_pair(num_files, avail_bytes / num_files);
720 return block_pool_->OnReadComplete(
this, req, success);
725 std::unique_lock<std::mutex> lock(
mutex_);
728 size_t block_size = block_ptr->
size();
731 <<
"OnReadComplete():" 732 <<
" req " << req <<
" block " << *block_ptr
733 <<
" size " << block_size <<
" done," 734 <<
" from " << block_ptr->
em_bid_ <<
" success = " << success;
743 d_->swapped_.insert(block_ptr);
744 d_->swapped_bytes_ += block_size;
749 <<
"ByteBlock deallocate" 750 << (
void*)read->
byte_block()->data_ <<
"size" << block_size;
751 d_->aligned_alloc_.deallocate(read->
byte_block()->data_, block_size);
753 d_->IntReleaseInternalMemory(block_size);
767 d_->bm_->delete_block(block_ptr->
em_bid_);
773 d_->reading_bytes_ -= block_size;
789 auto it =
d_->reading_.find(block_ptr);
791 PinRequestPtr holder = std::move(it->second);
792 d_->reading_.erase(it);
797 std::unique_lock<std::mutex> lock(
mutex_);
810 <<
"BlockPool::IncBlockPinCount()" 811 <<
" byte_block=" << block_ptr
812 <<
" ++block.pin_count[" << local_worker_id <<
"]=" 814 <<
" ++block.total_pins_=" << block_ptr->
total_pins_ 819 std::unique_lock<std::mutex> lock(
mutex_);
825 size_t p = --block_ptr->
pin_count_[local_worker_id];
829 <<
"BlockPool::DecBlockPinCount()" 830 <<
" byte_block=" << block_ptr
831 <<
" --block.pin_count[" << local_worker_id <<
"]=" << p
832 <<
" --block.total_pins_=" << tp
833 <<
" local_worker_id=" << local_worker_id;
836 d_->IntUnpinBlock(*
this, block_ptr, local_worker_id);
839 void BlockPool::Data::IntUnpinBlock(
840 BlockPool& bp, ByteBlock* block_ptr,
size_t local_worker_id) {
846 pin_count_.Decrement(local_worker_id, block_ptr->
size());
850 <<
"BlockPool::IntUnpinBlock()" 851 <<
" --block.total_pins_=" << block_ptr->
total_pins_;
856 die_unless(!unpinned_blocks_.exists(block_ptr));
857 unpinned_blocks_.put(block_ptr);
858 unpinned_bytes_ += block_ptr->
size();
861 <<
"BlockPool::IntUnpinBlock()" 862 <<
" byte_block=" << block_ptr
864 <<
" allow swap out.";
868 std::unique_lock<std::mutex> lock(
mutex_);
869 return d_->int_total_blocks();
872 size_t BlockPool::Data::int_total_blocks() noexcept {
874 LOG <<
"BlockPool::total_blocks()" 875 <<
" pinned_blocks_=" << pin_count_.total_pins_
876 <<
" unpinned_blocks_=" << unpinned_blocks_.size()
877 <<
" writing_.size()=" << writing_.size()
878 <<
" swapped_.size()=" << swapped_.size()
879 <<
" reading_.size()=" << reading_.size();
881 return pin_count_.total_pins_
882 + unpinned_blocks_.size() + writing_.size()
883 + swapped_.size() + reading_.size();
887 std::unique_lock<std::mutex> lock(
mutex_);
888 return d_->hard_ram_limit_;
892 std::unique_lock<std::mutex> lock(
mutex_);
893 return d_->int_total_bytes();
897 std::unique_lock<std::mutex> lock(
mutex_);
898 return d_->max_total_bytes_;
901 size_t BlockPool::Data::int_total_bytes() noexcept {
902 LOG <<
"BlockPool::total_bytes()" 903 <<
" pinned_bytes_=" << pin_count_.total_pinned_bytes_
904 <<
" unpinned_bytes_=" << unpinned_bytes_
905 <<
" writing_bytes_=" << writing_bytes_
906 <<
" swapped_bytes_=" << swapped_bytes_
907 <<
" reading_bytes_=" << reading_bytes_;
909 return pin_count_.total_pinned_bytes_
910 + unpinned_bytes_ + writing_bytes_
911 + swapped_bytes_ + reading_bytes_;
915 std::unique_lock<std::mutex> lock(
mutex_);
916 return d_->pin_count_.total_pins_;
920 std::unique_lock<std::mutex> lock(
mutex_);
921 return d_->unpinned_blocks_.size();
925 std::unique_lock<std::mutex> lock(
mutex_);
926 return d_->writing_.size();
930 std::unique_lock<std::mutex> lock(
mutex_);
931 return d_->swapped_.size();
935 std::unique_lock<std::mutex> lock(
mutex_);
936 return d_->reading_.size();
942 std::unique_lock<std::mutex> lock(
mutex_);
945 <<
"BlockPool::DestroyBlock() block_ptr=" << block_ptr
946 <<
" byte_block=" << *block_ptr;
952 block_ptr->pin_count_.clear();
955 if (block_ptr->in_memory())
958 WritingMap::iterator it =
d_->writing_.find(block_ptr);
959 if (it !=
d_->writing_.end()) {
962 foxxll::request_ptr req = it->second;
966 <<
" canceling write I/O request " << req
967 <<
" for block " << *block_ptr;
971 if (!req->cancel()) {
986 ReadingMap::iterator it =
d_->reading_.find(block_ptr);
987 if (it !=
d_->reading_.end()) {
990 foxxll::request_ptr req = it->second->req_;
994 <<
" canceling read I/O request " << req
995 <<
" for block " << *block_ptr;
999 if (!req->cancel()) {
1014 if (block_ptr->ext_file_ && block_ptr->in_memory())
1017 <<
"BlockPool::DestroyBlock() block_ptr=" << block_ptr
1018 <<
" external block, in memory: release memory.";
1021 d_->unpinned_blocks_.erase(block_ptr);
1022 d_->unpinned_bytes_ -= block_ptr->size();
1026 <<
"ByteBlock deallocate" 1027 << (
void*)block_ptr->data_ <<
"size" << block_ptr->size();
1028 d_->aligned_alloc_.deallocate(block_ptr->data_, block_ptr->size());
1029 block_ptr->data_ =
nullptr;
1031 d_->IntReleaseInternalMemory(block_ptr->size());
1033 else if (block_ptr->ext_file_)
1036 <<
"BlockPool::DestroyBlock() block_ptr=" << block_ptr
1037 <<
" external block, but not in memory: nothing to do, thus just" 1038 <<
" delete the reference";
1040 else if (block_ptr->in_memory())
1043 <<
"BlockPool::DestroyBlock() block_ptr=" << block_ptr
1044 <<
" unpinned block in memory, remove from list";
1046 if (
d_->unpinned_blocks_.exists(block_ptr)) {
1047 d_->unpinned_blocks_.erase(block_ptr);
1048 d_->unpinned_bytes_ -= block_ptr->size();
1053 <<
"ByteBlock deallocate" 1054 << (
void*)block_ptr->data_ <<
"size" << block_ptr->size();
1055 d_->aligned_alloc_.deallocate(block_ptr->data_, block_ptr->size());
1056 block_ptr->data_ =
nullptr;
1058 d_->IntReleaseInternalMemory(block_ptr->size());
1063 <<
"BlockPool::DestroyBlock() block_ptr=" << block_ptr
1064 <<
" block in external memory, delete block";
1066 auto it =
d_->swapped_.find(block_ptr);
1069 d_->swapped_.erase(it);
1070 d_->swapped_bytes_ -= block_ptr->size();
1072 d_->bm_->delete_block(block_ptr->em_bid_);
1076 assert(
d_->total_byte_blocks_ > 0);
1077 assert(
d_->total_bytes_ >= block_ptr->size());
1078 --
d_->total_byte_blocks_;
1079 d_->total_bytes_ -= block_ptr->size();
1080 d_->cv_total_byte_blocks_.notify_all();
1084 std::unique_lock<std::mutex> lock(
mutex_);
1085 return d_->IntRequestInternalMemory(lock, size);
1088 void BlockPool::Data::IntRequestInternalMemory(
1089 std::unique_lock<std::mutex>& lock,
size_t size) {
1091 requested_bytes_ += size;
1094 <<
"BlockPool::RequestInternalMemory()" 1096 <<
" total_ram_bytes_=" << total_ram_bytes_
1097 <<
" writing_bytes_=" << writing_bytes_
1098 <<
" requested_bytes_=" << requested_bytes_
1099 <<
" soft_ram_limit_=" << soft_ram_limit_
1100 <<
" hard_ram_limit_=" << hard_ram_limit_
1102 <<
" unpinned_blocks_.size()=" << unpinned_blocks_.size()
1103 <<
" swapped_.size()=" << swapped_.size();
1105 while (soft_ram_limit_ != 0 &&
1106 unpinned_blocks_.size() &&
1107 total_ram_bytes_ + requested_bytes_ > soft_ram_limit_ + writing_bytes_)
1114 static constexpr
size_t max_retry = 60;
1115 size_t retry = max_retry;
1116 size_t last_writing_bytes = 0;
1119 while (hard_ram_limit_ != 0 && total_ram_bytes_ + size > hard_ram_limit_)
1121 while (hard_ram_limit_ != 0 &&
1122 unpinned_blocks_.size() &&
1123 total_ram_bytes_ + requested_bytes_ > hard_ram_limit_ + writing_bytes_)
1129 cv_memory_change_.wait_for(lock, std::chrono::seconds(1));
1132 <<
"BlockPool::RequestInternalMemory() waiting for memory" 1133 <<
" total_ram_bytes_=" << total_ram_bytes_
1134 <<
" writing_bytes_=" << writing_bytes_
1135 <<
" requested_bytes_=" << requested_bytes_
1136 <<
" soft_ram_limit_=" << soft_ram_limit_
1137 <<
" hard_ram_limit_=" << hard_ram_limit_
1139 <<
" unpinned_blocks_.size()=" << unpinned_blocks_.size()
1140 <<
" swapped_.size()=" << swapped_.size();
1142 if (writing_bytes_ == 0 &&
1143 total_ram_bytes_ + requested_bytes_ > hard_ram_limit_) {
1145 LOG1 <<
"abort() due to out-of-pinned-memory ???" 1146 <<
" total_ram_bytes_=" << total_ram_bytes_
1147 <<
" writing_bytes_=" << writing_bytes_
1148 <<
" requested_bytes_=" << requested_bytes_
1149 <<
" soft_ram_limit_=" << soft_ram_limit_
1150 <<
" hard_ram_limit_=" << hard_ram_limit_
1152 <<
" unpinned_blocks_.size()=" << unpinned_blocks_.size()
1153 <<
" swapped_.size()=" << swapped_.size();
1155 if (writing_bytes_ == last_writing_bytes) {
1160 last_writing_bytes = writing_bytes_;
1166 requested_bytes_ -= size;
1167 total_ram_bytes_ += size;
1171 std::unique_lock<std::mutex> lock(
mutex_);
1174 <<
"BlockPool::AdviseFree() advice to free memory" 1176 <<
" total_ram_bytes_=" <<
d_->total_ram_bytes_
1177 <<
" writing_bytes_=" <<
d_->writing_bytes_
1178 <<
" requested_bytes_=" <<
d_->requested_bytes_
1179 <<
" soft_ram_limit_=" <<
d_->soft_ram_limit_
1180 <<
" hard_ram_limit_=" <<
d_->hard_ram_limit_
1182 <<
" unpinned_blocks_.size()=" <<
d_->unpinned_blocks_.size()
1183 <<
" swapped_.size()=" <<
d_->swapped_.size();
1185 while (
d_->soft_ram_limit_ != 0 &&
d_->unpinned_blocks_.size() &&
1186 d_->total_ram_bytes_ +
d_->requested_bytes_ + size >
d_->hard_ram_limit_ +
d_->writing_bytes_)
1189 d_->IntEvictBlockLRU();
1193 std::unique_lock<std::mutex> lock(
mutex_);
1194 return d_->IntReleaseInternalMemory(size);
1197 void BlockPool::Data::IntReleaseInternalMemory(
size_t size) {
1200 <<
"BlockPool::IntReleaseInternalMemory()" 1202 <<
" total_ram_bytes_=" << total_ram_bytes_;
1205 total_ram_bytes_ -= size;
1207 cv_memory_change_.notify_all();
1211 std::unique_lock<std::mutex> lock(
mutex_);
1216 d_->unpinned_blocks_.erase(block_ptr);
1217 d_->unpinned_bytes_ -= block_ptr->
size();
1219 d_->IntEvictBlock(block_ptr);
1223 std::unique_lock<std::mutex> lock(
mutex_);
1224 if (!
d_->writing_.size())
return foxxll::request_ptr();
1225 return d_->writing_.begin()->second;
1229 std::unique_lock<std::mutex> lock(
mutex_);
1230 return d_->IntEvictBlockLRU();
1233 foxxll::request_ptr BlockPool::Data::IntEvictBlockLRU() {
1235 if (!unpinned_blocks_.size())
return foxxll::request_ptr();
1237 ByteBlock* block_ptr = unpinned_blocks_.pop();
1239 unpinned_bytes_ -= block_ptr->
size();
1241 return IntEvictBlock(block_ptr);
1244 foxxll::request_ptr BlockPool::Data::IntEvictBlock(ByteBlock* block_ptr) {
1252 <<
"EvictBlock(): " << block_ptr <<
" - " << *block_ptr
1253 <<
" from ext_file " << block_ptr->
ext_file_;
1257 <<
"ByteBlock deallocate" 1258 << (
void*)block_ptr->
data_ <<
"size" << block_ptr->
size();
1259 aligned_alloc_.deallocate(block_ptr->
data_, block_ptr->
size());
1260 block_ptr->
data_ =
nullptr;
1262 IntReleaseInternalMemory(block_ptr->
size());
1266 if (!notify_em_used_) {
1267 std::cerr <<
"Thrill: evicting first Block to external memory. " 1268 "Be aware, that unexpected" << std::endl;
1269 std::cerr <<
"Thrill: use of external memory may lead to " 1270 "disappointingly slow performance." << std::endl;
1271 notify_em_used_ =
true;
1281 <<
"EvictBlock(): " << block_ptr <<
" - " << *block_ptr
1282 <<
" to em_bid " << block_ptr->
em_bid_;
1284 writing_bytes_ += block_ptr->
size();
1287 foxxll::request_ptr req =
1291 foxxll::completion_handler::make<
1294 return (writing_[block_ptr] = std::move(req));
1299 std::unique_lock<std::mutex> lock(
mutex_);
1302 <<
"OnWriteComplete(): request " << req <<
" done," 1303 <<
" block " << *block_ptr <<
" to " << block_ptr->
em_bid_ 1304 <<
" success = " << success;
1309 d_->writing_bytes_ -= block_ptr->
size();
1319 d_->unpinned_blocks_.put(block_ptr);
1320 d_->unpinned_bytes_ += block_ptr->
size();
1323 d_->bm_->delete_block(block_ptr->
em_bid_);
1330 d_->swapped_.insert(block_ptr);
1331 d_->swapped_bytes_ += block_ptr->
size();
1335 <<
"ByteBlock deallocate" 1336 << (
void*)block_ptr->
data_ <<
"size" << block_ptr->
size();
1337 d_->aligned_alloc_.deallocate(block_ptr->
data_, block_ptr->
size());
1338 block_ptr->
data_ =
nullptr;
1340 d_->IntReleaseInternalMemory(block_ptr->
size());
1345 std::unique_lock<std::mutex> lock(
mutex_);
1350 d_->io_stats_prev_ = stnow;
1352 double elapsed =
static_cast<double>(
1353 std::chrono::duration_cast<std::chrono::microseconds>(
1354 tp -
d_->tp_last_).count()) / 1e6;
1360 size_t unpinned_bytes =
d_->unpinned_bytes_.hmax_update();
1361 size_t writing_bytes =
d_->writing_bytes_.hmax_update();
1362 size_t reading_bytes =
d_->reading_bytes_.hmax_update();
1363 size_t pinned_bytes =
d_->pin_count_.total_pinned_bytes_.hmax_update();
1365 logger_ <<
"class" <<
"BlockPool" 1366 <<
"event" <<
"profile" 1367 <<
"total_blocks" <<
d_->int_total_blocks()
1368 <<
"total_bytes" <<
d_->total_bytes_.hmax_update()
1369 <<
"max_total_bytes" <<
d_->max_total_bytes_
1370 <<
"total_ram_bytes" <<
d_->total_ram_bytes_.hmax_update()
1372 << (unpinned_bytes + pinned_bytes + writing_bytes + reading_bytes)
1373 <<
"pinned_blocks" <<
d_->pin_count_.total_pins_
1374 <<
"pinned_bytes" << pinned_bytes
1375 <<
"unpinned_blocks" <<
d_->unpinned_blocks_.size()
1376 <<
"unpinned_bytes" << unpinned_bytes
1377 <<
"swapped_blocks" <<
d_->swapped_.size()
1378 <<
"swapped_bytes" <<
d_->swapped_bytes_.hmax_update()
1379 <<
"max_pinned_blocks" <<
d_->pin_count_.max_pins
1380 <<
"max_pinned_bytes" <<
d_->pin_count_.max_pinned_bytes
1381 <<
"writing_blocks" <<
d_->writing_.size()
1382 <<
"writing_bytes" << writing_bytes
1383 <<
"reading_blocks" <<
d_->reading_.size()
1384 <<
"reading_bytes" << reading_bytes
1391 <<
"rd_speed" <<
static_cast<double>(stp.
get_read_bytes()) / elapsed
1395 <<
"disk_allocation" <<
d_->bm_->current_allocation();
1399 return ++
d_->next_file_id_;
static constexpr bool debug_mem
debug memory requests
std::unordered_map< ByteBlock *, PinRequestPtr, std::hash< ByteBlock * >, std::equal_to<>, mem::GPoolAllocator< std::pair< ByteBlock *const, PinRequestPtr > > > ReadingMap
type of set of ByteBlocks currently being read from EM.
~BlockPool()
Checks that all blocks were freed.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
void ReleaseInternalMemory(size_t size)
external_size_type offset
offset within the file of the block (uint64_t)
static constexpr bool debug_pin
debug block pinning:
static uint_pair max()
return an uint_pair instance containing the largest value possible
size_t total_blocks() noexcept
Total number of allocated blocks of this block pool.
friend class PinRequest
for calling OnReadComplete and access to mutex and cvs
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
std::vector< size_t, mem::GPoolAllocator< size_t > > pin_count_
counts the number of pins in this block per thread_id.
void AdviseFree(size_t size)
size_t size() const
the block size
bool is_deleted() const
true if being deleted
This is an expected O(1) LRU cache which contains a set of key-only elements.
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
void OnWriteComplete(foxxll::request *req, bool success)
forwarded to block_pool_
void OnWriteComplete(ByteBlock *block_ptr, foxxll::request *req, bool success)
callback for async write of blocks during eviction
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
static void OurNewHandler()
void DecBlockPinCount(ByteBlock *block_ptr, size_t local_worker_id)
Decrement a ByteBlock's pin count and possibly unpin it.
Specialization of block identifier class (BID) for variable size block size.
#define sLOGC(cond)
Explicitly specify the condition for logging.
virtual request_ptr awrite(void *buffer, offset_type pos, size_type bytes, const completion_handler &on_complete=completion_handler())=0
size_t pinned_blocks() noexcept
Total number of pinned blocks of this block pool.
static bool is_power_of_two(int i)
does what it says: true if i is a power of two
external_size_type get_read_bytes() const
A pinned / pin-counted pointer to a ByteBlock.
void reset()
release contained pointer, frees object if this is the last reference.
static std::vector< BlockPool * > s_blockpools
static constexpr bool debug_blc
debug block life cycle output: create, destroy
BlockPool(size_t workers_per_host=1)
Creates a simple BlockPool for tests: allows only one thread, enforces no memory limitations, never swaps to disk.
A non-pinned counting pointer to a ByteBlock.
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
size_t local_worker_id_
thread id of holder of pin
virtual request_ptr aread(void *buffer, offset_type pos, size_type bytes, const completion_handler &on_complete=completion_handler())=0
mem::Manager mem_manager_
local Manager counting only ByteBlock allocations in internal memory.
PinRequestPtr PinBlock(const Block &block, size_t local_worker_id)
Pins a block by swapping it in if required.
void DestroyBlock(ByteBlock *block_ptr)
Destroys the block. Called by ByteBlockPtr's deleter.
#define THRILL_DEFAULT_ALIGN
size_t workers_per_host_
number of workers per host
void IncBlockPinCount(ByteBlock *block_ptr, size_t local_worker_id)
Increment a ByteBlock's pin count, requires the pin count to be > 0.
common::JsonLogger logger_
reference to HostContext's logger or a null sink
void malloc_tracker_print_status()
user function which prints current and peak allocation to stderr
void OnComplete(foxxll::request *req, bool success)
calls BlockPool::OnReadComplete used to tlx::delegate
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Type * get() const noexcept
return the enclosed pointer.
std::condition_variable cv_read_complete_
static constexpr bool debug_em
debug block eviction: evict, write complete, read complete
uint_pair & operator+=(const uint_pair &b)
addition operator (uses 64-bit arithmetic)
foxxll::file_ptr ext_file_
static constexpr bool debug_alloc
debug block memory alloc and dealloc
unsigned get_write_count() const
void OnReadComplete(PinRequest *read, foxxll::request *req, bool success)
callback for async read of blocks for pin requests
Pool & GPool()
singleton instance of global pool for I/O data structures
external_size_type get_write_bytes() const
std::unordered_map< ByteBlock *, foxxll::request_ptr, std::hash< ByteBlock * >, std::equal_to<>, mem::GPoolAllocator< std::pair< ByteBlock *const, foxxll::request_ptr > > > WritingMap
type of set of ByteBlocks currently being written to EM.
ByteBlockPtr & byte_block()
uint8_t Byte
type of underlying memory area
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
void check_errors()
Raises an exception if there was an error with the I/O.
size_t workers_per_host() const
return number of workers per host
#define die_unequal(X, Y)
A ByteBlock is the basic storage unit of containers like File, BlockQueue, etc.
static std::atomic< bool > in_new_handler
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
size_t swapped_blocks() noexcept
Total number of swapped blocks.
size_t writing_blocks() noexcept
Total number of blocks currently being written.
foxxll::request_ptr EvictBlockLRU()
static instance_pointer get_instance()
return instance or create base instance if empty
const ByteBlockPtr & byte_block() const
access to byte_block_
unsigned get_read_count() const
bool in_memory() const
true if block resides in memory
size_t total_bytes() noexcept
Total number of bytes allocated in blocks of this block pool.
size_t pin_count(size_t local_worker_id) const
return current pin count
file * storage
pointer to the file of the block
common::JsonLogger & logger()
Returns logger_.
High-performance smart pointer used as a wrapping reference counting pointer.
friend std::ostream & operator<<(std::ostream &os, const PinCount &p)
make ostream-able
size_t reading_blocks() noexcept
Total number of blocks currently being read from EM.
void release_file_reference()
foxxll::request_ptr GetAnyWriting()
Return any currently being written block (for waiting on completion)
static std::recursive_mutex s_new_mutex
Object shared by allocators and other classes to track memory allocations.
A pinned / pin-counted derivative of a Block.
size_t next_file_id()
return next unique File id
std::atomic< bool > ready_
indication that the PinnedBlock is ready
Request object encapsulating basic properties like file and offset.
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
tlx::CountingPtr< PinRequest, mem::GPoolDeleter< PinRequest > > PinRequestPtr
size_t size
size of the block in bytes
JsonLogger is a receiver of JSON output objects for logging.
size_t unpinned_blocks() noexcept
Total number of unpinned blocks in memory of this block pool.
void IntIncBlockPinCount(ByteBlock *block_ptr, size_t local_worker_id)
Increment a ByteBlock's pin count - without locking the mutex.
void EvictBlock(ByteBlock *block_ptr)
friend class ByteBlock
for calling OnWriteComplete
#define LOG
Default logging method: output if the local debug variable is true.
void RunTask(const std::chrono::steady_clock::time_point &tp) final
method called by ProfileThread.
std::mutex mutex_
locked before internal state is changed
std::unique_ptr< Data > d_
pimpl data structure
#define LOGC(cond)
Explicitly specify the condition for logging.
const ByteBlockPtr & byte_block() const
access to byte_block_
void RequestInternalMemory(size_t size)
ByteBlockPtr MapExternalBlock(const foxxll::file_ptr &file, uint64_t offset, size_t size)