Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
context.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/context.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/api/context.hpp>
13 
14 #include <thrill/api/dia_base.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
20 #include <thrill/common/string.hpp>
22 #include <thrill/vfs/file_io.hpp>
23 
24 #include <foxxll/io/iostats.hpp>
25 #include <tlx/math/abs_diff.hpp>
26 #include <tlx/port/setenv.hpp>
29 #include <tlx/string/split.hpp>
30 
31 // mock net backend is always available -tb :)
33 
34 #if THRILL_HAVE_NET_TCP
37 #endif
38 
39 #if THRILL_HAVE_NET_MPI
41 #include <thrill/net/mpi/group.hpp>
42 #endif
43 
44 #if THRILL_HAVE_NET_IB
45 #include <thrill/net/ib/group.hpp>
46 #endif
47 
48 #if __linux__
49 
50 // linux-specific process control
51 #include <sys/prctl.h>
52 
53 // for calling getrlimit() to determine memory limit
54 #include <sys/resource.h>
55 #include <sys/time.h>
56 
57 #endif
58 
59 #if __APPLE__
60 
61 // for sysctl()
62 #include <sys/sysctl.h>
63 #include <sys/types.h>
64 
65 #endif
66 
67 #if defined(_MSC_VER)
68 
69 // for detecting amount of physical memory
70 #include <windows.h>
71 
72 #endif
73 
74 #include <algorithm>
75 #include <csignal>
76 #include <iostream>
77 #include <memory>
78 #include <string>
79 #include <thread>
80 #include <tuple>
81 #include <utility>
82 #include <vector>
83 
84 namespace thrill {
85 namespace api {
86 
87 /******************************************************************************/
88 // Generic Network Construction
89 
90 //! Generic network constructor for net backends supporting loopback tests.
91 template <typename NetGroup>
92 static inline
93 std::vector<std::unique_ptr<HostContext> >
95  const MemoryConfig& mem_config,
96  size_t num_hosts, size_t workers_per_host) {
97 
98  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
99 
100  // construct three full mesh loopback cliques, deliver net::Groups.
101  std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
102 
103  for (size_t g = 0; g < kGroupCount; ++g) {
104  group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
105  }
106 
107  std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
108  for (size_t h = 0; h < num_hosts; ++h) {
109  dispatcher.emplace_back(
110  std::make_unique<net::DispatcherThread>(
111  std::make_unique<typename NetGroup::Dispatcher>(), h));
112  }
113 
114  // construct host context
115 
116  std::vector<std::unique_ptr<HostContext> > host_context;
117 
118  for (size_t h = 0; h < num_hosts; h++) {
119  std::array<net::GroupPtr, kGroupCount> host_group = {
120  { std::move(group[0][h]), std::move(group[1][h]) }
121  };
122 
123  host_context.emplace_back(
124  std::make_unique<HostContext>(
125  h, mem_config, std::move(dispatcher[h]),
126  std::move(host_group), workers_per_host));
127  }
128 
129  return host_context;
130 }
131 
132 //! Generic runner for backends supporting loopback tests.
133 template <typename NetGroup>
134 static inline void
136  const MemoryConfig& mem_config,
137  size_t num_hosts, size_t workers_per_host, size_t core_offset,
138  const std::function<void(Context&)>& job_startpoint) {
139 
140  MemoryConfig host_mem_config = mem_config.divide(num_hosts);
141  mem_config.print(workers_per_host);
142 
143  // construct a mock network of hosts
144  typename NetGroup::Dispatcher dispatcher;
145 
146  std::vector<std::unique_ptr<HostContext> > host_contexts =
147  ConstructLoopbackHostContexts<NetGroup>(
148  host_mem_config, num_hosts, workers_per_host);
149 
150  // launch thread for each of the workers on this host.
151  std::vector<std::thread> threads(num_hosts* workers_per_host);
152 
153  for (size_t host = 0; host < num_hosts; ++host) {
154  std::string log_prefix = "host " + std::to_string(host);
155  for (size_t worker = 0; worker < workers_per_host; ++worker) {
156  size_t id = host * workers_per_host + worker;
157  threads[id] = common::CreateThread(
158  [&host_contexts, &job_startpoint, host, worker, log_prefix] {
159  Context ctx(*host_contexts[host], worker);
161  log_prefix + " worker " + std::to_string(worker));
162 
163  ctx.Launch(job_startpoint);
164  });
165  common::SetCpuAffinity(threads[id], core_offset + id);
166  }
167  }
168 
169  // join worker threads
170  for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
171  threads[i].join();
172  }
173 }
174 
175 /******************************************************************************/
176 // Other Configuration Initializations
177 
178 static inline bool SetupBlockSize() {
179 
180  const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
181  if (env_block_size == nullptr || *env_block_size == 0) return true;
182 
183  char* endptr;
184  data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
185 
186  if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
187  std::cerr << "Thrill: environment variable"
188  << " THRILL_BLOCK_SIZE=" << env_block_size
189  << " is not a valid number."
190  << std::endl;
191  return false;
192  }
193 
195 
196  std::cerr << "Thrill: setting default_block_size = "
198  << std::endl;
199 
200  return true;
201 }
202 
//! Parses env_value, the value of environment variable env_name, as a
//! positive decimal number of workers per host.
//! \return the parsed number, or 0 if env_value is not a complete positive
//!   decimal number; in that case an error naming the variable is printed.
static inline size_t ParseWorkersPerHost(
    const char* env_name, const char* env_value) {

    char* endptr;
    size_t result = std::strtoul(env_value, &endptr, 10);
    if (!endptr || *endptr != 0 || result == 0) {
        std::cerr << "Thrill: environment variable"
                  << ' ' << env_name
                  << '=' << env_value
                  << " is not a valid number of workers per host."
                  << std::endl;
        return 0;
    }
    return result;
}

