Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
context.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/context.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/api/context.hpp>
13 
14 #include <thrill/api/dia_base.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
20 #include <thrill/common/string.hpp>
22 #include <thrill/vfs/file_io.hpp>
23 
24 #include <foxxll/io/iostats.hpp>
25 #include <tlx/math/abs_diff.hpp>
26 #include <tlx/port/setenv.hpp>
29 #include <tlx/string/split.hpp>
30 
31 // mock net backend is always available -tb :)
33 
34 #if THRILL_HAVE_NET_TCP
37 #endif
38 
39 #if THRILL_HAVE_NET_MPI
41 #include <thrill/net/mpi/group.hpp>
42 #endif
43 
44 #if THRILL_HAVE_NET_IB
45 #include <thrill/net/ib/group.hpp>
46 #endif
47 
48 #if __linux__
49 
50 // linux-specific process control
51 #include <sys/prctl.h>
52 
53 // for calling getrlimit() to determine memory limit
54 #include <sys/resource.h>
55 #include <sys/time.h>
56 
57 #endif
58 
59 #if __APPLE__
60 
61 // for sysctl()
62 #include <sys/sysctl.h>
63 #include <sys/types.h>
64 
65 #endif
66 
67 #if defined(_MSC_VER)
68 
69 // for detecting amount of physical memory
70 #include <windows.h>
71 
72 #endif
73 
74 #include <algorithm>
75 #include <csignal>
76 #include <iostream>
77 #include <memory>
78 #include <string>
79 #include <thread>
80 #include <tuple>
81 #include <utility>
82 #include <vector>
83 
84 namespace thrill {
85 namespace api {
86 
87 /******************************************************************************/
88 // Generic Network Construction
89 
90 //! Generic network constructor for net backends supporting loopback tests.
91 template <typename NetGroup>
92 static inline
93 std::vector<std::unique_ptr<HostContext> >
95  const MemoryConfig& mem_config,
96  size_t num_hosts, size_t workers_per_host) {
97 
98  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
99 
100  // construct three full mesh loopback cliques, deliver net::Groups.
101  std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
102 
103  for (size_t g = 0; g < kGroupCount; ++g) {
104  group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
105  }
106 
107  std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
108  for (size_t h = 0; h < num_hosts; ++h) {
109  dispatcher.emplace_back(
110  std::make_unique<net::DispatcherThread>(
111  std::make_unique<typename NetGroup::Dispatcher>(), h));
112  }
113 
114  // construct host context
115 
116  std::vector<std::unique_ptr<HostContext> > host_context;
117 
118  for (size_t h = 0; h < num_hosts; h++) {
119  std::array<net::GroupPtr, kGroupCount> host_group = {
120  { std::move(group[0][h]), std::move(group[1][h]) }
121  };
122 
123  host_context.emplace_back(
124  std::make_unique<HostContext>(
125  h, mem_config, std::move(dispatcher[h]),
126  std::move(host_group), workers_per_host));
127  }
128 
129  return host_context;
130 }
131 
132 //! Generic runner for backends supporting loopback tests.
133 template <typename NetGroup>
134 static inline void
136  const MemoryConfig& mem_config,
137  size_t num_hosts, size_t workers_per_host, size_t core_offset,
138  const std::function<void(Context&)>& job_startpoint) {
139 
140  MemoryConfig host_mem_config = mem_config.divide(num_hosts);
141  mem_config.print(workers_per_host);
142 
143  // construct a mock network of hosts
144  typename NetGroup::Dispatcher dispatcher;
145 
146  std::vector<std::unique_ptr<HostContext> > host_contexts =
147  ConstructLoopbackHostContexts<NetGroup>(
148  host_mem_config, num_hosts, workers_per_host);
149 
150  // launch thread for each of the workers on this host.
151  std::vector<std::thread> threads(num_hosts * workers_per_host);
152 
153  for (size_t host = 0; host < num_hosts; ++host) {
154  std::string log_prefix = "host " + std::to_string(host);
155  for (size_t worker = 0; worker < workers_per_host; ++worker) {
156  size_t id = host * workers_per_host + worker;
157  threads[id] = common::CreateThread(
158  [&host_contexts, &job_startpoint, host, worker, log_prefix] {
159  Context ctx(*host_contexts[host], worker);
161  log_prefix + " worker " + std::to_string(worker));
162 
163  ctx.Launch(job_startpoint);
164  });
165  common::SetCpuAffinity(threads[id], core_offset + id);
166  }
167  }
168 
169  // join worker threads
170  for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
171  threads[i].join();
172  }
173 }
174 
175 /******************************************************************************/
176 // Other Configuration Initializations
177 
// Apply the THRILL_BLOCK_SIZE environment variable to data::default_block_size.
// Returns true if the variable is unset, empty, or a valid positive decimal
// number; returns false (after printing an error to stderr) otherwise.
178 static inline bool SetupBlockSize() {
179 
180  const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
// unset or empty variable: keep the current default and report success
181  if (env_block_size == nullptr || *env_block_size == 0) return true;
182 
183  char* endptr;
// note: the global is overwritten before the value has been validated
184  data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
185 
// reject trailing garbage and a value of zero
186  if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
187  std::cerr << "Thrill: environment variable"
188  << " THRILL_BLOCK_SIZE=" << env_block_size
189  << " is not a valid number."
190  << std::endl;
191  return false;
192  }
193 
// NOTE(review): the listing jumps from line 193 to 195 here, so one statement
// was lost in extraction -- restore it from the upstream file.
195 
196  std::cerr << "Thrill: setting default_block_size = "
// NOTE(review): listing line 197 (presumably the streamed value) is missing.
198  << std::endl;
199 
200  return true;
201 }
202 
//! Parse the value of a worker-count environment variable.
//!
//! \param name name of the variable, used only in the error message
//! \param value value of the variable; must be non-null and non-empty
//! \return the parsed positive count, or 0 if value is not a valid count, in
//! which case an error message has been printed to stderr
static inline size_t ParseWorkersPerHost(const char* name, const char* value) {

    // strtoul() accepts a minus sign and wraps the result around to a huge
    // unsigned number, so negative input has to be rejected by hand.
    bool negative = false;
    for (const char* p = value; *p != 0; ++p) {
        if (*p == '-') negative = true;
    }

    char* endptr = nullptr;
    size_t result = std::strtoul(value, &endptr, 10);

    if (negative || !endptr || *endptr != 0 || result == 0) {
        std::cerr << "Thrill: environment variable"
                  << ' ' << name
                  << '=' << value
                  << " is not a valid number of workers per host."
                  << std::endl;
        return 0;
    }

    return result;
}

