Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
context.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/context.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/api/context.hpp>
13 
14 #include <thrill/api/dia_base.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
20 #include <thrill/common/string.hpp>
22 #include <thrill/io/iostats.hpp>
23 #include <thrill/vfs/file_io.hpp>
24 
25 #include <tlx/math/abs_diff.hpp>
28 #include <tlx/string/split.hpp>
29 
30 // mock net backend is always available -tb :)
32 
33 #if THRILL_HAVE_NET_TCP
36 #endif
37 
38 #if THRILL_HAVE_NET_MPI
40 #include <thrill/net/mpi/group.hpp>
41 #endif
42 
43 #if THRILL_HAVE_NET_IB
44 #include <thrill/net/ib/group.hpp>
45 #endif
46 
47 #if __linux__
48 
49 // linux-specific process control
50 #include <sys/prctl.h>
51 
52 // for calling getrlimit() to determine memory limit
53 #include <sys/resource.h>
54 #include <sys/time.h>
55 
56 #endif
57 
58 #if __APPLE__
59 
60 // for sysctl()
61 #include <sys/sysctl.h>
62 #include <sys/types.h>
63 
64 #endif
65 
66 #if defined(_MSC_VER)
67 
68 // for detecting amount of physical memory
69 #include <windows.h>
70 
71 #endif
72 
73 #include <algorithm>
74 #include <csignal>
75 #include <iostream>
76 #include <memory>
77 #include <string>
78 #include <thread>
79 #include <tuple>
80 #include <utility>
81 #include <vector>
82 
83 namespace thrill {
84 namespace api {
85 
86 /******************************************************************************/
87 // Generic Network Construction
88 
89 //! Generic network constructor for net backends supporting loopback tests.
90 template <typename NetGroup>
91 static inline
92 std::vector<std::unique_ptr<HostContext> >
94  const MemoryConfig& mem_config,
95  size_t num_hosts, size_t workers_per_host) {
96 
97  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
98 
99  // construct three full mesh loopback cliques, deliver net::Groups.
100  std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
101 
102  for (size_t g = 0; g < kGroupCount; ++g) {
103  group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
104  }
105 
106  std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
107  for (size_t h = 0; h < num_hosts; ++h) {
108  dispatcher.emplace_back(
109  std::make_unique<net::DispatcherThread>(
110  std::make_unique<typename NetGroup::Dispatcher>(), h));
111  }
112 
113  // construct host context
114 
115  std::vector<std::unique_ptr<HostContext> > host_context;
116 
117  for (size_t h = 0; h < num_hosts; h++) {
118  std::array<net::GroupPtr, kGroupCount> host_group = {
119  { std::move(group[0][h]), std::move(group[1][h]) }
120  };
121 
122  host_context.emplace_back(
123  std::make_unique<HostContext>(
124  h, mem_config, std::move(dispatcher[h]),
125  std::move(host_group), workers_per_host));
126  }
127 
128  return host_context;
129 }
130 
131 //! Generic runner for backends supporting loopback tests.
132 template <typename NetGroup>
133 static inline void
135  const MemoryConfig& mem_config,
136  size_t num_hosts, size_t workers_per_host, size_t core_offset,
137  const std::function<void(Context&)>& job_startpoint) {
138 
139  MemoryConfig host_mem_config = mem_config.divide(num_hosts);
140  mem_config.print(workers_per_host);
141 
142  // construct a mock network of hosts
143  typename NetGroup::Dispatcher dispatcher;
144 
145  std::vector<std::unique_ptr<HostContext> > host_contexts =
146  ConstructLoopbackHostContexts<NetGroup>(
147  host_mem_config, num_hosts, workers_per_host);
148 
149  // launch thread for each of the workers on this host.
150  std::vector<std::thread> threads(num_hosts* workers_per_host);
151 
152  for (size_t host = 0; host < num_hosts; ++host) {
153  mem::by_string log_prefix = "host " + mem::to_string(host);
154  for (size_t worker = 0; worker < workers_per_host; ++worker) {
155  size_t id = host * workers_per_host + worker;
156  threads[id] = common::CreateThread(
157  [&host_contexts, &job_startpoint, host, worker, log_prefix] {
158  Context ctx(*host_contexts[host], worker);
160  log_prefix + " worker " + mem::to_string(worker));
161 
162  ctx.Launch(job_startpoint);
163  });
164  common::SetCpuAffinity(threads[id], core_offset + id);
165  }
166  }
167 
168  // join worker threads
169  for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
170  threads[i].join();
171  }
172 }
173 
174 /******************************************************************************/
175 // Other Configuration Initializations
176 
//! Apply the THRILL_BLOCK_SIZE environment variable, if set and non-empty,
//! to data::default_block_size and report the new value on std::cerr.
//! Returns true when the variable is unset/empty or valid; prints a
//! diagnostic and returns false when it is not a positive decimal number.
//! NOTE(review): std::strtoul accepts leading whitespace and a leading '-'
//! (which wraps to a huge value), so e.g. "-1" is not rejected here.
177 static inline bool SetupBlockSize() {
178 
179  const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
180  if (env_block_size == nullptr || *env_block_size == 0) return true;
181 
182  char* endptr;
183  data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
184 
// reject trailing garbage and zero (a zero block size is unusable)
185  if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
186  std::cerr << "Thrill: environment variable"
187  << " THRILL_BLOCK_SIZE=" << env_block_size
188  << " is not a valid number."
189  << std::endl;
190  return false;
191  }
192 
194 
195  std::cerr << "Thrill: setting default_block_size = "
197  << std::endl;
198 
199  return true;
200 }
201 
//! Parse a worker count from environment variable \p name.
//!
//! \param name       environment variable to read.
//! \param env_value  receives getenv(name); may be nullptr.
//! \param malformed  set to true iff the variable is set and non-empty but is
//!                   not a positive decimal integer (a diagnostic is printed).
//! \return the parsed count, or 0 if the variable is unset, empty or malformed.
static inline size_t ParseWorkersPerHostEnv(
    const char* name, const char*& env_value, bool& malformed) {

    malformed = false;
    env_value = getenv(name);
    if (env_value == nullptr || *env_value == 0) return 0;

    char* endptr;
    size_t result = std::strtoul(env_value, &endptr, 10);
    if (!endptr || *endptr != 0 || result == 0) {
        std::cerr << "Thrill: environment variable"
                  << ' ' << name
                  << '=' << env_value
                  << " is not a valid number of workers per host."
                  << std::endl;
        malformed = true;
        return 0;
    }
    return result;
}