//! Determines the number of worker threads per host. The sources are checked
//! in this order: THRILL_WORKERS_PER_HOST, OMP_NUM_THREADS, SLURM_CPUS_ON_NODE,
//! and finally std::thread::hardware_concurrency().
//!
//! An invalid THRILL_WORKERS_PER_HOST is a hard error. Invalid values of
//! OMP_NUM_THREADS or SLURM_CPUS_ON_NODE are reported and the next source is
//! tried.
//!
//! \param str_workers_per_host [out] name of the last environment variable
//!   examined.
//! \param env_workers_per_host [out] value of that variable. May be nullptr if
//!   it is unset, e.g. when the result comes from hardware_concurrency().
//! \return number of workers per host, or 0 if THRILL_WORKERS_PER_HOST is
//!   invalid.
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    // first check THRILL_WORKERS_PER_HOST; an invalid value is fatal.
    str_workers_per_host = "THRILL_WORKERS_PER_HOST";
    env_workers_per_host = getenv(str_workers_per_host);

    if (env_workers_per_host && *env_workers_per_host)
        return ParseWorkersPerHost(str_workers_per_host, env_workers_per_host);

    // then OMP_NUM_THREADS and SLURM_CPUS_ON_NODE; on an invalid value fall
    // through and try the next source.
    static const char* const kFallbackVars[] = {
        "OMP_NUM_THREADS", "SLURM_CPUS_ON_NODE"
    };

    for (const char* var : kFallbackVars) {
        str_workers_per_host = var;
        env_workers_per_host = getenv(var);

        if (env_workers_per_host && *env_workers_per_host) {
            size_t result = ParseWorkersPerHost(var, env_workers_per_host);
            if (result != 0) return result;
        }
    }

    // last check: std::thread::hardware_concurrency(). It returns 0 when the
    // number of hardware threads cannot be determined. Use one worker then,
    // instead of making callers fail without any error message.
    size_t result = std::thread::hardware_concurrency();
    if (result == 0) {
        std::cerr << "Thrill: could not detect the number of hardware threads,"
                  << " using one worker per host."
                  << std::endl;
        result = 1;
    }
    return result;
}
272 
273 static inline bool Initialize() {
274 
275  if (!SetupBlockSize()) return false;
276 
277  vfs::Initialize();
278 
279  return true;
280 }
281 
282 static inline bool Deinitialize() {
283 
285 
286  return true;
287 }
288 
289 /******************************************************************************/
290 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
291 
292 #if defined(_MSC_VER)
294 #else
296 #endif
297 
298 void RunLocalMock(const MemoryConfig& mem_config,
299  size_t num_hosts, size_t workers_per_host,
300  const std::function<void(Context&)>& job_startpoint) {
301 
302  return RunLoopbackThreads<TestGroup>(
303  mem_config, num_hosts, workers_per_host, 0, job_startpoint);
304 }
305 
306 std::vector<std::unique_ptr<HostContext> >
307 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
308 
309  // set fixed amount of RAM for testing
311  mem_config.setup(4 * 1024 * 1024 * 1024llu);
312  mem_config.verbose_ = false;
313 
314  return ConstructLoopbackHostContexts<TestGroup>(
315  mem_config, num_hosts, workers_per_host);
316 }
317 
318 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
319  // set fixed amount of RAM for testing
320  RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
321 }
322 
324  size_t ram, const std::function<void(Context&)>& job_startpoint) {
325 
326  // discard json log
327  tlx::setenv("THRILL_LOG", "", /* overwrite */ 1);
328 
329  // set fixed amount of RAM for testing
330  MemoryConfig mem_config;
331  mem_config.verbose_ = false;
332  mem_config.setup(ram);
333 
334  static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
335  static constexpr size_t num_workers_list[] = { 1, 3 };
336 
337  for (const size_t& num_hosts : num_hosts_list) {
338  for (const size_t& workers_per_host : num_workers_list) {
339  RunLocalMock(mem_config, num_hosts, workers_per_host,
340  job_startpoint);
341  }
342  }
343 }
344 
345 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
346 
347  size_t my_host_rank = 0;
348  size_t workers_per_host = 1;
349  size_t num_hosts = 1;
350  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
351 
352  // set fixed amount of RAM for testing
353  MemoryConfig mem_config;
354  mem_config.verbose_ = false;
355  mem_config.setup(4 * 1024 * 1024 * 1024llu);
356  mem_config.print(workers_per_host);
357 
358  // construct two full mesh connection cliques, deliver net::tcp::Groups.
359  std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
360 
361  for (size_t g = 0; g < kGroupCount; ++g) {
362  group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
363  }
364 
365  std::array<net::GroupPtr, kGroupCount> host_group = {
366  { std::move(group[0][0]), std::move(group[1][0]) }
367  };
368 
369  auto dispatcher = std::make_unique<net::DispatcherThread>(
370  std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
371 
372  HostContext host_context(
373  0, mem_config,
374  std::move(dispatcher), std::move(host_group), workers_per_host);
375 
376  Context ctx(host_context, 0);
377  common::NameThisThread("worker " + std::to_string(my_host_rank));
378 
379  job_startpoint(ctx);
380 }
381 
382 /******************************************************************************/
383 // Run() Variants for Different Net Backends
384 
385 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
386 template <typename NetGroup>
387 static inline
389  const char* backend, const std::function<void(Context&)>& job_startpoint) {
390 
391  char* endptr;
392 
393  // determine number of loopback hosts
394 
395  size_t num_hosts = 2;
396 
397  const char* env_local = getenv("THRILL_LOCAL");
398  if (env_local && *env_local) {
399  // parse envvar only if it exists.
400  num_hosts = std::strtoul(env_local, &endptr, 10);
401 
402  if (!endptr || *endptr != 0 || num_hosts == 0) {
403  std::cerr << "Thrill: environment variable"
404  << " THRILL_LOCAL=" << env_local
405  << " is not a valid number of local loopback hosts."
406  << std::endl;
407  return -1;
408  }
409  }
410 
411  // determine number of threads per loopback host
412 
413  const char* str_workers_per_host;
414  const char* env_workers_per_host;
415 
416  size_t workers_per_host = FindWorkersPerHost(
417  str_workers_per_host, env_workers_per_host);
418 
419  if (workers_per_host == 0)
420  return -1;
421 
422  // core offset for pinning
423 
424  const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
425  size_t core_offset = 0;
426  if (env_core_offset && *env_core_offset) {
427  core_offset = std::strtoul(env_core_offset, &endptr, 10);
428 
429  size_t last_core = core_offset + num_hosts * workers_per_host;
430  if (!endptr || *endptr != 0 ||
431  last_core > std::thread::hardware_concurrency())
432  {
433  std::cerr << "Thrill: environment variable"
434  << " THRILL_CORE_OFFSET=" << env_core_offset
435  << " is not a valid number of cores to skip for pinning."
436  << std::endl;
437  return -1;
438  }
439  }
440 
441  // detect memory config
442 
443  MemoryConfig mem_config;
444  if (mem_config.setup_detect() < 0) return -1;
445  mem_config.print(workers_per_host);
446 
447  // okay, configuration is good.
448 
449  std::cerr << "Thrill: running locally with " << num_hosts
450  << " test hosts and " << workers_per_host << " workers per host"
451  << " in a local " << backend << " network." << std::endl;
452 
453  if (!Initialize()) return -1;
454 
455  RunLoopbackThreads<NetGroup>(
456  mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
457 
458  if (!Deinitialize()) return -1;
459 
460  return 0;
461 }
462 
#if THRILL_HAVE_NET_TCP
//! Run() implementation for the real TCP network backend.
//!
//! Reads the following configuration, connects kGroupCount TCP groups to all
//! endpoints and runs job_startpoint on workers_per_host threads:
//!  - this host's rank from THRILL_RANK (or SLURM_PROCID if THRILL_RANK is
//!    unset);
//!  - the endpoint list from THRILL_HOSTLIST (host:port entries separated by
//!    spaces, or by commas if splitting by spaces yields a single entry);
//!  - the workers per host;
//!  - the memory configuration.
//!
//! \return 0 on success, -1 on a configuration or initialization error.
static inline
int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {

    char* endptr;

    // select environment variables

    const char* str_rank = "THRILL_RANK";
    const char* env_rank = getenv(str_rank);

    if (env_rank == nullptr) {
        // take SLURM_PROCID if THRILL_RANK is not set
        str_rank = "SLURM_PROCID";
        env_rank = getenv(str_rank);
    }

    const char* env_hostlist = getenv("THRILL_HOSTLIST");

    // parse environment variables

    size_t my_host_rank = 0;

    if (env_rank != nullptr && *env_rank != 0) {
        my_host_rank = std::strtoul(env_rank, &endptr, 10);

        if (endptr == nullptr || *endptr != 0) {
            std::cerr << "Thrill: environment variable "
                      << str_rank << '=' << env_rank
                      << " is not a valid number."
                      << std::endl;
            return -1;
        }
    }
    else {
        std::cerr << "Thrill: environment variable THRILL_RANK"
                  << " is required for tcp network backend."
                  << std::endl;
        return -1;
    }

    std::vector<std::string> hostlist;

    if (env_hostlist != nullptr && *env_hostlist != 0) {
        // first try to split by spaces, then by commas
        std::vector<std::string> list = tlx::split(' ', env_hostlist);

        if (list.size() == 1) {
            tlx::split(&list, ',', env_hostlist);
        }

        for (const std::string& host : list) {
            // skip empty splits
            if (host.empty()) continue;

            if (host.find(':') == std::string::npos) {
                std::cerr << "Thrill: invalid address \"" << host << "\""
                          << " in THRILL_HOSTLIST. It must contain a port number."
                          << std::endl;
                return -1;
            }

            hostlist.push_back(host);
        }

        // report the number of endpoints actually used (empty splits were
        // skipped), which is what my_host_rank is checked against.
        if (my_host_rank >= hostlist.size()) {
            std::cerr << "Thrill: endpoint list (" << hostlist.size()
                      << " entries) "
                      << "does not include my host_rank (" << my_host_rank << ")"
                      << std::endl;
            return -1;
        }
    }
    else {
        std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
                  << " is required for tcp network backend."
                  << std::endl;
        return -1;
    }

    // determine number of local worker threads per process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    std::cerr << "Thrill: running in tcp network with " << hostlist.size()
              << " hosts and " << workers_per_host << " workers per host"
              << " with " << common::GetHostname()
              << " as rank " << my_host_rank << " and endpoints";
    for (const std::string& ep : hostlist)
        std::cerr << ' ' << ep;
    std::cerr << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct kGroupCount TCP network groups
    auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();

    std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
    net::tcp::Construct(
        *select_dispatcher, my_host_rank, hostlist,
        groups.data(), net::Manager::kGroupCount);

    // fill every slot, independent of the value of kGroupCount.
    std::array<net::GroupPtr, kGroupCount> host_groups;
    for (size_t g = 0; g < kGroupCount; ++g) {
        host_groups[g] = std::move(groups[g]);
    }

    // construct HostContext

    auto dispatcher = std::make_unique<net::DispatcherThread>(
        std::move(select_dispatcher), my_host_rank);

    HostContext host_context(
        0, mem_config,
        std::move(dispatcher), std::move(host_groups), workers_per_host);

    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (std::thread& thread : threads) {
        thread.join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
#endif
617 
#if THRILL_HAVE_NET_MPI
//! Run() implementation for the MPI network backend.
//!
//! One of the determined worker threads is reserved for the MPI dispatcher,
//! unless only one was determined, so normally workers_per_host - 1 threads
//! run job_startpoint. The number of hosts and this host's rank are taken
//! from MPI.
//!
//! \return 0 on success, -1 on a configuration or initialization error.
static inline
int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {

    // determine number of local worker threads per MPI process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop

    if (workers_per_host == 1) {
        // env_workers_per_host is nullptr when the count did not come from an
        // environment variable (e.g. from hardware_concurrency()). Streaming a
        // null const char* is undefined behavior, so report the count instead.
        std::cerr << "Thrill: ";
        if (env_workers_per_host != nullptr) {
            std::cerr << "environment variable"
                      << ' ' << str_workers_per_host
                      << '=' << env_workers_per_host;
        }
        else {
            std::cerr << "using " << workers_per_host << " worker per host";
        }
        std::cerr << " is not recommended, as one thread is used exclusively"
                  << " for MPI communication."
                  << std::endl;
    }
    else {
        --workers_per_host;
    }

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    size_t num_hosts = net::mpi::NumMpiProcesses();
    size_t mpi_rank = net::mpi::MpiRank();
    std::string hostname = common::GetHostname();

    std::cerr << "Thrill: running in MPI network with " << num_hosts
              << " hosts and " << workers_per_host << "+1 workers per host"
              << " with " << hostname << " as rank " << mpi_rank << "."
              << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct kGroupCount MPI network groups
    auto dispatcher = std::make_unique<net::DispatcherThread>(
        std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);

    std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
    net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);

    // fill every slot, independent of the value of kGroupCount.
    std::array<net::GroupPtr, kGroupCount> host_groups;
    for (size_t g = 0; g < kGroupCount; ++g) {
        host_groups[g] = std::move(groups[g]);
    }

    // construct HostContext
    HostContext host_context(
        0, mem_config,
        std::move(dispatcher), std::move(host_groups), workers_per_host);

    // launch worker threads
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("host " + std::to_string(ctx.host_rank())
                                       + " worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (std::thread& thread : threads) {
        thread.join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
#endif
709 
#if THRILL_HAVE_NET_IB
//! Run() implementation for the InfiniBand/MPI network backend.
//!
//! Determines the workers per host and the memory configuration, takes the
//! number of hosts and this host's rank from net::ib, constructs kGroupCount
//! IB groups and runs job_startpoint on workers_per_host threads. Unlike the
//! MPI backend, no thread is reserved for a dispatcher.
//!
//! NOTE(review): the HostContext below is constructed without a
//! net::DispatcherThread argument, whereas the HostContext constructor in this
//! file takes (local_host_id, mem_config, dispatcher, groups,
//! workers_per_host). This path likely does not compile when
//! THRILL_HAVE_NET_IB is enabled. TODO confirm and pass an IB dispatcher.
//!
//! \return 0 on success, -1 on a configuration or initialization error.
static inline
int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {

    // determine number of local worker threads per IB/MPI process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    size_t num_hosts = net::ib::NumMpiProcesses();
    size_t mpi_rank = net::ib::MpiRank();

    std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
              << " hosts and " << workers_per_host << " workers per host"
              << " with " << common::GetHostname()
              << " as rank " << mpi_rank << "."
              << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct kGroupCount IB network groups
    std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
    net::ib::Construct(num_hosts, groups.data(), kGroupCount);

    // NOTE(review): only groups[0] and groups[1] are moved here. Any further
    // slots stay empty if kGroupCount is ever larger than 2.
    std::array<net::GroupPtr, kGroupCount> host_groups = {
        { std::move(groups[0]), std::move(groups[1]) }
    };

    // construct HostContext (see NOTE above: dispatcher argument missing)
    HostContext host_context(
        0, mem_config, std::move(host_groups), workers_per_host);

    // launch worker threads, worker i pinned to core i
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("host " + std::to_string(ctx.host_rank())
                                       + " worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (size_t i = 0; i < workers_per_host; i++) {
        threads[i].join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
#endif
783 
//! Reports on std::cerr that network backend env_net is not compiled into
//! this binary.
//! \return always -1, so callers can return the result directly.
int RunNotSupported(const char* env_net) {
    std::cerr << "Thrill: network backend " << env_net;
    std::cerr << " is not supported by this binary." << std::endl;
    return -1;
}
789 
//! Selects a network backend automatically (used when THRILL_NET is unset).
//!
//! - If an Open MPI (OMPI_COMM_WORLD_SIZE) or Intel MPI (I_MPI_INFO_NP) run is
//!   detected: "ib" if compiled in, else "mpi" if compiled in. Otherwise an
//!   error is printed and nullptr is returned.
//! - On MSVC builds: "mock".
//! - Otherwise: "tcp" if THRILL_RANK or THRILL_HOSTLIST is set, else "local".
static inline
const char * DetectNetBackend() {
    const bool mpi_run =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (mpi_run) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    const bool tcp_configured =
        getenv("THRILL_RANK") != nullptr ||
        getenv("THRILL_HOSTLIST") != nullptr;
    return tcp_configured ? "tcp" : "local";
#endif
}
817 
819 
820  const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
821  if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
822 
823  char* endptr;
824 
825  long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
826  if (endptr == nullptr || *endptr != 0 ||
827  (die_with_parent != 0 && die_with_parent != 1)) {
828  std::cerr << "Thrill: environment variable"
829  << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
830  << " is not either 0 or 1."
831  << std::endl;
832  return -1;
833  }
834 
835  if (die_with_parent == 0) return 0;
836 
837 #if __linux__
838  if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
839  throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
840  return 1;
841 #else
842  std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
843  << "Please submit a patch."
844  << std::endl;
845  return 0;
846 #endif
847 }
848 
850 
851  const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
852  if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
853 
854  if (unlink(env_unlink_binary) != 0) {
856  "Error calling unlink binary \""
857  + std::string(env_unlink_binary) + "\"");
858  }
859 
860  return 0;
861 }
862 
863 int Run(const std::function<void(Context&)>& job_startpoint) {
864 
865  if (RunCheckDieWithParent() < 0)
866  return -1;
867 
868  if (RunCheckUnlinkBinary() < 0)
869  return -1;
870 
871  // parse environment: THRILL_NET
872  const char* env_net = getenv("THRILL_NET");
873 
874  // if no backend configured: automatically select one.
875  if (env_net == nullptr || *env_net == 0) {
876  env_net = DetectNetBackend();
877  if (env_net == nullptr) return -1;
878  }
879 
880  // run with selected backend
881  if (strcmp(env_net, "mock") == 0) {
882  // mock network backend
883  return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
884  }
885 
886  if (strcmp(env_net, "local") == 0) {
887 #if THRILL_HAVE_NET_TCP
888  // tcp loopback network backend
889  return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
890 #else
891  return RunNotSupported(env_net);
892 #endif
893  }
894 
895  if (strcmp(env_net, "tcp") == 0) {
896 #if THRILL_HAVE_NET_TCP
897  // real tcp network backend
898  return RunBackendTcp(job_startpoint);
899 #else
900  return RunNotSupported(env_net);
901 #endif
902  }
903 
904  if (strcmp(env_net, "mpi") == 0) {
905 #if THRILL_HAVE_NET_MPI
906  // mpi network backend
907  return RunBackendMpi(job_startpoint);
908 #else
909  return RunNotSupported(env_net);
910 #endif
911  }
912 
913  if (strcmp(env_net, "ib") == 0) {
914 #if THRILL_HAVE_NET_IB
915  // ib/mpi network backend
916  return RunBackendIb(job_startpoint);
917 #else
918  return RunNotSupported(env_net);
919 #endif
920  }
921 
922  std::cerr << "Thrill: network backend " << env_net << " is unknown."
923  << std::endl;
924  return -1;
925 }
926 
927 /******************************************************************************/
928 // MemoryConfig
929 
930 void MemoryConfig::setup(size_t ram) {
931  ram_ = ram;
932  apply();
933 }
934 
936 
937  // determine amount of physical RAM or take user's limit
938 
939  const char* env_ram = getenv("THRILL_RAM");
940 
941  if (env_ram != nullptr && *env_ram != 0) {
942  uint64_t ram64;
943  if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
944  std::cerr << "Thrill: environment variable"
945  << " THRILL_RAM=" << env_ram
946  << " is not a valid amount of RAM memory."
947  << std::endl;
948  return -1;
949  }
950  ram_ = static_cast<size_t>(ram64);
951  }
952  else {
953  // detect amount of physical memory on system
954 #if defined(_MSC_VER)
955  MEMORYSTATUSEX memstx;
956  memstx.dwLength = sizeof(memstx);
957  GlobalMemoryStatusEx(&memstx);
958 
959  ram_ = memstx.ullTotalPhys;
960 #elif __APPLE__
961  int mib[2];
962  int64_t physical_memory;
963  size_t length;
964 
965  // Get the physical memory size
966  mib[0] = CTL_HW;
967  mib[1] = HW_MEMSIZE;
968  length = sizeof(physical_memory);
969  sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
970  ram_ = static_cast<size_t>(physical_memory);
971 #else
972  ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
973 #endif
974 
975 #if __linux__
976  // use getrlimit() to check user limit on address space
977  struct rlimit rl; // NOLINT
978  if (getrlimit(RLIMIT_AS, &rl) == 0) {
979  if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
980  ram_ = rl.rlim_cur * 3 / 4;
981  }
982  }
983  else {
984  sLOG1 << "getrlimit(): " << strerror(errno);
985  }
986 #endif
987  }
988 
989  apply();
990 
991  return 0;
992 }
993 
995  // divide up ram_
996 
997  ram_workers_ = ram_ / 3;
1001 
1002  // set memory limit, only BlockPool is excluded from malloc tracking, as
1003  // only it uses bypassing allocators.
1005 }
1006 
1007 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1008 
1009  MemoryConfig mc = *this;
1010  mc.ram_ /= hosts;
1011  mc.ram_block_pool_hard_ /= hosts;
1012  mc.ram_block_pool_soft_ /= hosts;
1013  mc.ram_workers_ /= hosts;
1014  // free floating memory is not divided by host, as it is measured overall
1015 
1016  return mc;
1017 }
1018 
1019 void MemoryConfig::print(size_t workers_per_host) const {
1020  if (!verbose_) return;
1021 
1022  std::cerr
1023  << "Thrill: using "
1024  << tlx::format_iec_units(ram_) << "B RAM total,"
1025  << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1026  << " workers="
1027  << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1028  << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1029  << std::endl;
1030 }
1031 
1032 /******************************************************************************/
1033 // HostContext methods
1034 
1036  size_t local_host_id,
1037  const MemoryConfig& mem_config,
1038  std::unique_ptr<net::DispatcherThread> dispatcher,
1039  std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1040  size_t workers_per_host)
1041  : mem_config_(mem_config),
1042  base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1043  logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1044  profiler_(std::make_unique<common::ProfileThread>()),
1045  local_host_id_(local_host_id),
1046  workers_per_host_(workers_per_host),
1047  dispatcher_(std::move(dispatcher)),
1048  net_manager_(std::move(groups), logger_) {
1049 
1050  // write command line parameters to json log
1052 
1054 
1055  // run memory profiler only on local host 0 (especially for test runs)
1056  if (local_host_id == 0)
1058 }
1059 
1061  // stop dispatcher _before_ stopping multiplexer
1062  dispatcher_->Terminate();
1063 }
1064 
1066  const char* env_log = getenv("THRILL_LOG");
1067  if (env_log == nullptr) {
1068  if (host_rank == 0 && mem_config().verbose_) {
1069  std::cerr << "Thrill: no THRILL_LOG was found, "
1070  << "so no json log is written."
1071  << std::endl;
1072  }
1073  return std::string();
1074  }
1075 
1076  std::string output = env_log;
1077  if (output == "" || output == "-")
1078  return std::string();
1079  if (output == "/dev/stdout")
1080  return output;
1081  if (output == "stdout")
1082  return "/dev/stdout";
1083 
1084  return output + "-host-" + std::to_string(host_rank) + ".json";
1085 }
1086 
1087 /******************************************************************************/
1088 // Context methods
1089 
1090 Context::Context(HostContext& host_context, size_t local_worker_id)
1091  : local_host_id_(host_context.local_host_id()),
1092  local_worker_id_(local_worker_id),
1093  workers_per_host_(host_context.workers_per_host()),
1094  mem_limit_(host_context.worker_mem_limit()),
1095  mem_config_(host_context.mem_config()),
1096  mem_manager_(host_context.mem_manager()),
1097  net_manager_(host_context.net_manager()),
1098  flow_manager_(host_context.flow_manager()),
1099  block_pool_(host_context.block_pool()),
1100  multiplexer_(host_context.data_multiplexer()),
1101  rng_(std::random_device { }
1102  () + (local_worker_id_ << 16)),
1103  base_logger_(&host_context.base_logger_) {
1104  assert(local_worker_id < workers_per_host());
1105 }
1106 
1108  return GetFile(dia != nullptr ? dia->id() : 0);
1109 }
1110 
1112  return tlx::make_counting<data::File>(
1113  block_pool_, local_worker_id_, dia_id);
1114 }
1115 
1117  return GetFilePtr(dia != nullptr ? dia->id() : 0);
1118 }
1119 
1122 }
1123 
1125  return GetNewCatStream(dia != nullptr ? dia->id() : 0);
1126 }
1127 
1130 }
1131 
1133  return GetNewMixStream(dia != nullptr ? dia->id() : 0);
1134 }
1135 
1136 template <>
1137 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1138  return GetNewCatStream(dia_id);
1139 }
1140 
1141 template <>
1142 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1143  return GetNewMixStream(dia_id);
1144 }
1145 
/*!
 * Summary statistics of a job run, filled in by each worker at the end of
 * Context::Launch() and combined across workers via operator+.
 *
 * NOTE(review): kept as a plain aggregate (no constructors, no default member
 * initializers), presumably so it remains serializable for net.Reduce() --
 * confirm before adding any constructor.
 */