//! Determine the number of worker threads per host.
//!
//! Checks, in this order, THRILL_WORKERS_PER_HOST, OMP_NUM_THREADS and
//! SLURM_CPUS_ON_NODE, and finally std::thread::hardware_concurrency().
//!
//! \param str_workers_per_host set to the name of the last environment
//! variable that was inspected
//! \param env_workers_per_host set to the value of that variable; nullptr if
//! it is not set
//! \return number of workers per host, or 0 if THRILL_WORKERS_PER_HOST is set
//! but invalid (an error message has then been printed)
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    // first check THRILL_WORKERS_PER_HOST: an invalid value is a hard error

    str_workers_per_host = "THRILL_WORKERS_PER_HOST";
    env_workers_per_host = getenv(str_workers_per_host);

    if (env_workers_per_host && *env_workers_per_host) {
        return ParseWorkersPerHost(str_workers_per_host, env_workers_per_host);
    }

    // then look for OMP_NUM_THREADS and SLURM_CPUS_ON_NODE: an invalid value
    // is reported, but the search continues with the next source.

    static const char* const fallback_variables[] = {
        "OMP_NUM_THREADS", "SLURM_CPUS_ON_NODE"
    };

    for (const char* name : fallback_variables) {
        str_workers_per_host = name;
        env_workers_per_host = getenv(name);

        if (env_workers_per_host && *env_workers_per_host) {
            size_t result = ParseWorkersPerHost(name, env_workers_per_host);
            if (result != 0) return result;
        }
    }

    // last check: std::thread::hardware_concurrency(). It is allowed to
    // return 0 when the value cannot be determined; callers treat 0 as a
    // failure, so fall back to a single worker instead of failing silently.

    size_t hardware_threads = std::thread::hardware_concurrency();
    if (hardware_threads == 0) {
        std::cerr << "Thrill: could not detect the number of hardware threads,"
                  << " using one worker per host."
                  << std::endl;
        return 1;
    }
    return hardware_threads;
}
272 
273 static inline bool Initialize() {
274 
275  if (!SetupBlockSize()) return false;
276 
277  vfs::Initialize();
278 
279  return true;
280 }
281 
// Counterpart of Initialize(); as visible here it always returns true.
282 static inline bool Deinitialize() {
283 
// NOTE(review): listing line 284 is missing (dropped in extraction);
// presumably the teardown matching vfs::Initialize() -- confirm upstream.
285 
286  return true;
287 }
288 
289 /******************************************************************************/
290 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
291 
292 #if defined(_MSC_VER)
294 #else
296 #endif
297 
298 void RunLocalMock(const MemoryConfig& mem_config,
299  size_t num_hosts, size_t workers_per_host,
300  const std::function<void(Context&)>& job_startpoint) {
301 
302  return RunLoopbackThreads<TestGroup>(
303  mem_config, num_hosts, workers_per_host, 0, job_startpoint);
304 }
305 
306 std::vector<std::unique_ptr<HostContext> >
307 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
308 
309  // set fixed amount of RAM for testing
311  mem_config.setup(4 * 1024 * 1024 * 1024llu);
312  mem_config.verbose_ = false;
313 
314  return ConstructLoopbackHostContexts<TestGroup>(
315  mem_config, num_hosts, workers_per_host);
316 }
317 
318 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
319  // set fixed amount of RAM for testing
320  RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
321 }
322 
324  size_t ram, const std::function<void(Context&)>& job_startpoint) {
325 
326  // discard json log
327  tlx::setenv("THRILL_LOG", "", /* overwrite */ 1);
328 
329  // set fixed amount of RAM for testing, disable /proc profiler
330  MemoryConfig mem_config;
331  mem_config.verbose_ = false;
332  mem_config.enable_proc_profiler_ = false;
333  mem_config.setup(ram);
334 
335  static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
336  static constexpr size_t num_workers_list[] = { 1, 3 };
337 
338  size_t max_mock_workers = 1000000;
339 
340  const char* env_max_mock_workers = getenv("THRILL_MAX_MOCK_WORKERS");
341  if (env_max_mock_workers && *env_max_mock_workers) {
342  // parse envvar only if it exists.
343  char* endptr;
344  max_mock_workers = std::strtoul(env_max_mock_workers, &endptr, 10);
345 
346  if (!endptr || *endptr != 0 || max_mock_workers == 0) {
347  std::cerr << "Thrill: environment variable"
348  << " THRILL_MAX_MOCK_WORKERS=" << env_max_mock_workers
349  << " is not a valid maximum number of mock hosts."
350  << std::endl;
351  return;
352  }
353  }
354 
355  for (const size_t& num_hosts : num_hosts_list) {
356  for (const size_t& workers_per_host : num_workers_list) {
357  if (num_hosts * workers_per_host > max_mock_workers) {
358  std::cerr << "Thrill: skipping test with "
359  << num_hosts * workers_per_host
360  << " workers > max workers " << max_mock_workers
361  << std::endl;
362  continue;
363  }
364 
365  RunLocalMock(mem_config, num_hosts, workers_per_host,
366  job_startpoint);
367  }
368  }
369 }
370 
371 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
372 
373  size_t my_host_rank = 0;
374  size_t workers_per_host = 1;
375  size_t num_hosts = 1;
376  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
377 
378  // set fixed amount of RAM for testing
379  MemoryConfig mem_config;
380  mem_config.verbose_ = false;
381  mem_config.setup(4 * 1024 * 1024 * 1024llu);
382  mem_config.print(workers_per_host);
383 
384  // construct two full mesh connection cliques, deliver net::tcp::Groups.
385  std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
386 
387  for (size_t g = 0; g < kGroupCount; ++g) {
388  group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
389  }
390 
391  std::array<net::GroupPtr, kGroupCount> host_group = {
392  { std::move(group[0][0]), std::move(group[1][0]) }
393  };
394 
395  auto dispatcher = std::make_unique<net::DispatcherThread>(
396  std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
397 
398  HostContext host_context(
399  0, mem_config,
400  std::move(dispatcher), std::move(host_group), workers_per_host);
401 
402  Context ctx(host_context, 0);
403  common::NameThisThread("worker " + std::to_string(my_host_rank));
404 
405  job_startpoint(ctx);
406 }
407 
408 /******************************************************************************/
409 // Run() Variants for Different Net Backends
410 
411 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
412 template <typename NetGroup>
413 static inline
415  const char* backend, const std::function<void(Context&)>& job_startpoint) {
416 
417  char* endptr;
418 
419  // determine number of loopback hosts
420 
421  size_t num_hosts = 2;
422 
423  const char* env_local = getenv("THRILL_LOCAL");
424  if (env_local && *env_local) {
425  // parse envvar only if it exists.
426  num_hosts = std::strtoul(env_local, &endptr, 10);
427 
428  if (!endptr || *endptr != 0 || num_hosts == 0) {
429  std::cerr << "Thrill: environment variable"
430  << " THRILL_LOCAL=" << env_local
431  << " is not a valid number of local loopback hosts."
432  << std::endl;
433  return -1;
434  }
435  }
436 
437  // determine number of threads per loopback host
438 
439  const char* str_workers_per_host;
440  const char* env_workers_per_host;
441 
442  size_t workers_per_host = FindWorkersPerHost(
443  str_workers_per_host, env_workers_per_host);
444 
445  if (workers_per_host == 0)
446  return -1;
447 
448  // core offset for pinning
449 
450  const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
451  size_t core_offset = 0;
452  if (env_core_offset && *env_core_offset) {
453  core_offset = std::strtoul(env_core_offset, &endptr, 10);
454 
455  size_t last_core = core_offset + num_hosts * workers_per_host;
456  if (!endptr || *endptr != 0 ||
457  last_core > std::thread::hardware_concurrency())
458  {
459  std::cerr << "Thrill: environment variable"
460  << " THRILL_CORE_OFFSET=" << env_core_offset
461  << " is not a valid number of cores to skip for pinning."
462  << std::endl;
463  return -1;
464  }
465  }
466 
467  // detect memory config
468 
469  MemoryConfig mem_config;
470  if (mem_config.setup_detect() < 0) return -1;
471  mem_config.print(workers_per_host);
472 
473  // okay, configuration is good.
474 
475  std::cerr << "Thrill: running locally with " << num_hosts
476  << " test hosts and " << workers_per_host << " workers per host"
477  << " in a local " << backend << " network." << std::endl;
478 
479  if (!Initialize()) return -1;
480 
481  RunLoopbackThreads<NetGroup>(
482  mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
483 
484  if (!Deinitialize()) return -1;
485 
486  return 0;
487 }
488 
489 #if THRILL_HAVE_NET_TCP
490 static inline
491 int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {
492 
493  char* endptr;
494 
495  // select environment variables
496 
497  const char* str_rank = "THRILL_RANK";
498  const char* env_rank = getenv(str_rank);
499 
500  if (env_rank == nullptr) {
501  // take SLURM_PROCID if THRILL_RANK is not set
502  str_rank = "SLURM_PROCID";
503  env_rank = getenv(str_rank);
504  }
505 
506  const char* env_hostlist = getenv("THRILL_HOSTLIST");
507 
508  // parse environment variables
509 
510  size_t my_host_rank = 0;
511 
512  if (env_rank != nullptr && *env_rank != 0) {
513  my_host_rank = std::strtoul(env_rank, &endptr, 10);
514 
515  if (endptr == nullptr || *endptr != 0) {
516  std::cerr << "Thrill: environment variable "
517  << str_rank << '=' << env_rank
518  << " is not a valid number."
519  << std::endl;
520  return -1;
521  }
522  }
523  else {
524  std::cerr << "Thrill: environment variable THRILL_RANK"
525  << " is required for tcp network backend."
526  << std::endl;
527  return -1;
528  }
529 
530  std::vector<std::string> hostlist;
531 
532  if (env_hostlist != nullptr && *env_hostlist != 0) {
533  // first try to split by spaces, then by commas
534  std::vector<std::string> list = tlx::split(' ', env_hostlist);
535 
536  if (list.size() == 1) {
537  tlx::split(&list, ',', env_hostlist);
538  }
539 
540  for (const std::string& host : list) {
541  // skip empty splits
542  if (host.empty()) continue;
543 
544  if (host.find(':') == std::string::npos) {
545  std::cerr << "Thrill: invalid address \"" << host << "\""
546  << "in THRILL_HOSTLIST. It must contain a port number."
547  << std::endl;
548  return -1;
549  }
550 
551  hostlist.push_back(host);
552  }
553 
554  if (my_host_rank >= hostlist.size()) {
555  std::cerr << "Thrill: endpoint list (" << list.size() << " entries) "
556  << "does not include my host_rank (" << my_host_rank << ")"
557  << std::endl;
558  return -1;
559  }
560  }
561  else {
562  std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
563  << " is required for tcp network backend."
564  << std::endl;
565  return -1;
566  }
567 
568  // determine number of local worker threads per process
569 
570  const char* str_workers_per_host;
571  const char* env_workers_per_host;
572 
573  size_t workers_per_host = FindWorkersPerHost(
574  str_workers_per_host, env_workers_per_host);
575 
576  if (workers_per_host == 0)
577  return -1;
578 
579  // detect memory config
580 
581  MemoryConfig mem_config;
582  if (mem_config.setup_detect() < 0) return -1;
583  mem_config.print(workers_per_host);
584 
585  // okay, configuration is good.
586 
587  std::cerr << "Thrill: running in tcp network with " << hostlist.size()
588  << " hosts and " << workers_per_host << " workers per host"
589  << " with " << common::GetHostname()
590  << " as rank " << my_host_rank << " and endpoints";
591  for (const std::string& ep : hostlist)
592  std::cerr << ' ' << ep;
593  std::cerr << std::endl;
594 
595  if (!Initialize()) return -1;
596 
597  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
598 
599  // construct three TCP network groups
600  auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();
601 
602  std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
604  *select_dispatcher, my_host_rank, hostlist,
605  groups.data(), net::Manager::kGroupCount);
606 
607  std::array<net::GroupPtr, kGroupCount> host_groups = {
608  { std::move(groups[0]), std::move(groups[1]) }
609  };
610 
611  // construct HostContext
612 
613  auto dispatcher = std::make_unique<net::DispatcherThread>(
614  std::move(select_dispatcher), my_host_rank);
615 
616  HostContext host_context(
617  0, mem_config,
618  std::move(dispatcher), std::move(host_groups), workers_per_host);
619 
620  std::vector<std::thread> threads(workers_per_host);
621 
622  for (size_t worker = 0; worker < workers_per_host; worker++) {
623  threads[worker] = common::CreateThread(
624  [&host_context, &job_startpoint, worker] {
625  Context ctx(host_context, worker);
626  common::NameThisThread("worker " + std::to_string(worker));
627 
628  ctx.Launch(job_startpoint);
629  });
630  common::SetCpuAffinity(threads[worker], worker);
631  }
632 
633  // join worker threads
634  for (size_t i = 0; i < workers_per_host; i++) {
635  threads[i].join();
636  }
637 
638  if (!Deinitialize()) return -1;
639 
640  return 0;
641 }
642 #endif
643 
644 #if THRILL_HAVE_NET_MPI
645 static inline
646 int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {
647 
648  // determine number of local worker threads per MPI process
649 
650  const char* str_workers_per_host;
651  const char* env_workers_per_host;
652 
653  size_t workers_per_host = FindWorkersPerHost(
654  str_workers_per_host, env_workers_per_host);
655 
656  if (workers_per_host == 0)
657  return -1;
658 
659  // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop
660 
661  if (workers_per_host == 1) {
662  std::cerr << "Thrill: environment variable"
663  << ' ' << str_workers_per_host
664  << '=' << env_workers_per_host
665  << " is not recommended, as one thread is used exclusively"
666  << " for MPI communication."
667  << std::endl;
668  }
669  else {
670  --workers_per_host;
671  }
672 
673  // detect memory config
674 
675  MemoryConfig mem_config;
676  if (mem_config.setup_detect() < 0) return -1;
677  mem_config.print(workers_per_host);
678 
679  // okay, configuration is good.
680 
681  size_t num_hosts = net::mpi::NumMpiProcesses();
682  size_t mpi_rank = net::mpi::MpiRank();
683  std::string hostname = common::GetHostname();
684 
685  std::cerr << "Thrill: running in MPI network with " << num_hosts
686  << " hosts and " << workers_per_host << "+1 workers per host"
687  << " with " << hostname << " as rank " << mpi_rank << "."
688  << std::endl;
689 
690  if (!Initialize()) return -1;
691 
692  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
693 
694  // construct three MPI network groups
695  auto dispatcher = std::make_unique<net::DispatcherThread>(
696  std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);
697 
698  std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
699  net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);
700 
701  std::array<net::GroupPtr, kGroupCount> host_groups = {
702  { std::move(groups[0]), std::move(groups[1]) }
703  };
704 
705  // construct HostContext
706  HostContext host_context(
707  0, mem_config,
708  std::move(dispatcher), std::move(host_groups), workers_per_host);
709 
710  // launch worker threads
711  std::vector<std::thread> threads(workers_per_host);
712 
713  for (size_t worker = 0; worker < workers_per_host; worker++) {
714  threads[worker] = common::CreateThread(
715  [&host_context, &job_startpoint, worker] {
716  Context ctx(host_context, worker);
717  common::NameThisThread("host " + std::to_string(ctx.host_rank())
718  + " worker " + std::to_string(worker));
719 
720  ctx.Launch(job_startpoint);
721  });
722  common::SetCpuAffinity(threads[worker], worker);
723  }
724 
725  // join worker threads
726  for (size_t i = 0; i < workers_per_host; i++) {
727  threads[i].join();
728  }
729 
730  if (!Deinitialize()) return -1;
731 
732  return 0;
733 }
734 #endif
735 
736 #if THRILL_HAVE_NET_IB
737 static inline
738 int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {
739 
740  // determine number of local worker threads per IB/MPI process
741 
742  const char* str_workers_per_host;
743  const char* env_workers_per_host;
744 
745  size_t workers_per_host = FindWorkersPerHost(
746  str_workers_per_host, env_workers_per_host);
747 
748  if (workers_per_host == 0)
749  return -1;
750 
751  // detect memory config
752 
753  MemoryConfig mem_config;
754  if (mem_config.setup_detect() < 0) return -1;
755  mem_config.print(workers_per_host);
756 
757  // okay, configuration is good.
758 
759  size_t num_hosts = net::ib::NumMpiProcesses();
760  size_t mpi_rank = net::ib::MpiRank();
761 
762  std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
763  << " hosts and " << workers_per_host << " workers per host"
764  << " with " << common::GetHostname()
765  << " as rank " << mpi_rank << "."
766  << std::endl;
767 
768  if (!Initialize()) return -1;
769 
770  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
771 
772  // construct two MPI network groups
773  std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
774  net::ib::Construct(num_hosts, groups.data(), kGroupCount);
775 
776  std::array<net::GroupPtr, kGroupCount> host_groups = {
777  { std::move(groups[0]), std::move(groups[1]) }
778  };
779 
780  // construct HostContext
781  HostContext host_context(
782  0, mem_config, std::move(host_groups), workers_per_host);
783 
784  // launch worker threads
785  std::vector<std::thread> threads(workers_per_host);
786 
787  for (size_t worker = 0; worker < workers_per_host; worker++) {
788  threads[worker] = common::CreateThread(
789  [&host_context, &job_startpoint, worker] {
790  Context ctx(host_context, worker);
791  common::NameThisThread("host " + std::to_string(ctx.host_rank())
792  + " worker " + std::to_string(worker));
793 
794  ctx.Launch(job_startpoint);
795  });
796  common::SetCpuAffinity(threads[worker], worker);
797  }
798 
799  // join worker threads
800  for (size_t i = 0; i < workers_per_host; i++) {
801  threads[i].join();
802  }
803 
804  if (!Deinitialize()) return -1;
805 
806  return 0;
807 }
808 #endif
809 
//! Report on stderr that the selected network backend is not compiled into
//! this binary.
//!
//! \param env_net name of the requested backend, echoed in the message
//! \return always -1
int RunNotSupported(const char* env_net) {
    static constexpr int kFailure = -1;

    std::cerr << "Thrill: network backend "
              << env_net
              << " is not supported by this binary."
              << std::endl;

    return kFailure;
}
815 
//! Select a network backend from the environment when THRILL_NET is not set.
//!
//! \return "ib" or "mpi" when an OpenMPI or Intel MPI launch is detected and
//! the respective backend is compiled in, nullptr (after an error message)
//! when an MPI launch is detected but no such backend is available;
//! otherwise "mock" on MSVC builds, "tcp" if THRILL_RANK or THRILL_HOSTLIST
//! is set, and "local" if neither is.
static inline
const char * DetectNetBackend() {
    // detect openmpi and intel mpi run, add others as well.
    const bool launched_by_mpi =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (launched_by_mpi) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    // either of the tcp variables being present selects the tcp backend
    const bool tcp_configured =
        getenv("THRILL_RANK") != nullptr ||
        getenv("THRILL_HOSTLIST") != nullptr;

    return tcp_configured ? "tcp" : "local";
#endif
}
843 
845 
846  const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
847  if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
848 
849  char* endptr;
850 
851  long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
852  if (endptr == nullptr || *endptr != 0 ||
853  (die_with_parent != 0 && die_with_parent != 1)) {
854  std::cerr << "Thrill: environment variable"
855  << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
856  << " is not either 0 or 1."
857  << std::endl;
858  return -1;
859  }
860 
861  if (die_with_parent == 0) return 0;
862 
863 #if __linux__
864  if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
865  throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
866  return 1;
867 #else
868  std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
869  << "Please submit a patch."
870  << std::endl;
871  return 0;
872 #endif
873 }
874 
876 
877  const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
878  if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
879 
880  if (unlink(env_unlink_binary) != 0) {
882  "Error calling unlink binary \""
883  + std::string(env_unlink_binary) + "\"");
884  }
885 
886  return 0;
887 }
888 
889 int Run(const std::function<void(Context&)>& job_startpoint) {
890 
891  if (RunCheckDieWithParent() < 0)
892  return -1;
893 
894  if (RunCheckUnlinkBinary() < 0)
895  return -1;
896 
897  // parse environment: THRILL_NET
898  const char* env_net = getenv("THRILL_NET");
899 
900  // if no backend configured: automatically select one.
901  if (env_net == nullptr || *env_net == 0) {
902  env_net = DetectNetBackend();
903  if (env_net == nullptr) return -1;
904  }
905 
906  // run with selected backend
907  if (strcmp(env_net, "mock") == 0) {
908  // mock network backend
909  return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
910  }
911 
912  if (strcmp(env_net, "local") == 0) {
913 #if THRILL_HAVE_NET_TCP
914  // tcp loopback network backend
915  return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
916 #else
917  return RunNotSupported(env_net);
918 #endif
919  }
920 
921  if (strcmp(env_net, "tcp") == 0) {
922 #if THRILL_HAVE_NET_TCP
923  // real tcp network backend
924  return RunBackendTcp(job_startpoint);
925 #else
926  return RunNotSupported(env_net);
927 #endif
928  }
929 
930  if (strcmp(env_net, "mpi") == 0) {
931 #if THRILL_HAVE_NET_MPI
932  // mpi network backend
933  return RunBackendMpi(job_startpoint);
934 #else
935  return RunNotSupported(env_net);
936 #endif
937  }
938 
939  if (strcmp(env_net, "ib") == 0) {
940 #if THRILL_HAVE_NET_IB
941  // ib/mpi network backend
942  return RunBackendIb(job_startpoint);
943 #else
944  return RunNotSupported(env_net);
945 #endif
946  }
947 
948  std::cerr << "Thrill: network backend " << env_net << " is unknown."
949  << std::endl;
950  return -1;
951 }
952 
953 /******************************************************************************/
954 // MemoryConfig
955 
956 void MemoryConfig::setup(size_t ram) {
957  ram_ = ram;
958  apply();
959 }
960 
962 
963  // determine amount of physical RAM or take user's limit
964 
965  const char* env_ram = getenv("THRILL_RAM");
966 
967  if (env_ram != nullptr && *env_ram != 0) {
968  uint64_t ram64;
969  if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
970  std::cerr << "Thrill: environment variable"
971  << " THRILL_RAM=" << env_ram
972  << " is not a valid amount of RAM memory."
973  << std::endl;
974  return -1;
975  }
976  ram_ = static_cast<size_t>(ram64);
977  }
978  else {
979  // detect amount of physical memory on system
980 #if defined(_MSC_VER)
981  MEMORYSTATUSEX memstx;
982  memstx.dwLength = sizeof(memstx);
983  GlobalMemoryStatusEx(&memstx);
984 
985  ram_ = memstx.ullTotalPhys;
986 #elif __APPLE__
987  int mib[2];
988  int64_t physical_memory;
989  size_t length;
990 
991  // Get the physical memory size
992  mib[0] = CTL_HW;
993  mib[1] = HW_MEMSIZE;
994  length = sizeof(physical_memory);
995  sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
996  ram_ = static_cast<size_t>(physical_memory);
997 #else
998  ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
999 #endif
1000 
1001 #if __linux__
1002  // use getrlimit() to check user limit on address space
1003  struct rlimit rl; // NOLINT
1004  if (getrlimit(RLIMIT_AS, &rl) == 0) {
1005  if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1006  ram_ = rl.rlim_cur * 3 / 4;
1007  }
1008  }
1009  else {
1010  sLOG1 << "getrlimit(): " << strerror(errno);
1011  }
1012 #endif
1013  }
1014 
1015  apply();
1016 
1017  return 0;
1018 }
1019 
1021  // divide up ram_
1022 
1023  ram_workers_ = ram_ / 3;
1024  ram_block_pool_hard_ = ram_ / 3;
1027 
1028  // set memory limit, only BlockPool is excluded from malloc tracking, as
1029  // only it uses bypassing allocators.
1031 }
1032 
1033 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1034 
1035  MemoryConfig mc = *this;
1036  mc.ram_ /= hosts;
1037  mc.ram_block_pool_hard_ /= hosts;
1038  mc.ram_block_pool_soft_ /= hosts;
1039  mc.ram_workers_ /= hosts;
1040  // free floating memory is not divided by host, as it is measured overall
1041 
1042  return mc;
1043 }
1044 
1045 void MemoryConfig::print(size_t workers_per_host) const {
1046  if (!verbose_) return;
1047 
1048  std::cerr
1049  << "Thrill: using "
1050  << tlx::format_iec_units(ram_) << "B RAM total,"
1051  << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1052  << " workers="
1053  << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1054  << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1055  << std::endl;
1056 }
1057 
1058 /******************************************************************************/
1059 // HostContext methods
1060 
1062  size_t local_host_id,
1063  const MemoryConfig& mem_config,
1064  std::unique_ptr<net::DispatcherThread> dispatcher,
1065  std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1066  size_t workers_per_host)
1067  : mem_config_(mem_config),
1068  base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1069  logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1070  profiler_(std::make_unique<common::ProfileThread>()),
1071  local_host_id_(local_host_id),
1072  workers_per_host_(workers_per_host),
1073  dispatcher_(std::move(dispatcher)),
1074  net_manager_(std::move(groups), logger_) {
1075 
1076  // write command line parameters to json log
1078 
1081 
1082  // run memory profiler only on local host 0 (especially for test runs)
1083  if (local_host_id == 0)
1085 }
1086 
1088  // stop dispatcher _before_ stopping multiplexer
1089  dispatcher_->Terminate();
1090 }
1091 
// Body of MakeHostLogPath: derives the path of this host's JSON log file from
// the THRILL_LOG environment variable.
// NOTE(review): the signature line (listing line 1092) is not present in this
// extract; the definition index on this page gives it as
// `std::string MakeHostLogPath(size_t host_rank)`.
//
// Result by THRILL_LOG value:
//   unset          -> empty string (a notice is printed to std::cerr on host
//                     rank 0 when mem_config().verbose_ is set)
//   "" or "-"      -> empty string
//   "/dev/stdout"  -> returned unchanged
//   "stdout"       -> "/dev/stdout"
//   anything else  -> value + "-host-" + host_rank + ".json"
1093  const char* env_log = getenv("THRILL_LOG");
1094  if (env_log == nullptr) {
1095  if (host_rank == 0 && mem_config().verbose_) {
1096  std::cerr << "Thrill: no THRILL_LOG was found, "
1097  << "so no json log is written."
1098  << std::endl;
1099  }
1100  return std::string();
1101  }
1102 
1103  std::string output = env_log;
1104  if (output == "" || output == "-")
1105  return std::string();
1106  if (output == "/dev/stdout")
1107  return output;
1108  if (output == "stdout")
1109  return "/dev/stdout";
1110 
// every host writes its own file: the host rank becomes part of the name
1111  return output + "-host-" + std::to_string(host_rank) + ".json";
1112 }
1113 
1114 /******************************************************************************/
1115 // Context methods
1116 
1117 Context::Context(HostContext& host_context, size_t local_worker_id)
1118  : local_host_id_(host_context.local_host_id()),
1119  local_worker_id_(local_worker_id),
1120  workers_per_host_(host_context.workers_per_host()),
1121  mem_limit_(host_context.worker_mem_limit()),
1122  mem_config_(host_context.mem_config()),
1123  mem_manager_(host_context.mem_manager()),
1124  net_manager_(host_context.net_manager()),
1125  flow_manager_(host_context.flow_manager()),
1126  block_pool_(host_context.block_pool()),
1127  multiplexer_(host_context.data_multiplexer()),
1128  rng_(std::random_device { }
1129  () + (local_worker_id_ << 16)),
1130  base_logger_(&host_context.base_logger_) {
1131  assert(local_worker_id < workers_per_host());
1132 }
1133 
// Body of an overload that takes a pointer `dia`: forwards to GetFile() with
// dia->dia_id(), or with id 0 when dia is nullptr.
// NOTE(review): the signature line (listing line 1134) is not present in this
// extract.
1135  return GetFile(dia != nullptr ? dia->dia_id() : 0);
1136 }
1137 
// Body of GetFilePtr(size_t dia_id): creates a counting pointer to a new
// data::File constructed from block_pool_, local_worker_id_ and dia_id.
// NOTE(review): the signature line (listing line 1138) is not present in this
// extract; the definition index on this page gives it as
// `data::FilePtr GetFilePtr(size_t dia_id)`.
1139  return tlx::make_counting<data::File>(
1140  block_pool_, local_worker_id_, dia_id);
1141 }
1142 
// Body of an overload that takes a pointer `dia`: forwards to GetFilePtr()
// with dia->dia_id(), or with id 0 when dia is nullptr.
// NOTE(review): the signature line (listing line 1143) is not present in this
// extract.
1144  return GetFilePtr(dia != nullptr ? dia->dia_id() : 0);
1145 }
1146 
// NOTE(review): the closing brace below ends GetNewCatStream(size_t dia_id),
// which the definition index on this page places at listing line 1147; its
// signature and body (lines 1147-1148) are not present in this extract.
1149 }
1150 
// Body of an overload that takes a pointer `dia` (signature line 1151 is also
// absent): forwards to GetNewCatStream() with dia->dia_id(), or with id 0
// when dia is nullptr.
1152  return GetNewCatStream(dia != nullptr ? dia->dia_id() : 0);
1153 }
1154 
// NOTE(review): the closing brace below ends GetNewMixStream(size_t dia_id),
// which the definition index on this page places at listing line 1155; its
// signature and body (lines 1155-1156) are not present in this extract.
1157 }
1158 
// Body of an overload that takes a pointer `dia` (signature line 1159 is also
// absent): forwards to GetNewMixStream() with dia->dia_id(), or with id 0
// when dia is nullptr.
1160  return GetNewMixStream(dia != nullptr ? dia->dia_id() : 0);
1161 }
1162 
1163 template <>
1164 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1165  return GetNewCatStream(dia_id);
1166 }
1167 
1168 template <>
1169 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1170  return GetNewMixStream(dia_id);
1171 }
1172 
//! Per-worker run statistics that can be combined with operator + into one
//! overall summary.
//!
//! The struct deliberately has no constructors or default member initializers:
//! every field is plain data that the user assigns explicitly.
struct OverallStats {