//! Determine the number of worker threads per host.
//!
//! Sources are probed in order: THRILL_WORKERS_PER_HOST (a malformed value is
//! an error), OMP_NUM_THREADS and SLURM_CPUS_ON_NODE (malformed values are
//! reported and skipped), and finally std::thread::hardware_concurrency().
//!
//! \param str_workers_per_host  receives the name of the last variable probed.
//! \param env_workers_per_host  receives that variable's value. After the
//!                              hardware_concurrency() fallback it holds the
//!                              value of SLURM_CPUS_ON_NODE, which may be nullptr.
//! \return the number of workers per host, or 0 on error.
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    bool malformed = false;

    // first check THRILL_WORKERS_PER_HOST: an explicit but invalid setting is
    // an error rather than something to silently override.
    str_workers_per_host = "THRILL_WORKERS_PER_HOST";
    size_t result = ParseWorkersPerHostEnv(
        str_workers_per_host, env_workers_per_host, malformed);
    if (result != 0 || malformed) return result;

    // second check: look for OMP_NUM_THREADS; fall through if invalid
    str_workers_per_host = "OMP_NUM_THREADS";
    result = ParseWorkersPerHostEnv(
        str_workers_per_host, env_workers_per_host, malformed);
    if (result != 0) return result;

    // third check: look for SLURM_CPUS_ON_NODE; fall through if invalid
    str_workers_per_host = "SLURM_CPUS_ON_NODE";
    result = ParseWorkersPerHostEnv(
        str_workers_per_host, env_workers_per_host, malformed);
    if (result != 0) return result;

    // last check: std::thread::hardware_concurrency(). It returns 0 when the
    // value is not computable; report that instead of failing silently, since
    // callers treat 0 as an error and exit without a message of their own.
    result = std::thread::hardware_concurrency();
    if (result == 0) {
        std::cerr << "Thrill: could not detect the number of hardware threads;"
                  << " please set THRILL_WORKERS_PER_HOST."
                  << std::endl;
    }
    return result;
}
271 
272 static inline bool Initialize() {
273 
274  if (!SetupBlockSize()) return false;
275 
276  vfs::Initialize();
277 
278  return true;
279 }
280 
//! Process-wide teardown counterpart of Initialize(); the Run*() backends call
//! it after all worker threads have been joined. As shown it returns true.
281 static inline bool Deinitialize() {
282 
284 
285  return true;
286 }
287 
288 /******************************************************************************/
289 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
290 
291 #if defined(_MSC_VER)
293 #else
295 #endif
296 
297 void RunLocalMock(const MemoryConfig& mem_config,
298  size_t num_hosts, size_t workers_per_host,
299  const std::function<void(Context&)>& job_startpoint) {
300 
301  return RunLoopbackThreads<TestGroup>(
302  mem_config, num_hosts, workers_per_host, 0, job_startpoint);
303 }
304 
//! Build num_hosts loopback HostContexts over TestGroup (for tests), all
//! created from one fixed 4 GiB (4 * 1024^3 byte) RAM configuration with
//! verbose output disabled.
//! NOTE(review): unlike the loopback thread runner, which calls
//! mem_config.divide(num_hosts), the config is passed here undivided, so each
//! host gets the full budget — confirm this is intended.
305 std::vector<std::unique_ptr<HostContext> >
306 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
307 
308  // set fixed amount of RAM for testing
310  mem_config.setup(4 * 1024 * 1024 * 1024llu);
311  mem_config.verbose_ = false;
312 
313  return ConstructLoopbackHostContexts<TestGroup>(
314  mem_config, num_hosts, workers_per_host);
315 }
316 
// Portable setenv(): MSVC has no POSIX setenv(), so there it is emulated with
// getenv_s() and _putenv_s(); elsewhere the call is forwarded unchanged.
#if defined(_MSC_VER)
//! setenv() replacement for MSVC. With \p overwrite == 0, an existing variable
//! (or a failed lookup) leaves the environment untouched and returns the
//! lookup's error code (0 when the variable simply exists).
int wrap_setenv(const char* name, const char* value, int overwrite) {
    if (overwrite == 0) {
        size_t required_size = 0;
        const int lookup_error = getenv_s(&required_size, nullptr, 0, name);
        if (lookup_error != 0 || required_size != 0)
            return lookup_error;
    }
    return _putenv_s(name, value);
}
#else
//! Forward to POSIX setenv(name, value, overwrite).
int wrap_setenv(const char* name, const char* value, int overwrite) {
    const int rc = setenv(name, value, overwrite);
    return rc;
}
#endif
332 
333 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
334  // set fixed amount of RAM for testing
335  RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
336 }
337 
339  size_t ram, const std::function<void(Context&)>& job_startpoint) {
340 
341  // discard json log
342  wrap_setenv("THRILL_LOG", "", /* overwrite */ 1);
343 
344  // set fixed amount of RAM for testing
345  MemoryConfig mem_config;
346  mem_config.verbose_ = false;
347  mem_config.setup(ram);
348 
349  static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
350  static constexpr size_t num_workers_list[] = { 1, 3 };
351 
352  for (const size_t& num_hosts : num_hosts_list) {
353  for (const size_t& workers_per_host : num_workers_list) {
354  RunLocalMock(mem_config, num_hosts, workers_per_host,
355  job_startpoint);
356  }
357  }
358 }
359 
360 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
361 
362  size_t my_host_rank = 0;
363  size_t workers_per_host = 1;
364  size_t num_hosts = 1;
365  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
366 
367  // set fixed amount of RAM for testing
368  MemoryConfig mem_config;
369  mem_config.verbose_ = false;
370  mem_config.setup(4 * 1024 * 1024 * 1024llu);
371  mem_config.print(workers_per_host);
372 
373  // construct two full mesh connection cliques, deliver net::tcp::Groups.
374  std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
375 
376  for (size_t g = 0; g < kGroupCount; ++g) {
377  group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
378  }
379 
380  std::array<net::GroupPtr, kGroupCount> host_group = {
381  { std::move(group[0][0]), std::move(group[1][0]) }
382  };
383 
384  auto dispatcher = std::make_unique<net::DispatcherThread>(
385  std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
386 
387  HostContext host_context(
388  0, mem_config,
389  std::move(dispatcher), std::move(host_group), workers_per_host);
390 
391  Context ctx(host_context, 0);
392  common::NameThisThread("worker " + mem::to_string(my_host_rank));
393 
394  job_startpoint(ctx);
395 }
396 
397 /******************************************************************************/
398 // Run() Variants for Different Net Backends
399 
400 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
401 template <typename NetGroup>
402 static inline
404  const char* backend, const std::function<void(Context&)>& job_startpoint) {
405 
406  char* endptr;
407 
408  // determine number of loopback hosts
409 
410  size_t num_hosts = 2;
411 
412  const char* env_local = getenv("THRILL_LOCAL");
413  if (env_local && *env_local) {
414  // parse envvar only if it exists.
415  num_hosts = std::strtoul(env_local, &endptr, 10);
416 
417  if (!endptr || *endptr != 0 || num_hosts == 0) {
418  std::cerr << "Thrill: environment variable"
419  << " THRILL_LOCAL=" << env_local
420  << " is not a valid number of local loopback hosts."
421  << std::endl;
422  return -1;
423  }
424  }
425 
426  // determine number of threads per loopback host
427 
428  const char* str_workers_per_host;
429  const char* env_workers_per_host;
430 
431  size_t workers_per_host = FindWorkersPerHost(
432  str_workers_per_host, env_workers_per_host);
433 
434  if (workers_per_host == 0)
435  return -1;
436 
437  // core offset for pinning
438 
439  const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
440  size_t core_offset = 0;
441  if (env_core_offset && *env_core_offset) {
442  core_offset = std::strtoul(env_core_offset, &endptr, 10);
443 
444  size_t last_core = core_offset + num_hosts * workers_per_host;
445  if (!endptr || *endptr != 0 ||
446  last_core > std::thread::hardware_concurrency())
447  {
448  std::cerr << "Thrill: environment variable"
449  << " THRILL_CORE_OFFSET=" << env_core_offset
450  << " is not a valid number of cores to skip for pinning."
451  << std::endl;
452  return -1;
453  }
454  }
455 
456  // detect memory config
457 
458  MemoryConfig mem_config;
459  if (mem_config.setup_detect() < 0) return -1;
460  mem_config.print(workers_per_host);
461 
462  // okay, configuration is good.
463 
464  std::cerr << "Thrill: running locally with " << num_hosts
465  << " test hosts and " << workers_per_host << " workers per host"
466  << " in a local " << backend << " network." << std::endl;
467 
468  if (!Initialize()) return -1;
469 
470  RunLoopbackThreads<NetGroup>(
471  mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
472 
473  if (!Deinitialize()) return -1;
474 
475  return 0;
476 }
477 
#if THRILL_HAVE_NET_TCP
//! Run() implementation for the real TCP network backend.
//!
//! The process rank comes from THRILL_RANK (or SLURM_PROCID if THRILL_RANK is
//! unset). The endpoint list comes from THRILL_HOSTLIST, as space- or
//! comma-separated "host:port" entries. Afterwards workers_per_host worker
//! threads are launched and joined.
//!
//! \return 0 on success, -1 on configuration or initialization errors.
static inline
int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {

    char* endptr;

    // select environment variables

    const char* str_rank = "THRILL_RANK";
    const char* env_rank = getenv(str_rank);

    if (env_rank == nullptr) {
        // take SLURM_PROCID if THRILL_RANK is not set
        str_rank = "SLURM_PROCID";
        env_rank = getenv(str_rank);
    }

    const char* env_hostlist = getenv("THRILL_HOSTLIST");

    // parse environment variables

    size_t my_host_rank = 0;

    if (env_rank != nullptr && *env_rank != 0) {
        my_host_rank = std::strtoul(env_rank, &endptr, 10);

        if (endptr == nullptr || *endptr != 0) {
            std::cerr << "Thrill: environment variable "
                      << str_rank << '=' << env_rank
                      << " is not a valid number."
                      << std::endl;
            return -1;
        }
    }
    else {
        std::cerr << "Thrill: environment variable THRILL_RANK"
                  << " is required for tcp network backend."
                  << std::endl;
        return -1;
    }

    std::vector<std::string> hostlist;

    if (env_hostlist != nullptr && *env_hostlist != 0) {
        // first try to split by spaces, then by commas
        std::vector<std::string> list = tlx::split(' ', env_hostlist);

        if (list.size() == 1) {
            tlx::split(&list, ',', env_hostlist);
        }

        for (const std::string& host : list) {
            // skip empty splits
            if (host.empty()) continue;

            if (host.find(':') == std::string::npos) {
                std::cerr << "Thrill: invalid address \"" << host << "\""
                          << " in THRILL_HOSTLIST. It must contain a port number."
                          << std::endl;
                return -1;
            }

            hostlist.push_back(host);
        }

        if (my_host_rank >= hostlist.size()) {
            // report the usable endpoints, not the raw split fragments
            std::cerr << "Thrill: endpoint list (" << hostlist.size() << " entries) "
                      << "does not include my host_rank (" << my_host_rank << ")"
                      << std::endl;
            return -1;
        }
    }
    else {
        std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
                  << " is required for tcp network backend."
                  << std::endl;
        return -1;
    }

    // determine number of local worker threads per process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    std::cerr << "Thrill: running in tcp network with " << hostlist.size()
              << " hosts and " << workers_per_host << " workers per host"
              << " with " << common::GetHostname()
              << " as rank " << my_host_rank << " and endpoints";
    for (const std::string& ep : hostlist)
        std::cerr << ' ' << ep;
    std::cerr << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct kGroupCount TCP network groups
    auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();

    std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
    net::tcp::Construct(
        *select_dispatcher, my_host_rank, hostlist,
        groups.data(), net::Manager::kGroupCount);

    // hand over every group; a loop stays correct for any kGroupCount
    std::array<net::GroupPtr, kGroupCount> host_groups;
    for (size_t g = 0; g < kGroupCount; ++g) {
        host_groups[g] = std::move(groups[g]);
    }

    // construct HostContext, which takes ownership of the dispatcher thread
    auto dispatcher = std::make_unique<net::DispatcherThread>(
        std::move(select_dispatcher), my_host_rank);

    HostContext host_context(
        0, mem_config,
        std::move(dispatcher), std::move(host_groups), workers_per_host);

    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("worker " + mem::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (std::thread& thread : threads) {
        thread.join();
    }

    if (!Deinitialize()) return -1;

    // `dispatcher` was moved into host_context above and is null here, so it
    // must not be dereferenced. HostContext's destructor terminates the
    // dispatcher (waiting for unfinished AsyncWrites) when host_context goes
    // out of scope on return.
    return 0;
}
#endif
637 
#if THRILL_HAVE_NET_MPI
//! Run() implementation for the MPI network backend.
//!
//! One thread per process is reserved for the busy-waiting MPI dispatcher, so
//! the detected worker count is reduced by one unless it already is one.
//!
//! \return 0 on success, -1 on configuration or initialization errors.
static inline
int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {

    // determine number of local worker threads per MPI process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop

    if (workers_per_host == 1) {
        // env_workers_per_host may be nullptr (the count came from
        // hardware_concurrency() with SLURM_CPUS_ON_NODE unset); streaming a
        // null const char* into std::cerr is undefined behavior.
        if (env_workers_per_host != nullptr) {
            std::cerr << "Thrill: environment variable"
                      << ' ' << str_workers_per_host
                      << '=' << env_workers_per_host
                      << " is not recommended, as one thread is used exclusively"
                      << " for MPI communication."
                      << std::endl;
        }
        else {
            std::cerr << "Thrill: a single worker thread per host"
                      << " is not recommended, as one thread is used exclusively"
                      << " for MPI communication."
                      << std::endl;
        }
    }
    else {
        --workers_per_host;
    }

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    size_t num_hosts = net::mpi::NumMpiProcesses();
    size_t mpi_rank = net::mpi::MpiRank();
    std::string hostname = common::GetHostname();

    std::cerr << "Thrill: running in MPI network with " << num_hosts
              << " hosts and " << workers_per_host << "+1 workers per host"
              << " with " << hostname << " as rank " << mpi_rank << "."
              << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct kGroupCount MPI network groups
    auto dispatcher = std::make_unique<net::DispatcherThread>(
        std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);

    std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
    net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);

    // hand over every group; a loop stays correct for any kGroupCount,
    // unlike a fixed two-element initializer.
    std::array<net::GroupPtr, kGroupCount> host_groups;
    for (size_t g = 0; g < kGroupCount; ++g) {
        host_groups[g] = std::move(groups[g]);
    }

    // construct HostContext
    HostContext host_context(
        0, mem_config,
        std::move(dispatcher), std::move(host_groups), workers_per_host);

    // launch worker threads
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("host " + mem::to_string(ctx.host_rank())
                                       + " worker " + mem::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (std::thread& thread : threads) {
        thread.join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
#endif
731 
//! Run() implementation for the InfiniBand/MPI network backend.
//! Returns -1 on configuration or initialization errors, otherwise 0.
//! Unlike RunBackendMpi(), no thread is reserved for a dispatcher.
732 #if THRILL_HAVE_NET_IB
733 static inline
734 int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {
735 
736  // determine number of local worker threads per IB/MPI process
737 
738  const char* str_workers_per_host;
739  const char* env_workers_per_host;
740 
741  size_t workers_per_host = FindWorkersPerHost(
742  str_workers_per_host, env_workers_per_host);
743 
744  if (workers_per_host == 0)
745  return -1;
746 
747  // detect memory config
748 
749  MemoryConfig mem_config;
750  if (mem_config.setup_detect() < 0) return -1;
751  mem_config.print(workers_per_host);
752 
753  // okay, configuration is good.
754 
755  size_t num_hosts = net::ib::NumMpiProcesses();
756  size_t mpi_rank = net::ib::MpiRank();
757 
758  std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
759  << " hosts and " << workers_per_host << " workers per host"
760  << " with " << common::GetHostname()
761  << " as rank " << mpi_rank << "."
762  << std::endl;
763 
764  if (!Initialize()) return -1;
765 
766  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
767 
768  // construct kGroupCount IB network groups
769  std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
770  net::ib::Construct(num_hosts, groups.data(), kGroupCount);
771 
// NOTE(review): the initializer names exactly two groups, i.e. it assumes
// kGroupCount == 2; with more groups the remaining entries stay null.
772  std::array<net::GroupPtr, kGroupCount> host_groups = {
773  { std::move(groups[0]), std::move(groups[1]) }
774  };
775 
776  // construct HostContext
// NOTE(review): this passes four arguments, but the HostContext constructor
// defined in this file also takes a std::unique_ptr<net::DispatcherThread>
// before the groups. This call likely does not compile when
// THRILL_HAVE_NET_IB is enabled — confirm and supply an IB dispatcher.
777  HostContext host_context(
778  0, mem_config, std::move(host_groups), workers_per_host);
779 
780  // launch worker threads
781  std::vector<std::thread> threads(workers_per_host);
782 
783  for (size_t worker = 0; worker < workers_per_host; worker++) {
784  threads[worker] = common::CreateThread(
785  [&host_context, &job_startpoint, worker] {
786  Context ctx(host_context, worker);
787  common::NameThisThread("host " + mem::to_string(ctx.host_rank())
788  + " worker " + mem::to_string(worker));
789 
790  ctx.Launch(job_startpoint);
791  });
792  common::SetCpuAffinity(threads[worker], worker);
793  }
794 
795  // join worker threads
796  int global_result = 0;
797 
798  for (size_t i = 0; i < workers_per_host; i++) {
799  threads[i].join();
800  }
801 
802  if (!Deinitialize()) return -1;
803 
804  return global_result;
805 }
806 #endif
807 
//! Report that network backend \p env_net was requested but is not compiled
//! into this binary.
//! \return always -1, used directly as Run()'s failure status.
int RunNotSupported(const char* env_net) {
    std::cerr << "Thrill: network backend " << env_net;
    std::cerr << " is not supported by this binary." << std::endl;
    return -1;
}
813 
//! Choose a network backend name when THRILL_NET is not given.
//!
//! An OpenMPI or Intel MPI launch selects "ib" or "mpi", whichever is compiled
//! in; if neither is available a message is printed and nullptr is returned.
//! Otherwise, on MSVC the result is "mock". Elsewhere it is "tcp" when
//! THRILL_RANK or THRILL_HOSTLIST is set, and "local" if neither is.
static inline
const char * DetectNetBackend() {
    // detect openmpi and intel mpi run, add others as well.
    const bool launched_by_mpi =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (launched_by_mpi) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    const bool tcp_configured =
        getenv("THRILL_RANK") != nullptr || getenv("THRILL_HOSTLIST") != nullptr;
    return tcp_configured ? "tcp" : "local";
#endif
}
841 
843 
844  const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
845  if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
846 
847  char* endptr;
848 
849  long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
850  if (endptr == nullptr || *endptr != 0 ||
851  (die_with_parent != 0 && die_with_parent != 1)) {
852  std::cerr << "Thrill: environment variable"
853  << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
854  << " is not either 0 or 1."
855  << std::endl;
856  return -1;
857  }
858 
859  if (die_with_parent == 0) return 0;
860 
861 #if __linux__
862  if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
863  throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
864  return 1;
865 #else
866  std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
867  << "Please submit a patch."
868  << std::endl;
869  return 0;
870 #endif
871 }
872 
874 
875  const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
876  if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
877 
878  if (unlink(env_unlink_binary) != 0) {
880  "Error calling unlink binary \""
881  + std::string(env_unlink_binary) + "\"");
882  }
883 
884  return 0;
885 }
886 
887 int Run(const std::function<void(Context&)>& job_startpoint) {
888 
889  if (RunCheckDieWithParent() < 0)
890  return -1;
891 
892  if (RunCheckUnlinkBinary() < 0)
893  return -1;
894 
895  // parse environment: THRILL_NET
896  const char* env_net = getenv("THRILL_NET");
897 
898  // if no backend configured: automatically select one.
899  if (env_net == nullptr || *env_net == 0) {
900  env_net = DetectNetBackend();
901  if (env_net == nullptr) return -1;
902  }
903 
904  // run with selected backend
905  if (strcmp(env_net, "mock") == 0) {
906  // mock network backend
907  return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
908  }
909 
910  if (strcmp(env_net, "local") == 0) {
911 #if THRILL_HAVE_NET_TCP
912  // tcp loopback network backend
913  return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
914 #else
915  return RunNotSupported(env_net);
916 #endif
917  }
918 
919  if (strcmp(env_net, "tcp") == 0) {
920 #if THRILL_HAVE_NET_TCP
921  // real tcp network backend
922  return RunBackendTcp(job_startpoint);
923 #else
924  return RunNotSupported(env_net);
925 #endif
926  }
927 
928  if (strcmp(env_net, "mpi") == 0) {
929 #if THRILL_HAVE_NET_MPI
930  // mpi network backend
931  return RunBackendMpi(job_startpoint);
932 #else
933  return RunNotSupported(env_net);
934 #endif
935  }
936 
937  if (strcmp(env_net, "ib") == 0) {
938 #if THRILL_HAVE_NET_IB
939  // ib/mpi network backend
940  return RunBackendIb(job_startpoint);
941 #else
942  return RunNotSupported(env_net);
943 #endif
944  }
945 
946  std::cerr << "Thrill: network backend " << env_net << " is unknown."
947  << std::endl;
948  return -1;
949 }
950 
951 /******************************************************************************/
952 // MemoryConfig
953 
954 void MemoryConfig::setup(size_t ram) {
955  ram_ = ram;
956  apply();
957 }
958 
960 
961  // determine amount of physical RAM or take user's limit
962 
963  const char* env_ram = getenv("THRILL_RAM");
964 
965  if (env_ram != nullptr && *env_ram != 0) {
966  uint64_t ram64;
967  if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
968  std::cerr << "Thrill: environment variable"
969  << " THRILL_RAM=" << env_ram
970  << " is not a valid amount of RAM memory."
971  << std::endl;
972  return -1;
973  }
974  ram_ = static_cast<size_t>(ram64);
975  }
976  else {
977  // detect amount of physical memory on system
978 #if defined(_MSC_VER)
979  MEMORYSTATUSEX memstx;
980  memstx.dwLength = sizeof(memstx);
981  GlobalMemoryStatusEx(&memstx);
982 
983  ram_ = memstx.ullTotalPhys;
984 #elif __APPLE__
985  int mib[2];
986  int64_t physical_memory;
987  size_t length;
988 
989  // Get the physical memory size
990  mib[0] = CTL_HW;
991  mib[1] = HW_MEMSIZE;
992  length = sizeof(physical_memory);
993  sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
994  ram_ = static_cast<size_t>(physical_memory);
995 #else
996  ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
997 #endif
998 
999 #if __linux__
1000  // use getrlimit() to check user limit on address space
1001  struct rlimit rl; // NOLINT
1002  if (getrlimit(RLIMIT_AS, &rl) == 0) {
1003  if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1004  ram_ = rl.rlim_cur * 3 / 4;
1005  }
1006  }
1007  else {
1008  sLOG1 << "getrlimit(): " << strerror(errno);
1009  }
1010 #endif
1011  }
1012 
1013  apply();
1014 
1015  return 0;
1016 }
1017 
1019  // divide up ram_
1020 
1021  ram_workers_ = ram_ / 3;
1022  ram_block_pool_hard_ = ram_ / 3;
1025 
1026  // set memory limit, only BlockPool is excluded from malloc tracking, as
1027  // only it uses bypassing allocators.
1029 }
1030 
1031 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1032 
1033  MemoryConfig mc = *this;
1034  mc.ram_ /= hosts;
1035  mc.ram_block_pool_hard_ /= hosts;
1036  mc.ram_block_pool_soft_ /= hosts;
1037  mc.ram_workers_ /= hosts;
1038  // free floating memory is not divided by host, as it is measured overall
1039 
1040  return mc;
1041 }
1042 
1043 void MemoryConfig::print(size_t workers_per_host) const {
1044  if (!verbose_) return;
1045 
1046  std::cerr
1047  << "Thrill: using "
1048  << tlx::format_iec_units(ram_) << "B RAM total,"
1049  << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1050  << " workers="
1051  << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1052  << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1053  << std::endl;
1054 }
1055 
1056 /******************************************************************************/
1057 // HostContext methods
1058 
1060  size_t local_host_id,
1061  const MemoryConfig& mem_config,
1062  std::unique_ptr<net::DispatcherThread> dispatcher,
1063  std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1064  size_t workers_per_host)
1065  : mem_config_(mem_config),
1066  base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1067  logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1068  profiler_(std::make_unique<common::ProfileThread>()),
1069  local_host_id_(local_host_id),
1070  workers_per_host_(workers_per_host),
1071  dispatcher_(std::move(dispatcher)),
1072  net_manager_(std::move(groups), logger_) {
1073 
1074  // write command line parameters to json log
1076 
1078 
1079  // run memory profiler only on local host 0 (especially for test runs)
1080  if (local_host_id == 0)
1082 }
1083 
1085  // stop dispatcher _before_ stopping multiplexer
1086  dispatcher_->Terminate();
1087 }
1088 
1090  const char* env_log = getenv("THRILL_LOG");
1091  if (env_log == nullptr) {
1092  if (host_rank == 0 && mem_config().verbose_) {
1093  std::cerr << "Thrill: no THRILL_LOG was found, "
1094  << "so no json log is written."
1095  << std::endl;
1096  }
1097  return std::string();
1098  }
1099 
1100  std::string output = env_log;
1101  if (output == "" || output == "-")
1102  return std::string();
1103  if (output == "/dev/stdout")
1104  return output;
1105  if (output == "stdout")
1106  return "/dev/stdout";
1107 
1108  return output + "-host-" + std::to_string(host_rank) + ".json";
1109 }
1110 
1111 /******************************************************************************/
1112 // Context methods
1113 
1114 Context::Context(HostContext& host_context, size_t local_worker_id)
1115  : local_host_id_(host_context.local_host_id()),
1116  local_worker_id_(local_worker_id),
1117  workers_per_host_(host_context.workers_per_host()),
1118  mem_limit_(host_context.worker_mem_limit()),
1119  mem_config_(host_context.mem_config()),
1120  mem_manager_(host_context.mem_manager()),
1121  net_manager_(host_context.net_manager()),
1122  flow_manager_(host_context.flow_manager()),
1123  block_pool_(host_context.block_pool()),
1124  multiplexer_(host_context.data_multiplexer()),
1125  rng_(std::random_device { }
1126  () + (local_worker_id_ << 16)),
1127  base_logger_(&host_context.base_logger_) {
1128  assert(local_worker_id < workers_per_host());
1129 }
1130 
1132  return GetFile(dia != nullptr ? dia->id() : 0);
1133 }
1134 
1136  return tlx::make_counting<data::File>(
1137  block_pool_, local_worker_id_, dia_id);
1138 }
1139 
1141  return GetFilePtr(dia != nullptr ? dia->id() : 0);
1142 }
1143 
1146 }
1147 
1149  return GetNewCatStream(dia != nullptr ? dia->id() : 0);
1150 }
1151 
1154 }
1155 
1157  return GetNewMixStream(dia != nullptr ? dia->id() : 0);
1158 }
1159 
1160 template <>
1161 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1162  return GetNewCatStream(dia_id);
1163 }
1164 
1165 template <>
1166 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1167  return GetNewMixStream(dia_id);
1168 }
1169 
//! Run statistics gathered per worker at the end of Context::Launch() and
//! combined across workers with operator+ (used as the reduction operator).
//! Kept a plain aggregate with no default member initializers so it stays
//! trivially constructible and copyable.
struct OverallStats {