struct OverallStats {

    //! run time in seconds; combined by taking the maximum
    double runtime;

    //! bytes allocated in ByteBlocks; summed over all reporters when combined
    size_t max_block_bytes;

    //! bytes transmitted by the net layer; summed when combined
    size_t net_traffic_tx;
    //! bytes received by the net layer; summed when combined
    size_t net_traffic_rx;

    //! bytes read plus bytes written by the I/O layer; summed when combined
    size_t io_volume;

    //! peak external memory allocation; combined by taking the maximum
    size_t io_max_allocation;

    //! print all fields on one line as "[OverallStats key=value ...]"
    friend std::ostream& operator << (std::ostream& os, const OverallStats& c) {
        os << "[OverallStats";
        os << " runtime=" << c.runtime;
        os << " max_block_bytes=" << c.max_block_bytes;
        os << " net_traffic_tx=" << c.net_traffic_tx;
        os << " net_traffic_rx=" << c.net_traffic_rx;
        os << " io_volume=" << c.io_volume;
        os << " io_max_allocation=" << c.io_max_allocation;
        os << "]";
        return os;
    }

    //! Combine two partial results: runtime and io_max_allocation take the
    //! maximum, all other byte counters are added up.
    OverallStats operator + (const OverallStats& b) const {
        return OverallStats {
                   std::max(runtime, b.runtime),
                   max_block_bytes + b.max_block_bytes,
                   net_traffic_tx + b.net_traffic_tx,
                   net_traffic_rx + b.net_traffic_rx,
                   io_volume + b.io_volume,
                   std::max(io_max_allocation, b.io_max_allocation)
        };
    }
};
1185 
1186 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1187  logger_ << "class" << "Context"
1188  << "event" << "job-start";
1189 
1190  common::StatsTimerStart overall_timer;
1191 
1192  try {
1193  job_startpoint(*this);
1194  }
1195  catch (std::exception& e) {
1196  LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1197  LOG1 << " what(): " << e.what();
1198 
1199  logger_ << "class" << "Context"
1200  << "event" << "job-exception"
1201  << "exception" << typeid(e).name()
1202  << "what" << e.what();
1203  throw;
1204  }
1205 
1206  logger_ << "class" << "Context"
1207  << "event" << "job-done"
1208  << "elapsed" << overall_timer;
1209 
1210  overall_timer.Stop();
1211 
1212  // collect overall statistics
1213  OverallStats stats;
1214  stats.runtime = overall_timer.SecondsDouble();
1215 
1216  stats.max_block_bytes =
1218 
1219  stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1220  stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1221 
1222  if (local_host_id_ == 0 && local_worker_id_ == 0) {
1224  stats.io_volume = io_stats.get_read_bytes() + io_stats.get_write_bytes();
1225  stats.io_max_allocation =
1227  }
1228  else {
1229  stats.io_volume = 0;
1230  stats.io_max_allocation = 0;
1231  }
1232 
1233  LOG0 << stats;
1234 
1235  stats = net.Reduce(stats);
1236 
1237  if (my_rank() == 0) {
1238  using tlx::format_iec_units;
1239 
1240  if (stats.net_traffic_rx != stats.net_traffic_tx)
1241  LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1242  << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1243 
1244  if (mem_config().verbose_) {
1245  std::cerr
1246  << "Thrill:"
1247  << " ran " << stats.runtime << "s with max "
1248  << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1249  << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1250  << format_iec_units(stats.io_volume) << "B disk I/O, and "
1251  << format_iec_units(stats.io_max_allocation) << "B max disk use."
1252  << std::endl;
1253  }
1254 
1255  logger_ << "class" << "Context"
1256  << "event" << "summary"
1257  << "runtime" << stats.runtime
1258  << "net_traffic" << stats.net_traffic_tx
1259  << "io_volume" << stats.io_volume
1260  << "io_max_allocation" << stats.io_max_allocation;
1261  }
1262 }
1263 
1264 } // namespace api
1265 } // namespace thrill
1266 
1267 /******************************************************************************/
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
size_t ram_
total amount of physical ram detected or THRILL_RAM
Definition: context.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:392
net::FlowControlChannel & net
Definition: context.hpp:443
size_t ram_floating_
remaining free-floating RAM used for user and Thrill data structures.
Definition: context.hpp:76
MemoryConfig divide(size_t hosts) const
Definition: context.cpp:1007
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:896
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
void LogCmdlineParams(JsonLogger &logger)
Definition: porting.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
Definition: context.cpp:1035
void StartMemProfiler(common::ProfileThread &sched, common::JsonLogger &logger)
launch profiler task
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:34
net::Manager & net_manager_
net::Manager instance that is shared among workers
Definition: context.hpp:410
std::string MakeHostLogPath(size_t host_rank)
create host log
Definition: context.cpp:1065
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
Collection of NetConnections to workers, allows point-to-point client communication and simple collec...
Definition: group.hpp:43
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:863
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
An Exception which is thrown on system errors and contains errno information.
void set_memory_limit_indication(ssize_t size)
common::JsonLogger logger_
Definition: context.hpp:155
const MemoryConfig & mem_config() const
host-global memory config
Definition: context.hpp:326
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
Definition: context.cpp:818
int setup_detect()
detect memory configuration from environment
Definition: context.cpp:935
#define sLOG1
Definition: logger.hpp:38
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
size_t workers_per_host() const
number of workers per host (all have the same).
Definition: context.hpp:112
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
Definition: logger.cpp:40
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
static by_string to_string(int val)
convert to string
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
static bool Deinitialize()
Definition: context.cpp:282
static std::vector< std::unique_ptr< HostContext > > ConstructLoopbackHostContexts(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host)
Generic network constructor for net backends supporting loopback tests.
Definition: context.cpp:94
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
Definition: context.cpp:849
data::BlockPool & block_pool_
data block pool
Definition: context.hpp:416
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
Definition: context.hpp:235
static void RunLoopbackThreads(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, size_t core_offset, const std::function< void(Context &)> &job_startpoint)
Generic runner for backends supporting loopback tests.
Definition: context.cpp:135
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:24
int setenv(const char *name, const char *value, int overwrite)
setenv - change or add an environment variable Windows porting madness because setenv() is apparently...
Definition: setenv.cpp:31
~HostContext()
destructor
Definition: context.cpp:1060
std::string GetHostname()
get hostname
Definition: porting.cpp:142
void print(size_t workers_per_host) const
Definition: context.cpp:1019
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network -> run test with one host an...
Definition: context.cpp:345
size_t local_worker_id_
number of this worker context, 0..p-1, within this host
Definition: context.hpp:395
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
external_size_type get_read_bytes() const
Definition: iostats.cpp:527
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
Definition: context.hpp:66
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
Definition: context.cpp:298
void StartLinuxProcStatsProfiler(ProfileThread &, JsonLogger &)
launch profiler task
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
Definition: abs_diff.hpp:24
static bool SetupBlockSize()
Definition: context.cpp:178
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:39
common::JsonLogger logger_
Definition: context.hpp:459
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
int RunNotSupported(const char *env_net)
Definition: context.cpp:784
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
static instance_pointer get_instance()
Definition: singleton.hpp:44
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1111
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
Definition: context.cpp:318
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
Definition: group.cpp:714
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
static size_t FindWorkersPerHost(const char *&str_workers_per_host, const char *&env_workers_per_host)
Definition: context.cpp:203
Context(HostContext &host_context, size_t local_worker_id)
Definition: context.cpp:1090
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1128
void Launch(const std::function< void(Context &)> &job_startpoint)
method used to launch a job's main procedure. it wraps it in log output.
Definition: context.cpp:1186
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
Definition: context.hpp:419
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Definition: context.hpp:69
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
The HostContext contains all data structures shared among workers on the same host.
Definition: context.hpp:87
size_t local_worker_id() const
Definition: context.hpp:260
void setup(size_t ram)
setup memory size
Definition: context.cpp:930
uint64_t maximum_allocation() const
return maximum number of bytes allocated during program run.
static int RunBackendLoopback(const char *backend, const std::function< void(Context &)> &job_startpoint)
Run() implementation which uses a loopback net backend ("mock" or "tcp").
Definition: context.cpp:388
net::tcp::Group TestGroup
Definition: context.cpp:295
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
Definition: context.cpp:307
MemoryConfig & mem_config()
host-global memory config
Definition: context.hpp:120
size_t rx
received bytes
Definition: manager.hpp:37
size_t NumMpiProcesses()
Return the number of MPI processes.
Definition: group.cpp:701
static bool Initialize()
Definition: context.cpp:273
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
Definition: context.hpp:158
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
Definition: context.hpp:173
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:110
std::string format_iec_units(uint64_t number, int precision)
Format number as something like 1 TiB.
static const char * DetectNetBackend()
Definition: context.cpp:791
bool verbose_
StageBuilder verbosity flag.
Definition: context.hpp:79
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:321
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449
external_size_type get_write_bytes() const
Definition: iostats.cpp:543
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
Definition: group.cpp:675
size_t tx
transmitted bytes
Definition: manager.hpp:35