Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
context.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/context.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/api/context.hpp>
13 
14 #include <thrill/api/dia_base.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
20 #include <thrill/common/string.hpp>
22 #include <thrill/vfs/file_io.hpp>
23 
24 #include <foxxll/io/iostats.hpp>
25 #include <tlx/math/abs_diff.hpp>
26 #include <tlx/port/setenv.hpp>
29 #include <tlx/string/split.hpp>
30 
31 // mock net backend is always available -tb :)
33 
34 #if THRILL_HAVE_NET_TCP
37 #endif
38 
39 #if THRILL_HAVE_NET_MPI
41 #include <thrill/net/mpi/group.hpp>
42 #endif
43 
44 #if THRILL_HAVE_NET_IB
45 #include <thrill/net/ib/group.hpp>
46 #endif
47 
48 #if __linux__
49 
50 // linux-specific process control
51 #include <sys/prctl.h>
52 
53 // for calling getrlimit() to determine memory limit
54 #include <sys/resource.h>
55 #include <sys/time.h>
56 
57 #endif
58 
59 #if __APPLE__
60 
61 // for sysctl()
62 #include <sys/sysctl.h>
63 #include <sys/types.h>
64 
65 #endif
66 
67 #if defined(_MSC_VER)
68 
69 // for detecting amount of physical memory
70 #include <windows.h>
71 
72 #endif
73 
#include <algorithm>
#include <cerrno>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
83 
84 namespace thrill {
85 namespace api {
86 
87 /******************************************************************************/
88 // Generic Network Construction
89 
90 //! Generic network constructor for net backends supporting loopback tests.
91 template <typename NetGroup>
92 static inline
93 std::vector<std::unique_ptr<HostContext> >
95  const MemoryConfig& mem_config,
96  size_t num_hosts, size_t workers_per_host) {
97 
98  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
99 
100  // construct three full mesh loopback cliques, deliver net::Groups.
101  std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
102 
103  for (size_t g = 0; g < kGroupCount; ++g) {
104  group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
105  }
106 
107  std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
108  for (size_t h = 0; h < num_hosts; ++h) {
109  dispatcher.emplace_back(
110  std::make_unique<net::DispatcherThread>(
111  std::make_unique<typename NetGroup::Dispatcher>(), h));
112  }
113 
114  // construct host context
115 
116  std::vector<std::unique_ptr<HostContext> > host_context;
117 
118  for (size_t h = 0; h < num_hosts; h++) {
119  std::array<net::GroupPtr, kGroupCount> host_group = {
120  { std::move(group[0][h]), std::move(group[1][h]) }
121  };
122 
123  host_context.emplace_back(
124  std::make_unique<HostContext>(
125  h, mem_config, std::move(dispatcher[h]),
126  std::move(host_group), workers_per_host));
127  }
128 
129  return host_context;
130 }
131 
132 //! Generic runner for backends supporting loopback tests.
133 template <typename NetGroup>
134 static inline void
136  const MemoryConfig& mem_config,
137  size_t num_hosts, size_t workers_per_host, size_t core_offset,
138  const std::function<void(Context&)>& job_startpoint) {
139 
140  MemoryConfig host_mem_config = mem_config.divide(num_hosts);
141  mem_config.print(workers_per_host);
142 
143  // construct a mock network of hosts
144  typename NetGroup::Dispatcher dispatcher;
145 
146  std::vector<std::unique_ptr<HostContext> > host_contexts =
147  ConstructLoopbackHostContexts<NetGroup>(
148  host_mem_config, num_hosts, workers_per_host);
149 
150  // launch thread for each of the workers on this host.
151  std::vector<std::thread> threads(num_hosts * workers_per_host);
152 
153  for (size_t host = 0; host < num_hosts; ++host) {
154  std::string log_prefix = "host " + std::to_string(host);
155  for (size_t worker = 0; worker < workers_per_host; ++worker) {
156  size_t id = host * workers_per_host + worker;
157  threads[id] = common::CreateThread(
158  [&host_contexts, &job_startpoint, host, worker, log_prefix] {
159  Context ctx(*host_contexts[host], worker);
161  log_prefix + " worker " + std::to_string(worker));
162 
163  ctx.Launch(job_startpoint);
164  });
165  common::SetCpuAffinity(threads[id], core_offset + id);
166  }
167  }
168 
169  // join worker threads
170  for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
171  threads[i].join();
172  }
173 }
174 
175 /******************************************************************************/
176 // Other Configuration Initializations
177 
178 static inline bool SetupBlockSize() {
179 
180  const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
181  if (env_block_size == nullptr || *env_block_size == 0) return true;
182 
183  char* endptr;
184  data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
185 
186  if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
187  std::cerr << "Thrill: environment variable"
188  << " THRILL_BLOCK_SIZE=" << env_block_size
189  << " is not a valid number."
190  << std::endl;
191  return false;
192  }
193 
195 
196  std::cerr << "Thrill: setting default_block_size = "
198  << std::endl;
199 
200  return true;
201 }
202 
//! Parses `value` as a strictly positive decimal count and stores it in *out.
//! Rejects trailing garbage, zero, overflow, and any minus sign: strtoul()
//! accepts a leading '-' and silently wraps negative input to a huge value.
static inline bool ParsePositiveCount(const char* value, size_t* out) {
    for (const char* p = value; *p != 0; ++p) {
        if (*p == '-') return false;
    }

    char* endptr = nullptr;
    errno = 0;
    const unsigned long parsed = std::strtoul(value, &endptr, 10);

    if (errno == ERANGE || endptr == value || *endptr != 0 || parsed == 0)
        return false;

    *out = static_cast<size_t>(parsed);
    return true;
}

//! Determines the number of worker threads per host.
//!
//! Checks, in order, THRILL_WORKERS_PER_HOST, OMP_NUM_THREADS and
//! SLURM_CPUS_ON_NODE; the first one that is set to a valid positive number
//! wins. An invalid THRILL_WORKERS_PER_HOST is a hard error (returns 0 after
//! printing a message); invalid values of the other two are reported and then
//! skipped. If none applies, std::thread::hardware_concurrency() is used, and
//! 1 if that cannot be determined (it may return 0).
//!
//! \param str_workers_per_host out: name of the source the result came from
//!        (or the last variable examined on error); never null on return.
//! \param env_workers_per_host out: textual value of that source; null only
//!        if an invalid-but-fatal case returned 0 is impossible, so callers may
//!        stream it whenever the return value is non-zero.
//! \return number of workers per host, or 0 on a fatal configuration error.
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    struct Candidate {
        const char* name;
        bool invalid_is_fatal;
    };
    static const Candidate candidates[] = {
        { "THRILL_WORKERS_PER_HOST", true },
        { "OMP_NUM_THREADS", false },
        { "SLURM_CPUS_ON_NODE", false },
    };

    for (const Candidate& candidate : candidates) {
        str_workers_per_host = candidate.name;
        env_workers_per_host = getenv(candidate.name);

        if (env_workers_per_host == nullptr || *env_workers_per_host == 0)
            continue;

        size_t result = 0;
        if (ParsePositiveCount(env_workers_per_host, &result))
            return result;

        std::cerr << "Thrill: environment variable"
                  << ' ' << str_workers_per_host
                  << '=' << env_workers_per_host
                  << " is not a valid number of workers per host."
                  << std::endl;

        if (candidate.invalid_is_fatal)
            return 0;
        // otherwise fall through, try next variable
    }

    // last resort: the hardware thread count. hardware_concurrency() returns 0
    // when it cannot be determined; callers treat 0 as a fatal error, so fall
    // back to a single worker instead of failing without a diagnostic.
    size_t result = std::thread::hardware_concurrency();
    if (result == 0) {
        std::cerr << "Thrill: could not detect the number of hardware threads,"
                  << " using one worker per host."
                  << std::endl;
        result = 1;
    }

    // point the out-parameters at a description of the actual source so that
    // callers printing them never stream a null pointer.
    static std::string detected_value;
    detected_value = std::to_string(result);
    str_workers_per_host = "std::thread::hardware_concurrency()";
    env_workers_per_host = detected_value.c_str();
    return result;
}
272 
273 static inline bool Initialize() {
274 
275  if (!SetupBlockSize()) return false;
276 
277  vfs::Initialize();
278 
279  return true;
280 }
281 
282 static inline bool Deinitialize() {
283 
285 
286  return true;
287 }
288 
289 /******************************************************************************/
290 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
291 
292 #if defined(_MSC_VER)
294 #else
296 #endif
297 
298 void RunLocalMock(const MemoryConfig& mem_config,
299  size_t num_hosts, size_t workers_per_host,
300  const std::function<void(Context&)>& job_startpoint) {
301 
302  return RunLoopbackThreads<TestGroup>(
303  mem_config, num_hosts, workers_per_host, 0, job_startpoint);
304 }
305 
306 std::vector<std::unique_ptr<HostContext> >
307 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
308 
309  // set fixed amount of RAM for testing
311  mem_config.setup(4 * 1024 * 1024 * 1024llu);
312  mem_config.verbose_ = false;
313 
314  return ConstructLoopbackHostContexts<TestGroup>(
315  mem_config, num_hosts, workers_per_host);
316 }
317 
318 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
319  // set fixed amount of RAM for testing
320  RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
321 }
322 
324  size_t ram, const std::function<void(Context&)>& job_startpoint) {
325 
326  // discard json log
327  tlx::setenv("THRILL_LOG", "", /* overwrite */ 1);
328 
329  // set fixed amount of RAM for testing, disable /proc profiler
330  MemoryConfig mem_config;
331  mem_config.verbose_ = false;
332  mem_config.enable_proc_profiler_ = false;
333  mem_config.setup(ram);
334 
335  static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
336  static constexpr size_t num_workers_list[] = { 1, 3 };
337 
338  size_t max_mock_workers = 1000000;
339 
340  const char* env_max_mock_workers = getenv("THRILL_MAX_MOCK_WORKERS");
341  if (env_max_mock_workers && *env_max_mock_workers) {
342  // parse envvar only if it exists.
343  char* endptr;
344  max_mock_workers = std::strtoul(env_max_mock_workers, &endptr, 10);
345 
346  if (!endptr || *endptr != 0 || max_mock_workers == 0) {
347  std::cerr << "Thrill: environment variable"
348  << " THRILL_MAX_MOCK_WORKERS=" << env_max_mock_workers
349  << " is not a valid maximum number of mock hosts."
350  << std::endl;
351  return;
352  }
353  }
354 
355  for (const size_t& num_hosts : num_hosts_list) {
356  for (const size_t& workers_per_host : num_workers_list) {
357  if (num_hosts * workers_per_host > max_mock_workers) {
358  std::cerr << "Thrill: skipping test with "
359  << num_hosts * workers_per_host
360  << " workers > max workers " << max_mock_workers
361  << std::endl;
362  continue;
363  }
364 
365  LOG0 << "Thrill: running local test with "
366  << num_hosts << " hosts and " << workers_per_host
367  << " workers per host";
368 
369  RunLocalMock(mem_config, num_hosts, workers_per_host,
370  job_startpoint);
371  }
372  }
373 }
374 
375 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
376 
377  size_t my_host_rank = 0;
378  size_t workers_per_host = 1;
379  size_t num_hosts = 1;
380  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
381 
382  // set fixed amount of RAM for testing
383  MemoryConfig mem_config;
384  mem_config.verbose_ = false;
385  mem_config.setup(4 * 1024 * 1024 * 1024llu);
386  mem_config.print(workers_per_host);
387 
388  // construct two full mesh connection cliques, deliver net::tcp::Groups.
389  std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
390 
391  for (size_t g = 0; g < kGroupCount; ++g) {
392  group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
393  }
394 
395  std::array<net::GroupPtr, kGroupCount> host_group = {
396  { std::move(group[0][0]), std::move(group[1][0]) }
397  };
398 
399  auto dispatcher = std::make_unique<net::DispatcherThread>(
400  std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
401 
402  HostContext host_context(
403  0, mem_config,
404  std::move(dispatcher), std::move(host_group), workers_per_host);
405 
406  Context ctx(host_context, 0);
407  common::NameThisThread("worker " + std::to_string(my_host_rank));
408 
409  job_startpoint(ctx);
410 }
411 
412 /******************************************************************************/
413 // Run() Variants for Different Net Backends
414 
415 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
416 template <typename NetGroup>
417 static inline
419  const char* backend, const std::function<void(Context&)>& job_startpoint) {
420 
421  char* endptr;
422 
423  // determine number of loopback hosts
424 
425  size_t num_hosts = 2;
426 
427  const char* env_local = getenv("THRILL_LOCAL");
428  if (env_local && *env_local) {
429  // parse envvar only if it exists.
430  num_hosts = std::strtoul(env_local, &endptr, 10);
431 
432  if (!endptr || *endptr != 0 || num_hosts == 0) {
433  std::cerr << "Thrill: environment variable"
434  << " THRILL_LOCAL=" << env_local
435  << " is not a valid number of local loopback hosts."
436  << std::endl;
437  return -1;
438  }
439  }
440 
441  // determine number of threads per loopback host
442 
443  const char* str_workers_per_host;
444  const char* env_workers_per_host;
445 
446  size_t workers_per_host = FindWorkersPerHost(
447  str_workers_per_host, env_workers_per_host);
448 
449  if (workers_per_host == 0)
450  return -1;
451 
452  // core offset for pinning
453 
454  const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
455  size_t core_offset = 0;
456  if (env_core_offset && *env_core_offset) {
457  core_offset = std::strtoul(env_core_offset, &endptr, 10);
458 
459  size_t last_core = core_offset + num_hosts * workers_per_host;
460  if (!endptr || *endptr != 0 ||
461  last_core > std::thread::hardware_concurrency())
462  {
463  std::cerr << "Thrill: environment variable"
464  << " THRILL_CORE_OFFSET=" << env_core_offset
465  << " is not a valid number of cores to skip for pinning."
466  << std::endl;
467  return -1;
468  }
469  }
470 
471  // detect memory config
472 
473  MemoryConfig mem_config;
474  if (mem_config.setup_detect() < 0) return -1;
475  mem_config.print(workers_per_host);
476 
477  // okay, configuration is good.
478 
479  std::cerr << "Thrill: running locally with " << num_hosts
480  << " test hosts and " << workers_per_host << " workers per host"
481  << " in a local " << backend << " network." << std::endl;
482 
483  if (!Initialize()) return -1;
484 
485  RunLoopbackThreads<NetGroup>(
486  mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
487 
488  if (!Deinitialize()) return -1;
489 
490  return 0;
491 }
492 
493 #if THRILL_HAVE_NET_TCP
494 static inline
495 int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {
496 
497  char* endptr;
498 
499  // select environment variables
500 
501  const char* str_rank = "THRILL_RANK";
502  const char* env_rank = getenv(str_rank);
503 
504  if (env_rank == nullptr) {
505  // take SLURM_PROCID if THRILL_RANK is not set
506  str_rank = "SLURM_PROCID";
507  env_rank = getenv(str_rank);
508  }
509 
510  const char* env_hostlist = getenv("THRILL_HOSTLIST");
511 
512  // parse environment variables
513 
514  size_t my_host_rank = 0;
515 
516  if (env_rank != nullptr && *env_rank != 0) {
517  my_host_rank = std::strtoul(env_rank, &endptr, 10);
518 
519  if (endptr == nullptr || *endptr != 0) {
520  std::cerr << "Thrill: environment variable "
521  << str_rank << '=' << env_rank
522  << " is not a valid number."
523  << std::endl;
524  return -1;
525  }
526  }
527  else {
528  std::cerr << "Thrill: environment variable THRILL_RANK"
529  << " is required for tcp network backend."
530  << std::endl;
531  return -1;
532  }
533 
534  std::vector<std::string> hostlist;
535 
536  if (env_hostlist != nullptr && *env_hostlist != 0) {
537  // first try to split by spaces, then by commas
538  std::vector<std::string> list = tlx::split(' ', env_hostlist);
539 
540  if (list.size() == 1) {
541  tlx::split(&list, ',', env_hostlist);
542  }
543 
544  for (const std::string& host : list) {
545  // skip empty splits
546  if (host.empty()) continue;
547 
548  if (host.find(':') == std::string::npos) {
549  std::cerr << "Thrill: invalid address \"" << host << "\""
550  << "in THRILL_HOSTLIST. It must contain a port number."
551  << std::endl;
552  return -1;
553  }
554 
555  hostlist.push_back(host);
556  }
557 
558  if (my_host_rank >= hostlist.size()) {
559  std::cerr << "Thrill: endpoint list (" << list.size() << " entries) "
560  << "does not include my host_rank (" << my_host_rank << ")"
561  << std::endl;
562  return -1;
563  }
564  }
565  else {
566  std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
567  << " is required for tcp network backend."
568  << std::endl;
569  return -1;
570  }
571 
572  // determine number of local worker threads per process
573 
574  const char* str_workers_per_host;
575  const char* env_workers_per_host;
576 
577  size_t workers_per_host = FindWorkersPerHost(
578  str_workers_per_host, env_workers_per_host);
579 
580  if (workers_per_host == 0)
581  return -1;
582 
583  // detect memory config
584 
585  MemoryConfig mem_config;
586  if (mem_config.setup_detect() < 0) return -1;
587  mem_config.print(workers_per_host);
588 
589  // okay, configuration is good.
590 
591  std::cerr << "Thrill: running in tcp network with " << hostlist.size()
592  << " hosts and " << workers_per_host << " workers per host"
593  << " with " << common::GetHostname()
594  << " as rank " << my_host_rank << " and endpoints";
595  for (const std::string& ep : hostlist)
596  std::cerr << ' ' << ep;
597  std::cerr << std::endl;
598 
599  if (!Initialize()) return -1;
600 
601  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
602 
603  // construct three TCP network groups
604  auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();
605 
606  std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
608  *select_dispatcher, my_host_rank, hostlist,
609  groups.data(), net::Manager::kGroupCount);
610 
611  std::array<net::GroupPtr, kGroupCount> host_groups = {
612  { std::move(groups[0]), std::move(groups[1]) }
613  };
614 
615  // construct HostContext
616 
617  auto dispatcher = std::make_unique<net::DispatcherThread>(
618  std::move(select_dispatcher), my_host_rank);
619 
620  HostContext host_context(
621  0, mem_config,
622  std::move(dispatcher), std::move(host_groups), workers_per_host);
623 
624  std::vector<std::thread> threads(workers_per_host);
625 
626  for (size_t worker = 0; worker < workers_per_host; worker++) {
627  threads[worker] = common::CreateThread(
628  [&host_context, &job_startpoint, worker] {
629  Context ctx(host_context, worker);
630  common::NameThisThread("worker " + std::to_string(worker));
631 
632  ctx.Launch(job_startpoint);
633  });
634  common::SetCpuAffinity(threads[worker], worker);
635  }
636 
637  // join worker threads
638  for (size_t i = 0; i < workers_per_host; i++) {
639  threads[i].join();
640  }
641 
642  if (!Deinitialize()) return -1;
643 
644  return 0;
645 }
646 #endif
647 
648 #if THRILL_HAVE_NET_MPI
649 static inline
650 int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {
651 
652  // determine number of local worker threads per MPI process
653 
654  const char* str_workers_per_host;
655  const char* env_workers_per_host;
656 
657  size_t workers_per_host = FindWorkersPerHost(
658  str_workers_per_host, env_workers_per_host);
659 
660  if (workers_per_host == 0)
661  return -1;
662 
663  // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop
664 
665  if (workers_per_host == 1) {
666  std::cerr << "Thrill: environment variable"
667  << ' ' << str_workers_per_host
668  << '=' << env_workers_per_host
669  << " is not recommended, as one thread is used exclusively"
670  << " for MPI communication."
671  << std::endl;
672  }
673  else {
674  --workers_per_host;
675  }
676 
677  // detect memory config
678 
679  MemoryConfig mem_config;
680  if (mem_config.setup_detect() < 0) return -1;
681  mem_config.print(workers_per_host);
682 
683  // okay, configuration is good.
684 
685  size_t num_hosts = net::mpi::NumMpiProcesses();
686  size_t mpi_rank = net::mpi::MpiRank();
687  std::string hostname = common::GetHostname();
688 
689  std::cerr << "Thrill: running in MPI network with " << num_hosts
690  << " hosts and " << workers_per_host << "+1 workers per host"
691  << " with " << hostname << " as rank " << mpi_rank << "."
692  << std::endl;
693 
694  if (!Initialize()) return -1;
695 
696  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
697 
698  // construct three MPI network groups
699  auto dispatcher = std::make_unique<net::DispatcherThread>(
700  std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);
701 
702  std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
703  net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);
704 
705  std::array<net::GroupPtr, kGroupCount> host_groups = {
706  { std::move(groups[0]), std::move(groups[1]) }
707  };
708 
709  // construct HostContext
710  HostContext host_context(
711  0, mem_config,
712  std::move(dispatcher), std::move(host_groups), workers_per_host);
713 
714  // launch worker threads
715  std::vector<std::thread> threads(workers_per_host);
716 
717  for (size_t worker = 0; worker < workers_per_host; worker++) {
718  threads[worker] = common::CreateThread(
719  [&host_context, &job_startpoint, worker] {
720  Context ctx(host_context, worker);
721  common::NameThisThread("host " + std::to_string(ctx.host_rank())
722  + " worker " + std::to_string(worker));
723 
724  ctx.Launch(job_startpoint);
725  });
726  common::SetCpuAffinity(threads[worker], worker);
727  }
728 
729  // join worker threads
730  for (size_t i = 0; i < workers_per_host; i++) {
731  threads[i].join();
732  }
733 
734  if (!Deinitialize()) return -1;
735 
736  return 0;
737 }
738 #endif
739 
740 #if THRILL_HAVE_NET_IB
//! Run() implementation for the InfiniBand/MPI net backend. The host count and
//! this process's rank are taken from net::ib::NumMpiProcesses() / MpiRank();
//! workers_per_host worker threads are launched in this process and joined
//! before returning. Returns 0 on success, -1 on configuration or
//! initialization failure.
741 static inline
742 int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {
743 
744  // determine number of local worker threads per IB/MPI process
745 
746  const char* str_workers_per_host;
747  const char* env_workers_per_host;
748 
749  size_t workers_per_host = FindWorkersPerHost(
750  str_workers_per_host, env_workers_per_host);
751 
// FindWorkersPerHost() returns 0 after reporting an invalid setting
752  if (workers_per_host == 0)
753  return -1;
754 
755  // detect memory config
756 
757  MemoryConfig mem_config;
758  if (mem_config.setup_detect() < 0) return -1;
759  mem_config.print(workers_per_host);
760 
761  // okay, configuration is good.
762 
763  size_t num_hosts = net::ib::NumMpiProcesses();
764  size_t mpi_rank = net::ib::MpiRank();
765 
766  std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
767  << " hosts and " << workers_per_host << " workers per host"
768  << " with " << common::GetHostname()
769  << " as rank " << mpi_rank << "."
770  << std::endl;
771 
772  if (!Initialize()) return -1;
773 
774  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
775 
776  // construct two MPI network groups
777  std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
778  net::ib::Construct(num_hosts, groups.data(), kGroupCount);
779 
780  std::array<net::GroupPtr, kGroupCount> host_groups = {
781  { std::move(groups[0]), std::move(groups[1]) }
782  };
783 
784  // construct HostContext
// NOTE(review): unlike the tcp and mpi backends, no net::DispatcherThread is
// passed here, while the HostContext constructor defined in this file takes
// (local_host_id, mem_config, dispatcher, groups, workers_per_host). This call
// likely fails to compile when THRILL_HAVE_NET_IB is enabled; TODO confirm
// and pass the IB backend's dispatcher.
785  HostContext host_context(
786  0, mem_config, std::move(host_groups), workers_per_host);
787 
788  // launch worker threads
789  std::vector<std::thread> threads(workers_per_host);
790 
791  for (size_t worker = 0; worker < workers_per_host; worker++) {
792  threads[worker] = common::CreateThread(
793  [&host_context, &job_startpoint, worker] {
794  Context ctx(host_context, worker);
795  common::NameThisThread("host " + std::to_string(ctx.host_rank())
796  + " worker " + std::to_string(worker));
797 
798  ctx.Launch(job_startpoint);
799  });
// presumably pins worker thread i to CPU core i (no core offset is applied)
800  common::SetCpuAffinity(threads[worker], worker);
801  }
802 
803  // join worker threads
804  for (size_t i = 0; i < workers_per_host; i++) {
805  threads[i].join();
806  }
807 
808  if (!Deinitialize()) return -1;
809 
810  return 0;
811 }
812 #endif
813 
//! Reports that network backend `env_net` was not compiled into this binary.
//! Always returns -1, the error code that Run() passes back to its caller.
int RunNotSupported(const char* env_net) {
    constexpr int kUnsupported = -1;
    std::ostream& err = std::cerr;
    err << "Thrill: network backend " << env_net;
    err << " is not supported by this binary." << std::endl;
    return kUnsupported;
}
819 
//! Picks a network backend when THRILL_NET is not set.
//!
//! An Open MPI or Intel MPI launch environment selects "ib" (preferred) or
//! "mpi", whichever is compiled in; if neither is available a message is
//! printed and nullptr is returned. Otherwise MSVC builds use "mock", and
//! other platforms use "tcp" when THRILL_RANK or THRILL_HOSTLIST is set and
//! "local" if not.
static inline
const char * DetectNetBackend() {
    // detect openmpi and intel mpi run, add others as well.
    const bool launched_by_mpi =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (launched_by_mpi) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    const bool tcp_configured =
        getenv("THRILL_RANK") != nullptr || getenv("THRILL_HOSTLIST") != nullptr;
    return tcp_configured ? "tcp" : "local";
#endif
}
847 
849 
850  const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
851  if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
852 
853  char* endptr;
854 
855  long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
856  if (endptr == nullptr || *endptr != 0 ||
857  (die_with_parent != 0 && die_with_parent != 1)) {
858  std::cerr << "Thrill: environment variable"
859  << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
860  << " is not either 0 or 1."
861  << std::endl;
862  return -1;
863  }
864 
865  if (die_with_parent == 0) return 0;
866 
867 #if __linux__
868  if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
869  throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
870  return 1;
871 #else
872  std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
873  << "Please submit a patch."
874  << std::endl;
875  return 0;
876 #endif
877 }
878 
880 
881  const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
882  if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
883 
884  if (unlink(env_unlink_binary) != 0) {
886  "Error calling unlink binary \""
887  + std::string(env_unlink_binary) + "\"");
888  }
889 
890  return 0;
891 }
892 
893 int Run(const std::function<void(Context&)>& job_startpoint) {
894 
895  if (RunCheckDieWithParent() < 0)
896  return -1;
897 
898  if (RunCheckUnlinkBinary() < 0)
899  return -1;
900 
901  // parse environment: THRILL_NET
902  const char* env_net = getenv("THRILL_NET");
903 
904  // if no backend configured: automatically select one.
905  if (env_net == nullptr || *env_net == 0) {
906  env_net = DetectNetBackend();
907  if (env_net == nullptr) return -1;
908  }
909 
910  // run with selected backend
911  if (strcmp(env_net, "mock") == 0) {
912  // mock network backend
913  return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
914  }
915 
916  if (strcmp(env_net, "local") == 0) {
917 #if THRILL_HAVE_NET_TCP
918  // tcp loopback network backend
919  return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
920 #else
921  return RunNotSupported(env_net);
922 #endif
923  }
924 
925  if (strcmp(env_net, "tcp") == 0) {
926 #if THRILL_HAVE_NET_TCP
927  // real tcp network backend
928  return RunBackendTcp(job_startpoint);
929 #else
930  return RunNotSupported(env_net);
931 #endif
932  }
933 
934  if (strcmp(env_net, "mpi") == 0) {
935 #if THRILL_HAVE_NET_MPI
936  // mpi network backend
937  return RunBackendMpi(job_startpoint);
938 #else
939  return RunNotSupported(env_net);
940 #endif
941  }
942 
943  if (strcmp(env_net, "ib") == 0) {
944 #if THRILL_HAVE_NET_IB
945  // ib/mpi network backend
946  return RunBackendIb(job_startpoint);
947 #else
948  return RunNotSupported(env_net);
949 #endif
950  }
951 
952  std::cerr << "Thrill: network backend " << env_net << " is unknown."
953  << std::endl;
954  return -1;
955 }
956 
957 /******************************************************************************/
958 // MemoryConfig
959 
960 void MemoryConfig::setup(size_t ram) {
961  ram_ = ram;
962  apply();
963 }
964 
966 
967  // determine amount of physical RAM or take user's limit
968 
969  const char* env_ram = getenv("THRILL_RAM");
970 
971  if (env_ram != nullptr && *env_ram != 0) {
972  uint64_t ram64;
973  if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
974  std::cerr << "Thrill: environment variable"
975  << " THRILL_RAM=" << env_ram
976  << " is not a valid amount of RAM memory."
977  << std::endl;
978  return -1;
979  }
980  ram_ = static_cast<size_t>(ram64);
981  }
982  else {
983  // detect amount of physical memory on system
984 #if defined(_MSC_VER)
985  MEMORYSTATUSEX memstx;
986  memstx.dwLength = sizeof(memstx);
987  GlobalMemoryStatusEx(&memstx);
988 
989  ram_ = memstx.ullTotalPhys;
990 #elif __APPLE__
991  int mib[2];
992  int64_t physical_memory;
993  size_t length;
994 
995  // Get the physical memory size
996  mib[0] = CTL_HW;
997  mib[1] = HW_MEMSIZE;
998  length = sizeof(physical_memory);
999  sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
1000  ram_ = static_cast<size_t>(physical_memory);
1001 #else
1002  ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
1003 #endif
1004 
1005 #if __linux__
1006  // use getrlimit() to check user limit on address space
1007  struct rlimit rl; // NOLINT
1008  if (getrlimit(RLIMIT_AS, &rl) == 0) {
1009  if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1010  ram_ = rl.rlim_cur * 3 / 4;
1011  }
1012  }
1013  else {
1014  sLOG1 << "getrlimit(): " << strerror(errno);
1015  }
1016 #endif
1017  }
1018 
1019  apply();
1020 
1021  return 0;
1022 }
1023 
1025  // divide up ram_
1026 
1027  ram_workers_ = ram_ / 3;
1028  ram_block_pool_hard_ = ram_ / 3;
1031 
1032  // set memory limit, only BlockPool is excluded from malloc tracking, as
1033  // only it uses bypassing allocators.
1035 }
1036 
1037 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1038 
1039  MemoryConfig mc = *this;
1040  mc.ram_ /= hosts;
1041  mc.ram_block_pool_hard_ /= hosts;
1042  mc.ram_block_pool_soft_ /= hosts;
1043  mc.ram_workers_ /= hosts;
1044  // free floating memory is not divided by host, as it is measured overall
1045 
1046  return mc;
1047 }
1048 
1049 void MemoryConfig::print(size_t workers_per_host) const {
1050  if (!verbose_) return;
1051 
1052  std::cerr
1053  << "Thrill: using "
1054  << tlx::format_iec_units(ram_) << "B RAM total,"
1055  << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1056  << " workers="
1057  << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1058  << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1059  << std::endl;
1060 }
1061 
1062 /******************************************************************************/
1063 // HostContext methods
1064 
1066  size_t local_host_id,
1067  const MemoryConfig& mem_config,
1068  std::unique_ptr<net::DispatcherThread> dispatcher,
1069  std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1070  size_t workers_per_host)
1071  : mem_config_(mem_config),
1072  base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1073  logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1074  profiler_(std::make_unique<common::ProfileThread>()),
1075  local_host_id_(local_host_id),
1076  workers_per_host_(workers_per_host),
1077  dispatcher_(std::move(dispatcher)),
1078  net_manager_(std::move(groups), logger_) {
1079 
1080  // write command line parameters to json log
1082 
1085 
1086  // run memory profiler only on local host 0 (especially for test runs)
1087  if (local_host_id == 0)
1089 }
1090 
1092  // stop dispatcher _before_ stopping multiplexer
1093  dispatcher_->Terminate();
1094 }
1095 
1097  const char* env_log = getenv("THRILL_LOG");
1098  if (env_log == nullptr) {
1099  if (host_rank == 0 && mem_config().verbose_) {
1100  std::cerr << "Thrill: no THRILL_LOG was found, "
1101  << "so no json log is written."
1102  << std::endl;
1103  }
1104  return std::string();
1105  }
1106 
1107  std::string output = env_log;
1108  if (output == "" || output == "-")
1109  return std::string();
1110  if (output == "/dev/stdout")
1111  return output;
1112  if (output == "stdout")
1113  return "/dev/stdout";
1114 
1115  return output + "-host-" + std::to_string(host_rank) + ".json";
1116 }
1117 
1118 /******************************************************************************/
1119 // Context methods
1120 
1121 Context::Context(HostContext& host_context, size_t local_worker_id)
1122  : local_host_id_(host_context.local_host_id()),
1123  local_worker_id_(local_worker_id),
1124  workers_per_host_(host_context.workers_per_host()),
1125  mem_limit_(host_context.worker_mem_limit()),
1126  mem_config_(host_context.mem_config()),
1127  mem_manager_(host_context.mem_manager()),
1128  net_manager_(host_context.net_manager()),
1129  flow_manager_(host_context.flow_manager()),
1130  block_pool_(host_context.block_pool()),
1131  multiplexer_(host_context.data_multiplexer()),
1132  rng_(std::random_device { }
1133  () + (local_worker_id_ << 16)),
1134  base_logger_(&host_context.base_logger_) {
1135  assert(local_worker_id < workers_per_host());
1136 }
1137 
1139  return GetFile(dia != nullptr ? dia->dia_id() : 0);
1140 }
1141 
1143  return tlx::make_counting<data::File>(
1144  block_pool_, local_worker_id_, dia_id);
1145 }
1146 
1148  return GetFilePtr(dia != nullptr ? dia->dia_id() : 0);
1149 }
1150 
1153 }
1154 
1156  return GetNewCatStream(dia != nullptr ? dia->dia_id() : 0);
1157 }
1158 
1161 }
1162 
1164  return GetNewMixStream(dia != nullptr ? dia->dia_id() : 0);
1165 }
1166 
1167 template <>
1168 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1169  return GetNewCatStream(dia_id);
1170 }
1171 
1172 template <>
1173 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1174  return GetNewMixStream(dia_id);
1175 }
1176 
//! Per-worker job statistics collected at the end of Context::Launch() and
//! combined over all workers with operator +.
//!
//! NOTE(review): kept as a plain aggregate (no constructors, no default member
//! initializers), presumably because net.Reduce() must be able to serialize
//! it; confirm before changing the data members.
struct OverallStats {
    //! overall run time
    double runtime;
    //! maximum ByteBlock allocation on all workers
    size_t max_block_bytes;
    //! network traffic performed by net layer
    size_t net_traffic_tx, net_traffic_rx;
    //! I/O volume performed by io layer
    size_t io_volume;
    //! maximum external memory allocation
    size_t io_max_allocation;

    //! print all fields as a single bracketed record
    friend std::ostream& operator << (std::ostream& os, const OverallStats& s) {
        os << "[OverallStats";
        os << " runtime=" << s.runtime;
        os << " max_block_bytes=" << s.max_block_bytes;
        os << " net_traffic_tx=" << s.net_traffic_tx;
        os << " net_traffic_rx=" << s.net_traffic_rx;
        os << " io_volume=" << s.io_volume;
        os << " io_max_allocation=" << s.io_max_allocation;
        return os << "]";
    }

    //! combine two records: runtime and io_max_allocation take the maximum,
    //! all other fields are summed.
    OverallStats operator + (const OverallStats& b) const {
        return OverallStats {
            std::max(runtime, b.runtime),
            max_block_bytes + b.max_block_bytes,
            net_traffic_tx + b.net_traffic_tx,
            net_traffic_rx + b.net_traffic_rx,
            io_volume + b.io_volume,
            std::max(io_max_allocation, b.io_max_allocation)
        };
    }
};
1216 
//! Run the job's main function on this worker and report overall statistics.
//!
//! Writes "job-start" and then "job-done" (or "job-exception") events to the
//! worker's json logger. A std::exception thrown by job_startpoint is logged
//! and rethrown unchanged. After a successful run, this worker's OverallStats
//! are reduced over all workers with net.Reduce() (root 0 by default), and
//! the worker with global rank 0 prints a summary to stderr (only if
//! mem_config().verbose_) and writes a "summary" event to the json log.
//!
//! \param job_startpoint the job function; it is called with *this.
1217 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1218  logger_ << "class" << "Context"
1219  << "event" << "job-start";
1220 
1221  common::StatsTimerStart overall_timer;
1222 
1223  try {
1224  job_startpoint(*this);
1225  }
1226  catch (std::exception& e) {
// report the exception type and message both as log lines and in the json
// log, then let it propagate to the caller unchanged
1227  LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1228  LOG1 << " what(): " << e.what();
1229 
1230  logger_ << "class" << "Context"
1231  << "event" << "job-exception"
1232  << "exception" << typeid(e).name()
1233  << "what" << e.what();
1234  throw;
1235  }
1236 
// "elapsed" is logged while the timer is still running; it is stopped below
// before its value is read into the statistics
1237  logger_ << "class" << "Context"
1238  << "event" << "job-done"
1239  << "elapsed" << overall_timer;
1240 
1241  overall_timer.Stop();
1242 
1243  // collect overall statistics
1244  OverallStats stats;
1245  stats.runtime = overall_timer.SecondsDouble();
1246 
1247  stats.max_block_bytes =
1249 
// net_manager_ is shared among all workers of a host, so only local worker 0
// reports its traffic; each host's traffic is therefore counted once
1250  stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1251  stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1252 
// disk I/O statistics are reported only by worker 0 of local host 0.
// NOTE(review): presumably because the I/O statistics are process-wide and
// shared by all local hosts in test runs — confirm.
1253  if (local_host_id_ == 0 && local_worker_id_ == 0) {
1255  stats.io_volume = io_stats.get_read_bytes() + io_stats.get_write_bytes();
1256  stats.io_max_allocation =
1258  }
1259  else {
1260  stats.io_volume = 0;
1261  stats.io_max_allocation = 0;
1262  }
1263 
1264  LOG0 << stats;
1265 
// combine all workers' stats on worker 0 (presumably via
// OverallStats::operator +, the default sum operation of Reduce)
1266  stats = net.Reduce(stats);
1267 
// only the global rank-0 worker reports the combined summary
1268  if (my_rank() == 0) {
1269  using tlx::format_iec_units;
1270 
// every sent byte should have been received somewhere; log any mismatch
1271  if (stats.net_traffic_rx != stats.net_traffic_tx)
1272  LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1273  << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1274 
1275  if (mem_config().verbose_) {
1276  std::cerr
1277  << "Thrill:"
1278  << " ran " << stats.runtime << "s with max "
1279  << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1280  << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1281  << format_iec_units(stats.io_volume) << "B disk I/O, and "
1282  << format_iec_units(stats.io_max_allocation) << "B max disk use."
1283  << std::endl;
1284  }
1285 
1286  logger_ << "class" << "Context"
1287  << "event" << "summary"
1288  << "runtime" << stats.runtime
1289  << "net_traffic" << stats.net_traffic_tx
1290  << "io_volume" << stats.io_volume
1291  << "io_max_allocation" << stats.io_max_allocation;
1292  }
1293 }
1294 
1295 } // namespace api
1296 } // namespace thrill
1297 
1298 /******************************************************************************/
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
size_t ram_
total amount of physical ram detected or THRILL_RAM
Definition: context.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:395
net::FlowControlChannel & net
Definition: context.hpp:446
size_t ram_floating_
remaining free-floating RAM used for user and Thrill data structures.
Definition: context.hpp:76
MemoryConfig divide(size_t hosts) const
Definition: context.cpp:1037
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:896
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
void LogCmdlineParams(JsonLogger &logger)
Definition: porting.cpp:68
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
Definition: context.cpp:1065
void StartMemProfiler(common::ProfileThread &sched, common::JsonLogger &logger)
launch profiler task
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:35
net::Manager & net_manager_
net::Manager instance that is shared among workers
Definition: context.hpp:413
std::string MakeHostLogPath(size_t host_rank)
create host log
Definition: context.cpp:1096
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
Collection of NetConnections to workers, allows point-to-point client communication and simple collec...
Definition: group.hpp:43
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:893
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
An Exception which is thrown on system errors and contains errno information.
void set_memory_limit_indication(ssize_t size)
common::JsonLogger logger_
Definition: context.hpp:158
const MemoryConfig & mem_config() const
host-global memory config
Definition: context.hpp:329
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
Definition: context.cpp:848
int setup_detect()
detect memory configuration from environment
Definition: context.cpp:965
#define sLOG1
Definition: logger.hpp:38
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
size_t workers_per_host() const
number of workers per host (all have the same).
Definition: context.hpp:115
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
Definition: logger.cpp:40
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
static by_string to_string(int val)
convert to string
static bool Deinitialize()
Definition: context.cpp:282
static std::vector< std::unique_ptr< HostContext > > ConstructLoopbackHostContexts(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host)
Generic network constructor for net backends supporting loopback tests.
Definition: context.cpp:94
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
Definition: context.cpp:879
data::BlockPool & block_pool_
data block pool
Definition: context.hpp:419
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
Definition: context.hpp:238
static void RunLoopbackThreads(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, size_t core_offset, const std::function< void(Context &)> &job_startpoint)
Generic runner for backends supporting loopback tests.
Definition: context.cpp:135
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1151
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:24
int setenv(const char *name, const char *value, int overwrite)
setenv - change or add an environment variable Windows porting madness because setenv() is apparently...
Definition: setenv.cpp:31
~HostContext()
destructor
Definition: context.cpp:1091
std::string GetHostname()
get hostname
Definition: porting.cpp:143
void print(size_t workers_per_host) const
Definition: context.cpp:1049
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network –> run test with one host an...
Definition: context.cpp:375
size_t local_worker_id_
id of this worker's context among the workers of this host, 0..p-1
Definition: context.hpp:398
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
external_size_type get_read_bytes() const
Definition: iostats.cpp:527
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
Definition: context.hpp:66
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
Definition: context.cpp:298
void StartLinuxProcStatsProfiler(ProfileThread &, JsonLogger &)
launch profiler task
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
Definition: abs_diff.hpp:24
static bool SetupBlockSize()
Definition: context.cpp:178
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:40
common::JsonLogger logger_
Definition: context.hpp:462
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
int RunNotSupported(const char *env_net)
Definition: context.cpp:814
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:44
static instance_pointer get_instance()
Definition: singleton.hpp:44
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1142
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
Definition: context.cpp:318
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
Definition: group.cpp:714
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
static size_t FindWorkersPerHost(const char *&str_workers_per_host, const char *&env_workers_per_host)
Definition: context.cpp:203
Context(HostContext &host_context, size_t local_worker_id)
Definition: context.cpp:1121
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1159
void Launch(const std::function< void(Context &)> &job_startpoint)
method used to launch a job's main procedure. it wraps it in log output.
Definition: context.cpp:1217
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
Definition: context.hpp:422
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Definition: context.hpp:69
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
The HostContext contains all data structures shared among workers on the same host.
Definition: context.hpp:90
size_t local_worker_id() const
Definition: context.hpp:263
void setup(size_t ram)
setup memory size
Definition: context.cpp:960
uint64_t maximum_allocation() const
return maximum number of bytes allocated during program run.
static int RunBackendLoopback(const char *backend, const std::function< void(Context &)> &job_startpoint)
Run() implementation which uses a loopback net backend ("mock" or "tcp").
Definition: context.cpp:418
net::tcp::Group TestGroup
Definition: context.cpp:295
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
Definition: context.cpp:307
MemoryConfig & mem_config()
host-global memory config
Definition: context.hpp:123
size_t rx
received bytes
Definition: manager.hpp:37
size_t NumMpiProcesses()
Return the number of MPI processes.
Definition: group.cpp:701
static bool Initialize()
Definition: context.cpp:273
MemoryConfig mem_config_
memory configuration
Definition: context.hpp:146
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
Definition: context.hpp:161
bool enable_proc_profiler_
enable Linux /proc stats profiler (default: on)
Definition: context.hpp:82
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
Definition: context.hpp:176
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:111
std::string format_iec_units(uint64_t number, int precision)
Format number as something like 1 TiB.
static const char * DetectNetBackend()
Definition: context.cpp:821
bool verbose_
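verbosity flag: enables Thrill's informational messages on stderr (memory configuration, missing THRILL_LOG notice, run summary).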
Definition: context.hpp:79
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449
external_size_type get_write_bytes() const
Definition: iostats.cpp:543
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
Definition: group.cpp:675
size_t tx
transmitted bytes
Definition: manager.hpp:35