    //! overall run time; combined by taking the maximum
    double runtime;

    //! maximum ByteBlock allocation on all workers; combined by summation
    size_t max_block_bytes;

    //! bytes transmitted by the net layer; combined by summation
    size_t net_traffic_tx;
    //! bytes received by the net layer; combined by summation
    size_t net_traffic_rx;

    //! I/O volume performed by the io layer; combined by summation
    size_t io_volume;

    //! maximum external memory allocation; combined by taking the maximum
    size_t io_max_allocation;

    //! write all fields as one bracketed "[OverallStats key=value ...]" line
    friend std::ostream& operator << (std::ostream& os, const OverallStats& c) {
        os << "[OverallStats";
        os << " runtime=" << c.runtime;
        os << " max_block_bytes=" << c.max_block_bytes;
        os << " net_traffic_tx=" << c.net_traffic_tx;
        os << " net_traffic_rx=" << c.net_traffic_rx;
        os << " io_volume=" << c.io_volume;
        os << " io_max_allocation=" << c.io_max_allocation;
        os << "]";
        return os;
    }

    //! Combine two partial statistics: runtime and io_max_allocation take the
    //! maximum, all byte/traffic/volume counters are added.
    OverallStats operator + (const OverallStats& b) const {
        OverallStats combined = *this;
        combined.runtime = std::max(combined.runtime, b.runtime);
        combined.max_block_bytes += b.max_block_bytes;
        combined.net_traffic_tx += b.net_traffic_tx;
        combined.net_traffic_rx += b.net_traffic_rx;
        combined.io_volume += b.io_volume;
        combined.io_max_allocation =
            std::max(combined.io_max_allocation, b.io_max_allocation);
        return combined;
    }
};
1209 
1210 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1211  logger_ << "class" << "Context"
1212  << "event" << "job-start";
1213 
1214  common::StatsTimerStart overall_timer;
1215 
1216  try {
1217  job_startpoint(*this);
1218  }
1219  catch (std::exception& e) {
1220  LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1221  LOG1 << " what(): " << e.what();
1222 
1223  logger_ << "class" << "Context"
1224  << "event" << "job-exception"
1225  << "exception" << typeid(e).name()
1226  << "what" << e.what();
1227  throw;
1228  }
1229 
1230  logger_ << "class" << "Context"
1231  << "event" << "job-done"
1232  << "elapsed" << overall_timer;
1233 
1234  // wait for all local workers to finish, prior to closing multiplexer's
1235  // streams.
1236  net.LocalBarrier();
1237 
1238  if (my_rank() == 0)
1239  multiplexer_.Close();
1240 
1241  net.LocalBarrier();
1242 
1243  overall_timer.Stop();
1244 
1245  // collect overall statistics
1246  OverallStats stats;
1247  stats.runtime = overall_timer.SecondsDouble();
1248 
1249  stats.max_block_bytes =
1251 
1252  stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1253  stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1254 
1255  if (local_host_id_ == 0 && local_worker_id_ == 0) {
1257  stats.io_volume = io_stats.read_volume() + io_stats.write_volume();
1258  stats.io_max_allocation =
1259  io::BlockManager::GetInstance()->maximum_allocation();
1260  }
1261  else {
1262  stats.io_volume = 0;
1263  stats.io_max_allocation = 0;
1264  }
1265 
1266  LOG0 << stats;
1267 
1268  stats = net.Reduce(stats);
1269 
1270  if (my_rank() == 0) {
1271  using tlx::format_iec_units;
1272 
1273  if (stats.net_traffic_rx != stats.net_traffic_tx)
1274  LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1275  << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1276 
1277  if (mem_config().verbose_) {
1278  std::cerr
1279  << "Thrill:"
1280  << " ran " << stats.runtime << "s with max "
1281  << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1282  << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1283  << format_iec_units(stats.io_volume) << "B disk I/O, and "
1284  << format_iec_units(stats.io_max_allocation) << "B max disk use."
1285  << std::endl;
1286  }
1287 
1288  logger_ << "class" << "Context"
1289  << "event" << "summary"
1290  << "runtime" << stats.runtime
1291  << "net_traffic" << stats.net_traffic_tx
1292  << "io_volume" << stats.io_volume
1293  << "io_max_allocation" << stats.io_max_allocation;
1294  }
1295 }
1296 
1297 } // namespace api
1298 } // namespace thrill
1299 
1300 /******************************************************************************/
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
size_t ram_
total amount of physical ram detected or THRILL_RAM
Definition: context.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:392
net::FlowControlChannel & net
Definition: context.hpp:443
size_t ram_floating_
remaining free-floating RAM used for user and Thrill data structures.
Definition: context.hpp:76
MemoryConfig divide(size_t hosts) const
Definition: context.cpp:1031
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:872
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:175
#define sLOG1
Definition: logger.hpp:188
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
void LogCmdlineParams(JsonLogger &logger)
Definition: porting.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
Definition: context.cpp:1059
void StartMemProfiler(common::ProfileThread &sched, common::JsonLogger &logger)
launch profiler task
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:34
net::Manager & net_manager_
net::Manager instance that is shared among workers
Definition: context.hpp:410
#define LOG1
Definition: logger.hpp:176
std::string MakeHostLogPath(size_t host_rank)
create host log
Definition: context.cpp:1089
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:24
Collection of NetConnections to workers, allows point-to-point client communication and simple collec...
Definition: group.hpp:43
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:887
An Exception which is thrown on system errors and contains errno information.
void set_memory_limit_indication(ssize_t size)
common::JsonLogger logger_
Definition: context.hpp:155
const MemoryConfig & mem_config() const
host-global memory config
Definition: context.hpp:326
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
Definition: context.cpp:842
int setup_detect()
detect memory configuration from environment
Definition: context.cpp:959
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
size_t workers_per_host() const
number of workers per host (all have the same).
Definition: context.hpp:112
int64_t write_volume() const
Definition: iostats.hpp:411
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
static by_string to_string(int val)
convert to string
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
static bool Deinitialize()
Definition: context.cpp:281
void NameThisThread(const mem::by_string &name)
Defines a name for the current thread, only if no name was set previously.
Definition: logger.cpp:42
static std::vector< std::unique_ptr< HostContext > > ConstructLoopbackHostContexts(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host)
Generic network constructor for net backends supporting loopback tests.
Definition: context.cpp:93
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
Definition: context.cpp:873
data::BlockPool & block_pool_
data block pool
Definition: context.hpp:416
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
Definition: context.hpp:235
static void RunLoopbackThreads(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, size_t core_offset, const std::function< void(Context &)> &job_startpoint)
Generic runner for backends supporting loopback tests.
Definition: context.cpp:134
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1144
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:23
~HostContext()
destructor
Definition: context.cpp:1084
std::string GetHostname()
get hostname
Definition: porting.cpp:142
void print(size_t workers_per_host) const
Definition: context.cpp:1043
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
static instance_pointer GetInstance()
Definition: singleton.hpp:50
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network –> run test with one host an...
Definition: context.cpp:360
size_t local_worker_id_
number of this worker context, 0..p-1, within this host
Definition: context.hpp:395
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
int value
Definition: gen_data.py:41
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
Definition: context.hpp:66
int wrap_setenv(const char *name, const char *value, int overwrite)
Definition: context.cpp:328
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
Definition: context.cpp:297
void StartLinuxProcStatsProfiler(ProfileThread &, JsonLogger &)
launch profiler task
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
Definition: abs_diff.hpp:24
static bool SetupBlockSize()
Definition: context.cpp:177
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:39
common::JsonLogger logger_
Definition: context.hpp:459
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
int RunNotSupported(const char *env_net)
Definition: context.cpp:808
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
void LocalBarrier()
A trivial local thread barrier.
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1135
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
Definition: context.cpp:333
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
Definition: group.cpp:707
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
static size_t FindWorkersPerHost(const char *&str_workers_per_host, const char *&env_workers_per_host)
Definition: context.cpp:202
Context(HostContext &host_context, size_t local_worker_id)
Definition: context.cpp:1114
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1152
void Launch(const std::function< void(Context &)> &job_startpoint)
method used to launch a job's main procedure. it wraps it in log output.
Definition: context.cpp:1210
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
Definition: context.hpp:419
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Definition: context.hpp:69
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
The HostContext contains all data structures shared among workers on the same host.
Definition: context.hpp:87
size_t local_worker_id() const
Definition: context.hpp:260
std::basic_string< char, std::char_traits< char >, BypassAllocator< char > > by_string
string without malloc tracking
void setup(size_t ram)
setup memory size
Definition: context.cpp:954
static int RunBackendLoopback(const char *backend, const std::function< void(Context &)> &job_startpoint)
Run() implementation which uses a loopback net backend ("mock" or "tcp").
Definition: context.cpp:403
net::tcp::Group TestGroup
Definition: context.cpp:294
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
Definition: context.cpp:306
MemoryConfig & mem_config()
host-global memory config
Definition: context.hpp:120
size_t rx
received bytes
Definition: manager.hpp:37
size_t NumMpiProcesses()
Return the number of MPI processes.
Definition: group.cpp:694
static bool Initialize()
Definition: context.cpp:272
int64_t read_volume() const
Definition: iostats.hpp:407
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
Definition: context.hpp:158
void Close()
Closes all client connections.
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
Definition: context.hpp:173
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:110
std::string format_iec_units(uint64_t number, int precision)
Format number as something like 1 TiB.
static const char * DetectNetBackend()
Definition: context.cpp:815
bool verbose_
verbosity flag: enables Thrill's informational messages on stderr.
Definition: context.hpp:79
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:321
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:448
static constexpr const T & max(const T &a, const T &b)
template for constexpr max, because std::max is not good enough.
Definition: functional.hpp:65
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
Definition: group.cpp:668
size_t tx
transmitted bytes
Definition: manager.hpp:35