    //! overall run time
    double runtime;

    //! maximum ByteBlock allocation on all workers
    size_t max_block_bytes;

    //! network traffic performed by net layer
    size_t net_traffic_tx, net_traffic_rx;

    //! I/O volume performed by io layer
    size_t io_volume;

    //! maximum external memory allocation
    size_t io_max_allocation;

    //! Write all fields as "[OverallStats name=value ...]" to \p os.
    friend std::ostream& operator << (std::ostream& os, const OverallStats& c) {
        os << "[OverallStats";
        os << " runtime=" << c.runtime;
        os << " max_block_bytes=" << c.max_block_bytes;
        os << " net_traffic_tx=" << c.net_traffic_tx;
        os << " net_traffic_rx=" << c.net_traffic_rx;
        os << " io_volume=" << c.io_volume;
        os << " io_max_allocation=" << c.io_max_allocation;
        return os << "]";
    }

    //! Combine two statistics records: runtime and io_max_allocation take the
    //! larger of the two values, all other fields are added up.
    OverallStats operator + (const OverallStats& b) const {
        OverallStats r;
        // maxima
        r.runtime = std::max(runtime, b.runtime);
        r.io_max_allocation = std::max(io_max_allocation, b.io_max_allocation);
        // sums
        r.max_block_bytes = max_block_bytes + b.max_block_bytes;
        r.net_traffic_tx = net_traffic_tx + b.net_traffic_tx;
        r.net_traffic_rx = net_traffic_rx + b.net_traffic_rx;
        r.io_volume = io_volume + b.io_volume;
        return r;
    }
};
1212 
//! Runs \p job_startpoint on this worker, wrapped in JSON log events
//! ("job-start", "job-done" or "job-exception"), then collects run statistics,
//! reduces them over the workers and lets rank 0 report a summary.
//!
//! A std::exception thrown by the job is logged and rethrown to the caller.
//!
//! NOTE(review): this page is an extract of the listing. The lines numbered
//! 1244, 1250 and 1253 are not present here, so the right-hand side of the
//! max_block_bytes and io_max_allocation assignments and the declaration of
//! io_stats cannot be seen.
1213 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1214  logger_ << "class" << "Context"
1215  << "event" << "job-start";
1216 
1217  common::StatsTimerStart overall_timer;
1218 
1219  try {
1220  job_startpoint(*this);
1221  }
1222  catch (std::exception& e) {
// report the failure on the console log and in the JSON log, then rethrow
// the same exception
1223  LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1224  LOG1 << " what(): " << e.what();
1225 
1226  logger_ << "class" << "Context"
1227  << "event" << "job-exception"
1228  << "exception" << typeid(e).name()
1229  << "what" << e.what();
1230  throw;
1231  }
1232 
// the "elapsed" value is written while the timer is still running; the timer
// is stopped right after
1233  logger_ << "class" << "Context"
1234  << "event" << "job-done"
1235  << "elapsed" << overall_timer;
1236 
1237  overall_timer.Stop();
1238 
1239  // collect overall statistics
1240  OverallStats stats;
1241  stats.runtime = overall_timer.SecondsDouble();
1242 
1243  stats.max_block_bytes =
1245 
// only local worker 0 contributes the traffic counters, every other worker
// contributes 0; the definition index describes net_manager_ as shared among
// workers, so this presumably counts each host's traffic once
1246  stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1247  stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1248 
// I/O statistics are contributed only by worker 0 of local host 0; all other
// workers contribute 0
1249  if (local_host_id_ == 0 && local_worker_id_ == 0) {
1251  stats.io_volume = io_stats.get_read_bytes() + io_stats.get_write_bytes();
1252  stats.io_max_allocation =
1254  }
1255  else {
1256  stats.io_volume = 0;
1257  stats.io_max_allocation = 0;
1258  }
1259 
1260  LOG0 << stats;
1261 
// combine the values of all workers; per the definition index, Reduce()
// reduces "to the given worker" and its root parameter defaults to 0, which
// matches the rank that reports below
1262  stats = net.Reduce(stats);
1263 
1264  if (my_rank() == 0) {
1265  using tlx::format_iec_units;
1266 
1267  if (stats.net_traffic_rx != stats.net_traffic_tx)
1268  LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1269  << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1270 
// human-readable summary, only when verbose output is enabled
1271  if (mem_config().verbose_) {
1272  std::cerr
1273  << "Thrill:"
1274  << " ran " << stats.runtime << "s with max "
1275  << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1276  << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1277  << format_iec_units(stats.io_volume) << "B disk I/O, and "
1278  << format_iec_units(stats.io_max_allocation) << "B max disk use."
1279  << std::endl;
1280  }
1281 
// machine-readable summary; "net_traffic" carries the transmitted byte count
1282  logger_ << "class" << "Context"
1283  << "event" << "summary"
1284  << "runtime" << stats.runtime
1285  << "net_traffic" << stats.net_traffic_tx
1286  << "io_volume" << stats.io_volume
1287  << "io_max_allocation" << stats.io_max_allocation;
1288  }
1289 }
1290 
1291 } // namespace api
1292 } // namespace thrill
1293 
1294 /******************************************************************************/
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
size_t ram_
total amount of physical ram detected or THRILL_RAM
Definition: context.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:395
net::FlowControlChannel & net
Definition: context.hpp:446
size_t ram_floating_
remaining free-floating RAM used for user and Thrill data structures.
Definition: context.hpp:76
MemoryConfig divide(size_t hosts) const
Definition: context.cpp:1033
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:896
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
void LogCmdlineParams(JsonLogger &logger)
Definition: porting.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
Definition: context.cpp:1061
void StartMemProfiler(common::ProfileThread &sched, common::JsonLogger &logger)
launch profiler task
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:34
net::Manager & net_manager_
net::Manager instance that is shared among workers
Definition: context.hpp:413
std::string MakeHostLogPath(size_t host_rank)
create host log
Definition: context.cpp:1092
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
Collection of NetConnections to workers, allows point-to-point client communication and simple collec...
Definition: group.hpp:43
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:889
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
An Exception which is thrown on system errors and contains errno information.
void set_memory_limit_indication(ssize_t size)
common::JsonLogger logger_
Definition: context.hpp:158
const MemoryConfig & mem_config() const
host-global memory config
Definition: context.hpp:329
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
Definition: context.cpp:844
int setup_detect()
detect memory configuration from environment
Definition: context.cpp:961
#define sLOG1
Definition: logger.hpp:38
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
size_t workers_per_host() const
number of workers per host (all have the same).
Definition: context.hpp:115
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
Definition: logger.cpp:40
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
static by_string to_string(int val)
convert to string
static bool Deinitialize()
Definition: context.cpp:282
static std::vector< std::unique_ptr< HostContext > > ConstructLoopbackHostContexts(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host)
Generic network constructor for net backends supporting loopback tests.
Definition: context.cpp:94
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
Definition: context.cpp:875
data::BlockPool & block_pool_
data block pool
Definition: context.hpp:419
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
Definition: context.hpp:238
static void RunLoopbackThreads(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, size_t core_offset, const std::function< void(Context &)> &job_startpoint)
Generic runner for backends supporting loopback tests.
Definition: context.cpp:135
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1147
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:24
int setenv(const char *name, const char *value, int overwrite)
setenv - change or add an environment variable Windows porting madness because setenv() is apparently...
Definition: setenv.cpp:31
~HostContext()
destructor
Definition: context.cpp:1087
std::string GetHostname()
get hostname
Definition: porting.cpp:142
void print(size_t workers_per_host) const
Definition: context.cpp:1045
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network –> run test with one host an...
Definition: context.cpp:371
size_t local_worker_id_
number of this worker context, 0..p-1, within this host
Definition: context.hpp:398
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
external_size_type get_read_bytes() const
Definition: iostats.cpp:527
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
Definition: context.hpp:66
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
Definition: context.cpp:298
void StartLinuxProcStatsProfiler(ProfileThread &, JsonLogger &)
launch profiler task
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
Definition: abs_diff.hpp:24
static bool SetupBlockSize()
Definition: context.cpp:178
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:39
common::JsonLogger logger_
Definition: context.hpp:462
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
int RunNotSupported(const char *env_net)
Definition: context.cpp:810
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
static instance_pointer get_instance()
Definition: singleton.hpp:44
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1138
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
Definition: context.cpp:318
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
Definition: group.cpp:714
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
static size_t FindWorkersPerHost(const char *&str_workers_per_host, const char *&env_workers_per_host)
Definition: context.cpp:203
Context(HostContext &host_context, size_t local_worker_id)
Definition: context.cpp:1117
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1155
void Launch(const std::function< void(Context &)> &job_startpoint)
Method used to launch a job's main procedure. It wraps it in log output.
Definition: context.cpp:1213
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
Definition: context.hpp:422
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Definition: context.hpp:69
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
The HostContext contains all data structures shared among workers on the same host.
Definition: context.hpp:90
size_t local_worker_id() const
Definition: context.hpp:263
void setup(size_t ram)
setup memory size
Definition: context.cpp:956
uint64_t maximum_allocation() const
return maximum number of bytes allocated during program run.
static int RunBackendLoopback(const char *backend, const std::function< void(Context &)> &job_startpoint)
Run() implementation which uses a loopback net backend ("mock" or "tcp").
Definition: context.cpp:414
net::tcp::Group TestGroup
Definition: context.cpp:295
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
Definition: context.cpp:307
MemoryConfig & mem_config()
host-global memory config
Definition: context.hpp:123
size_t rx
received bytes
Definition: manager.hpp:37
size_t NumMpiProcesses()
Return the number of MPI processes.
Definition: group.cpp:701
static bool Initialize()
Definition: context.cpp:273
MemoryConfig mem_config_
memory configuration
Definition: context.hpp:146
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
Definition: context.hpp:161
bool enable_proc_profiler_
enable Linux /proc stats profiler (default: on)
Definition: context.hpp:82
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
Definition: context.hpp:176
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:110
std::string format_iec_units(uint64_t number, int precision)
Format number as something like 1 TiB.
static const char * DetectNetBackend()
Definition: context.cpp:817
bool verbose_
StageBuilder verbosity flag.
Definition: context.hpp:79
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449
external_size_type get_write_bytes() const
Definition: iostats.cpp:543
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
Definition: group.cpp:675
size_t tx
transmitted bytes
Definition: manager.hpp:35