Thrill  0.1
context.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/context.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/api/context.hpp>
13 
14 #include <thrill/api/dia_base.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
20 #include <thrill/common/string.hpp>
22 #include <thrill/vfs/file_io.hpp>
23 
24 #include <foxxll/io/iostats.hpp>
25 #include <foxxll/mng/config.hpp>
26 #include <tlx/math/abs_diff.hpp>
27 #include <tlx/port/setenv.hpp>
30 #include <tlx/string/split.hpp>
31 
32 // mock net backend is always available -tb :)
34 
35 #if THRILL_HAVE_NET_TCP
38 #endif
39 
40 #if THRILL_HAVE_NET_MPI
42 #include <thrill/net/mpi/group.hpp>
43 #endif
44 
45 #if THRILL_HAVE_NET_IB
46 #include <thrill/net/ib/group.hpp>
47 #endif
48 
49 #if __linux__
50 
51 // linux-specific process control
52 #include <sys/prctl.h>
53 
54 // for calling getrlimit() to determine memory limit
55 #include <sys/resource.h>
56 #include <sys/time.h>
57 
58 #endif
59 
60 #if __APPLE__
61 
62 // for sysctl()
63 #include <sys/sysctl.h>
64 #include <sys/types.h>
65 
66 #endif
67 
68 #if defined(_MSC_VER)
69 
70 // for detecting amount of physical memory
71 #include <windows.h>
72 
73 #endif
74 
75 #include <algorithm>
76 #include <csignal>
77 #include <iostream>
78 #include <memory>
79 #include <string>
80 #include <thread>
81 #include <tuple>
82 #include <utility>
83 #include <vector>
84 
85 namespace thrill {
86 namespace api {
87 
88 /******************************************************************************/
89 // Generic Network Construction
90 
91 //! Generic network constructor for net backends supporting loopback tests.
92 template <typename NetGroup>
93 static inline
94 std::vector<std::unique_ptr<HostContext> >
96  const MemoryConfig& mem_config,
97  size_t num_hosts, size_t workers_per_host) {
98 
99  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
100 
101  // construct three full mesh loopback cliques, deliver net::Groups.
102  std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
103 
104  for (size_t g = 0; g < kGroupCount; ++g) {
105  group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
106  }
107 
108  std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
109  for (size_t h = 0; h < num_hosts; ++h) {
110  dispatcher.emplace_back(
111  std::make_unique<net::DispatcherThread>(
112  std::make_unique<typename NetGroup::Dispatcher>(), h));
113  }
114 
115  // construct host context
116 
117  std::vector<std::unique_ptr<HostContext> > host_context;
118 
119  for (size_t h = 0; h < num_hosts; h++) {
120  std::array<net::GroupPtr, kGroupCount> host_group = {
121  { std::move(group[0][h]), std::move(group[1][h]) }
122  };
123 
124  host_context.emplace_back(
125  std::make_unique<HostContext>(
126  h, mem_config, std::move(dispatcher[h]),
127  std::move(host_group), workers_per_host));
128  }
129 
130  return host_context;
131 }
132 
133 //! Generic runner for backends supporting loopback tests.
134 template <typename NetGroup>
135 static inline void
137  const MemoryConfig& mem_config,
138  size_t num_hosts, size_t workers_per_host, size_t core_offset,
139  const std::function<void(Context&)>& job_startpoint) {
140 
141  MemoryConfig host_mem_config = mem_config.divide(num_hosts);
142  mem_config.print(workers_per_host);
143 
144  // construct a mock network of hosts
145  typename NetGroup::Dispatcher dispatcher;
146 
147  std::vector<std::unique_ptr<HostContext> > host_contexts =
148  ConstructLoopbackHostContexts<NetGroup>(
149  host_mem_config, num_hosts, workers_per_host);
150 
151  // launch thread for each of the workers on this host.
152  std::vector<std::thread> threads(num_hosts * workers_per_host);
153 
154  for (size_t host = 0; host < num_hosts; ++host) {
155  std::string log_prefix = "host " + std::to_string(host);
156  for (size_t worker = 0; worker < workers_per_host; ++worker) {
157  size_t id = host * workers_per_host + worker;
158  threads[id] = common::CreateThread(
159  [&host_contexts, &job_startpoint, host, worker, log_prefix] {
160  Context ctx(*host_contexts[host], worker);
162  log_prefix + " worker " + std::to_string(worker));
163 
164  ctx.Launch(job_startpoint);
165  });
166  common::SetCpuAffinity(threads[id], core_offset + id);
167  }
168  }
169 
170  // join worker threads
171  for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
172  threads[i].join();
173  }
174 }
175 
176 /******************************************************************************/
177 // Other Configuration Initializations
178 
//! Applies the optional THRILL_BLOCK_SIZE environment variable to
//! data::default_block_size.
//!
//! \return true if the variable is unset/empty or holds a usable number,
//! false (after printing a message to std::cerr) if it does not.
179 static inline bool SetupBlockSize() {
180 
181  const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
// an unset or empty variable is not an error: keep the current default
182  if (env_block_size == nullptr || *env_block_size == 0) return true;
183 
184  char* endptr;
// NOTE(review): the global is overwritten before validation, so on the error
// path below data::default_block_size keeps whatever strtoul() returned.
185  data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
186 
// reject trailing non-digit characters and a block size of zero
187  if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
188  std::cerr << "Thrill: environment variable"
189  << " THRILL_BLOCK_SIZE=" << env_block_size
190  << " is not a valid number."
191  << std::endl;
192  return false;
193  }
194 
196 
// report the override on stderr
197  std::cerr << "Thrill: setting default_block_size = "
199  << std::endl;
200 
201  return true;
202 }
203 
//! Determine how many worker threads to run per host.
//!
//! The following sources are consulted in order; the first usable one wins:
//!  1. THRILL_WORKERS_PER_HOST -- an invalid value is a hard error (returns 0),
//!  2. OMP_NUM_THREADS         -- an invalid value is reported, then skipped,
//!  3. SLURM_CPUS_ON_NODE      -- an invalid value is reported, then skipped,
//!  4. std::thread::hardware_concurrency().
//!
//! A value is only accepted if it is a plain positive decimal number: it must
//! start with a digit, contain nothing but digits and fit into unsigned long.
//! std::strtoul() on its own would silently accept leading whitespace and
//! signs (so "-1" wraps around to a gigantic worker count) and would clamp
//! overflowing input instead of rejecting it.
//!
//! \param str_workers_per_host [out] name of the environment variable that was
//! examined last (SLURM_CPUS_ON_NODE if the hardware fallback was used).
//! \param env_workers_per_host [out] value of that variable as returned by
//! getenv(); nullptr if it is not set.
//! \return the number of workers per host, or 0 if no valid number could be
//! determined (callers treat 0 as an error).
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    struct Candidate {
        const char* name;
        //! whether an unparsable value aborts the search instead of falling
        //! through to the next candidate
        bool      fatal_if_invalid;
    };

    static constexpr Candidate kCandidates[] = {
        { "THRILL_WORKERS_PER_HOST", true },
        { "OMP_NUM_THREADS", false },
        { "SLURM_CPUS_ON_NODE", false },
    };

    for (const Candidate& candidate : kCandidates) {
        str_workers_per_host = candidate.name;
        env_workers_per_host = getenv(candidate.name);

        // unset or empty variables are skipped silently
        if (!env_workers_per_host || !*env_workers_per_host) continue;

        size_t result = 0;

        // require a leading digit: no whitespace, '+' or '-' prefix
        const char first = *env_workers_per_host;
        if (first >= '0' && first <= '9') {
            char* endptr = nullptr;
            errno = 0;
            const unsigned long parsed =
                std::strtoul(env_workers_per_host, &endptr, 10);
            // the whole string must be consumed and must not overflow
            if (errno != ERANGE && endptr != nullptr && *endptr == 0)
                result = static_cast<size_t>(parsed);
        }

        if (result != 0) return result;

        std::cerr << "Thrill: environment variable"
                  << ' ' << str_workers_per_host
                  << '=' << env_workers_per_host
                  << " is not a valid number of workers per host."
                  << std::endl;

        if (candidate.fatal_if_invalid) return 0;
        // otherwise fall through and try the next variable
    }

    // last resort: ask the standard library (may itself return 0)
    return std::thread::hardware_concurrency();
}
273 
274 static inline bool Initialize() {
275 
276  if (!SetupBlockSize()) return false;
277 
278  vfs::Initialize();
279 
280  return true;
281 }
282 
//! Global tear-down counterpart of Initialize(); the Run*() backends call it
//! after their worker threads have been joined.
//!
//! \return true; no failure path is visible in this function.
283 static inline bool Deinitialize() {
284 
286 
287  return true;
288 }
289 
290 /******************************************************************************/
291 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
292 
293 #if defined(_MSC_VER)
295 #else
297 #endif
298 
299 void RunLocalMock(const MemoryConfig& mem_config,
300  size_t num_hosts, size_t workers_per_host,
301  const std::function<void(Context&)>& job_startpoint) {
302 
303  return RunLoopbackThreads<TestGroup>(
304  mem_config, num_hosts, workers_per_host, 0, job_startpoint);
305 }
306 
//! Builds num_hosts HostContext objects connected through a TestGroup loopback
//! network, using a fixed, non-verbose memory configuration of 4 GiB.
//!
//! \param num_hosts number of host contexts to construct
//! \param workers_per_host number of workers each host context is set up for
//! \return the constructed host contexts, as delivered by
//! ConstructLoopbackHostContexts<TestGroup>()
307 std::vector<std::unique_ptr<HostContext> >
308 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
309 
310  // set fixed amount of RAM for testing
312  mem_config.setup(4 * 1024 * 1024 * 1024llu);
// keep test output quiet: MemoryConfig::print() is a no-op when not verbose
313  mem_config.verbose_ = false;
314 
315  return ConstructLoopbackHostContexts<TestGroup>(
316  mem_config, num_hosts, workers_per_host);
317 }
318 
319 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
320  // set fixed amount of RAM for testing
321  RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
322 }
323 
325  size_t ram, const std::function<void(Context&)>& job_startpoint) {
326 
327  // discard json log
328  tlx::setenv("THRILL_LOG", "", /* overwrite */ 1);
329 
330  // set fixed amount of RAM for testing, disable /proc profiler
332  mem_config.verbose_ = false;
333  mem_config.enable_proc_profiler_ = false;
334  mem_config.setup(ram);
335 
336  static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
337  static constexpr size_t num_workers_list[] = { 1, 3 };
338 
339  size_t max_mock_workers = 1000000;
340 
341  const char* env_max_mock_workers = getenv("THRILL_MAX_MOCK_WORKERS");
342  if (env_max_mock_workers && *env_max_mock_workers) {
343  // parse envvar only if it exists.
344  char* endptr;
345  max_mock_workers = std::strtoul(env_max_mock_workers, &endptr, 10);
346 
347  if (!endptr || *endptr != 0 || max_mock_workers == 0) {
348  std::cerr << "Thrill: environment variable"
349  << " THRILL_MAX_MOCK_WORKERS=" << env_max_mock_workers
350  << " is not a valid maximum number of mock hosts."
351  << std::endl;
352  return;
353  }
354  }
355 
356  for (const size_t& num_hosts : num_hosts_list) {
357  for (const size_t& workers_per_host : num_workers_list) {
358  if (num_hosts * workers_per_host > max_mock_workers) {
359  std::cerr << "Thrill: skipping test with "
360  << num_hosts * workers_per_host
361  << " workers > max workers " << max_mock_workers
362  << std::endl;
363  continue;
364  }
365 
366  LOG0 << "Thrill: running local test with "
367  << num_hosts << " hosts and " << workers_per_host
368  << " workers per host";
369 
370  RunLocalMock(mem_config, num_hosts, workers_per_host,
371  job_startpoint);
372  }
373  }
374 }
375 
//! Runs job_startpoint directly on the calling thread, on a single loopback
//! host with a single worker. No additional worker thread is created here and
//! the job is invoked directly rather than through Context::Launch().
//!
//! \param job_startpoint the job to execute with the constructed Context
376 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
377 
378  size_t my_host_rank = 0;
379  size_t workers_per_host = 1;
380  size_t num_hosts = 1;
381  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
382 
383  // set fixed amount of RAM for testing
385  mem_config.verbose_ = false;
386  mem_config.setup(4 * 1024 * 1024 * 1024llu);
387  mem_config.print(workers_per_host);
388 
389  // construct one loopback mesh of TestGroups per group slot (kGroupCount).
390  std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
391 
392  for (size_t g = 0; g < kGroupCount; ++g) {
393  group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
394  }
395 
// hand the groups of the only host (index 0) over to the HostContext
// NOTE(review): only slots 0 and 1 are moved, which presumably relies on
// kGroupCount == 2 -- confirm against net::Manager.
396  std::array<net::GroupPtr, kGroupCount> host_group = {
397  { std::move(group[0][0]), std::move(group[1][0]) }
398  };
399 
400  auto dispatcher = std::make_unique<net::DispatcherThread>(
401  std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
402 
403  HostContext host_context(
404  0, mem_config,
405  std::move(dispatcher), std::move(host_group), workers_per_host);
406 
// worker 0 of the single host, executed on the current thread
407  Context ctx(host_context, 0);
408  common::NameThisThread("worker " + std::to_string(my_host_rank));
409 
410  job_startpoint(ctx);
411 }
412 
413 /******************************************************************************/
414 // Run() Variants for Different Net Backends
415 
416 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
417 template <typename NetGroup>
418 static inline
420  const char* backend, const std::function<void(Context&)>& job_startpoint) {
421 
422  char* endptr;
423 
424  // determine number of loopback hosts
425 
426  size_t num_hosts = 2;
427 
428  const char* env_local = getenv("THRILL_LOCAL");
429  if (env_local && *env_local) {
430  // parse envvar only if it exists.
431  num_hosts = std::strtoul(env_local, &endptr, 10);
432 
433  if (!endptr || *endptr != 0 || num_hosts == 0) {
434  std::cerr << "Thrill: environment variable"
435  << " THRILL_LOCAL=" << env_local
436  << " is not a valid number of local loopback hosts."
437  << std::endl;
438  return -1;
439  }
440  }
441 
442  // determine number of threads per loopback host
443 
444  const char* str_workers_per_host;
445  const char* env_workers_per_host;
446 
447  size_t workers_per_host = FindWorkersPerHost(
448  str_workers_per_host, env_workers_per_host);
449 
450  if (workers_per_host == 0)
451  return -1;
452 
453  // core offset for pinning
454 
455  const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
456  size_t core_offset = 0;
457  if (env_core_offset && *env_core_offset) {
458  core_offset = std::strtoul(env_core_offset, &endptr, 10);
459 
460  size_t last_core = core_offset + num_hosts * workers_per_host;
461  if (!endptr || *endptr != 0 ||
462  last_core > std::thread::hardware_concurrency())
463  {
464  std::cerr << "Thrill: environment variable"
465  << " THRILL_CORE_OFFSET=" << env_core_offset
466  << " is not a valid number of cores to skip for pinning."
467  << std::endl;
468  return -1;
469  }
470  }
471 
472  // detect memory config
473 
475  if (mem_config.setup_detect() < 0) return -1;
476  mem_config.print(workers_per_host);
477 
478  // okay, configuration is good.
479 
480  std::cerr << "Thrill: running locally with " << num_hosts
481  << " test hosts and " << workers_per_host << " workers per host"
482  << " in a local " << backend << " network." << std::endl;
483 
484  if (!Initialize()) return -1;
485 
486  RunLoopbackThreads<NetGroup>(
487  mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
488 
489  if (!Deinitialize()) return -1;
490 
491  return 0;
492 }
493 
494 #if THRILL_HAVE_NET_TCP
//! Run() implementation for the real TCP network backend.
//!
//! Configuration is read from the environment: the host rank from THRILL_RANK
//! (or SLURM_PROCID if THRILL_RANK is unset), the endpoints from
//! THRILL_HOSTLIST (separated by spaces or commas, each containing a ':'), and
//! the number of worker threads via FindWorkersPerHost(). After the network
//! groups and the HostContext are constructed, one thread per worker runs
//! job_startpoint and all of them are joined before returning.
//!
//! \param job_startpoint the job each worker launches
//! \return 0 on success, -1 if the configuration is invalid or
//! initialization/deinitialization fails
495 static inline
496 int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {
497 
498  char* endptr;
499 
500  // select environment variables
501 
502  const char* str_rank = "THRILL_RANK";
503  const char* env_rank = getenv(str_rank);
504 
505  if (env_rank == nullptr) {
506  // take SLURM_PROCID if THRILL_RANK is not set
507  str_rank = "SLURM_PROCID";
508  env_rank = getenv(str_rank);
509  }
510 
511  const char* env_hostlist = getenv("THRILL_HOSTLIST");
512 
513  // parse environment variables
514 
515  size_t my_host_rank = 0;
516 
517  if (env_rank != nullptr && *env_rank != 0) {
518  my_host_rank = std::strtoul(env_rank, &endptr, 10);
519 
// the whole string must have been consumed by strtoul()
520  if (endptr == nullptr || *endptr != 0) {
521  std::cerr << "Thrill: environment variable "
522  << str_rank << '=' << env_rank
523  << " is not a valid number."
524  << std::endl;
525  return -1;
526  }
527  }
528  else {
529  std::cerr << "Thrill: environment variable THRILL_RANK"
530  << " is required for tcp network backend."
531  << std::endl;
532  return -1;
533  }
534 
535  std::vector<std::string> hostlist;
536 
537  if (env_hostlist != nullptr && *env_hostlist != 0) {
538  // first try to split by spaces, then by commas
539  std::vector<std::string> list = tlx::split(' ', env_hostlist);
540 
// a single entry means there were no spaces: retry with ',' as separator
541  if (list.size() == 1) {
542  tlx::split(&list, ',', env_hostlist);
543  }
544 
545  for (const std::string& host : list) {
546  // skip empty splits
547  if (host.empty()) continue;
548 
549  if (host.find(':') == std::string::npos) {
// NOTE(review): there is no space between the closing quote and "in", so
// the message reads: invalid address "host"in THRILL_HOSTLIST.
550  std::cerr << "Thrill: invalid address \"" << host << "\""
551  << "in THRILL_HOSTLIST. It must contain a port number."
552  << std::endl;
553  return -1;
554  }
555 
556  hostlist.push_back(host);
557  }
558 
559  if (my_host_rank >= hostlist.size()) {
// NOTE(review): the check uses hostlist.size() while the message reports
// list.size(), which also counts the empty entries skipped above.
560  std::cerr << "Thrill: endpoint list (" << list.size() << " entries) "
561  << "does not include my host_rank (" << my_host_rank << ")"
562  << std::endl;
563  return -1;
564  }
565  }
566  else {
567  std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
568  << " is required for tcp network backend."
569  << std::endl;
570  return -1;
571  }
572 
573  // determine number of local worker threads per process
574 
575  const char* str_workers_per_host;
576  const char* env_workers_per_host;
577 
578  size_t workers_per_host = FindWorkersPerHost(
579  str_workers_per_host, env_workers_per_host);
580 
// FindWorkersPerHost() returns 0 if no valid worker count was found
581  if (workers_per_host == 0)
582  return -1;
583 
584  // detect memory config
585 
587  if (mem_config.setup_detect() < 0) return -1;
588  mem_config.print(workers_per_host);
589 
590  // okay, configuration is good.
591 
592  std::cerr << "Thrill: running in tcp network with " << hostlist.size()
593  << " hosts and " << workers_per_host << " workers per host"
594  << " with " << common::GetHostname()
595  << " as rank " << my_host_rank << " and endpoints";
596  for (const std::string& ep : hostlist)
597  std::cerr << ' ' << ep;
598  std::cerr << std::endl;
599 
600  if (!Initialize()) return -1;
601 
602  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
603 
604  // construct kGroupCount TCP network groups
605  auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();
606 
607  std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
609  *select_dispatcher, my_host_rank, hostlist,
610  groups.data(), net::Manager::kGroupCount);
611 
// convert the concrete tcp groups into generic net::GroupPtr handles
612  std::array<net::GroupPtr, kGroupCount> host_groups = {
613  { std::move(groups[0]), std::move(groups[1]) }
614  };
615 
616  // construct HostContext
617 
618  auto dispatcher = std::make_unique<net::DispatcherThread>(
619  std::move(select_dispatcher), my_host_rank);
620 
621  HostContext host_context(
622  0, mem_config,
623  std::move(dispatcher), std::move(host_groups), workers_per_host);
624 
// launch one thread per worker; each builds its own Context on the shared
// HostContext
625  std::vector<std::thread> threads(workers_per_host);
626 
627  for (size_t worker = 0; worker < workers_per_host; worker++) {
628  threads[worker] = common::CreateThread(
629  [&host_context, &job_startpoint, worker] {
630  Context ctx(host_context, worker);
631  common::NameThisThread("worker " + std::to_string(worker));
632 
633  ctx.Launch(job_startpoint);
634  });
635  common::SetCpuAffinity(threads[worker], worker);
636  }
637 
638  // join worker threads
639  for (size_t i = 0; i < workers_per_host; i++) {
640  threads[i].join();
641  }
642 
643  if (!Deinitialize()) return -1;
644 
645  return 0;
646 }
647 #endif
648 
649 #if THRILL_HAVE_NET_MPI
//! Run() implementation for the MPI network backend.
//!
//! The worker count comes from FindWorkersPerHost(); the number of hosts and
//! this process' rank come from net::mpi::NumMpiProcesses() and
//! net::mpi::MpiRank(). After the groups and the HostContext are constructed,
//! one thread per worker runs job_startpoint and all of them are joined.
//!
//! \param job_startpoint the job each worker launches
//! \return 0 on success, -1 if the configuration is invalid or
//! initialization/deinitialization fails
650 static inline
651 int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {
652 
653  // determine number of local worker threads per MPI process
654 
655  const char* str_workers_per_host;
656  const char* env_workers_per_host;
657 
658  size_t workers_per_host = FindWorkersPerHost(
659  str_workers_per_host, env_workers_per_host);
660 
// FindWorkersPerHost() returns 0 if no valid worker count was found
661  if (workers_per_host == 0)
662  return -1;
663 
664  // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop
665 
666  if (workers_per_host == 1) {
// NOTE(review): env_workers_per_host can be nullptr here when
// FindWorkersPerHost() fell back to std::thread::hardware_concurrency();
// streaming a null const char* into std::cerr is undefined behavior.
667  std::cerr << "Thrill: environment variable"
668  << ' ' << str_workers_per_host
669  << '=' << env_workers_per_host
670  << " is not recommended, as one thread is used exclusively"
671  << " for MPI communication."
672  << std::endl;
673  }
674  else {
676  }
677 
678  // detect memory config
679 
681  if (mem_config.setup_detect() < 0) return -1;
682  mem_config.print(workers_per_host);
683 
684  // okay, configuration is good.
685 
686  size_t num_hosts = net::mpi::NumMpiProcesses();
687  size_t mpi_rank = net::mpi::MpiRank();
688  std::string hostname = common::GetHostname();
689 
690  std::cerr << "Thrill: running in MPI network with " << num_hosts
691  << " hosts and " << workers_per_host << "+1 workers per host"
692  << " with " << hostname << " as rank " << mpi_rank << "."
693  << std::endl;
694 
695  if (!Initialize()) return -1;
696 
697  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
698 
699  // construct kGroupCount MPI network groups
700  auto dispatcher = std::make_unique<net::DispatcherThread>(
701  std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);
702 
703  std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
704  net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);
705 
// convert the concrete mpi groups into generic net::GroupPtr handles
706  std::array<net::GroupPtr, kGroupCount> host_groups = {
707  { std::move(groups[0]), std::move(groups[1]) }
708  };
709 
710  // construct HostContext
711  HostContext host_context(
712  0, mem_config,
713  std::move(dispatcher), std::move(host_groups), workers_per_host);
714 
715  // launch worker threads
716  std::vector<std::thread> threads(workers_per_host);
717 
718  for (size_t worker = 0; worker < workers_per_host; worker++) {
719  threads[worker] = common::CreateThread(
720  [&host_context, &job_startpoint, worker] {
721  Context ctx(host_context, worker);
723  + " worker " + std::to_string(worker));
724 
725  ctx.Launch(job_startpoint);
726  });
727  common::SetCpuAffinity(threads[worker], worker);
728  }
729 
730  // join worker threads
731  for (size_t i = 0; i < workers_per_host; i++) {
732  threads[i].join();
733  }
734 
735  if (!Deinitialize()) return -1;
736 
737  return 0;
738 }
739 #endif
740 
741 #if THRILL_HAVE_NET_IB
//! Run() implementation for the IB/MPI network backend.
//!
//! The worker count comes from FindWorkersPerHost(); the number of hosts and
//! this process' rank come from net::ib::NumMpiProcesses() and
//! net::ib::MpiRank(). After the groups and the HostContext are constructed,
//! one thread per worker runs job_startpoint and all of them are joined.
//!
//! \param job_startpoint the job each worker launches
//! \return 0 on success, -1 if the configuration is invalid or
//! initialization/deinitialization fails
742 static inline
743 int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {
744 
745  // determine number of local worker threads per IB/MPI process
746 
747  const char* str_workers_per_host;
748  const char* env_workers_per_host;
749 
750  size_t workers_per_host = FindWorkersPerHost(
751  str_workers_per_host, env_workers_per_host);
752 
// FindWorkersPerHost() returns 0 if no valid worker count was found
753  if (workers_per_host == 0)
754  return -1;
755 
756  // detect memory config
757 
759  if (mem_config.setup_detect() < 0) return -1;
760  mem_config.print(workers_per_host);
761 
762  // okay, configuration is good.
763 
764  size_t num_hosts = net::ib::NumMpiProcesses();
765  size_t mpi_rank = net::ib::MpiRank();
766 
767  std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
768  << " hosts and " << workers_per_host << " workers per host"
769  << " with " << common::GetHostname()
770  << " as rank " << mpi_rank << "."
771  << std::endl;
772 
773  if (!Initialize()) return -1;
774 
775  static constexpr size_t kGroupCount = net::Manager::kGroupCount;
776 
777  // construct kGroupCount IB network groups
778  std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
779  net::ib::Construct(num_hosts, groups.data(), kGroupCount);
780 
// convert the concrete ib groups into generic net::GroupPtr handles
781  std::array<net::GroupPtr, kGroupCount> host_groups = {
782  { std::move(groups[0]), std::move(groups[1]) }
783  };
784 
785  // construct HostContext
// NOTE(review): unlike the tcp and mpi backends, no DispatcherThread is
// passed to the HostContext constructor here.
786  HostContext host_context(
787  0, mem_config, std::move(host_groups), workers_per_host);
788 
789  // launch worker threads
790  std::vector<std::thread> threads(workers_per_host);
791 
792  for (size_t worker = 0; worker < workers_per_host; worker++) {
793  threads[worker] = common::CreateThread(
794  [&host_context, &job_startpoint, worker] {
795  Context ctx(host_context, worker);
797  + " worker " + std::to_string(worker));
798 
799  ctx.Launch(job_startpoint);
800  });
801  common::SetCpuAffinity(threads[worker], worker);
802  }
803 
804  // join worker threads
805  for (size_t i = 0; i < workers_per_host; i++) {
806  threads[i].join();
807  }
808 
809  if (!Deinitialize()) return -1;
810 
811  return 0;
812 }
813 #endif
814 
//! Report on std::cerr that the requested network backend was not compiled
//! into this binary.
//!
//! \param env_net name of the requested backend, printed verbatim
//! \return always -1, so callers can return the result directly
int RunNotSupported(const char* env_net) {
    std::cerr << "Thrill: network backend ";
    std::cerr << env_net;
    std::cerr << " is not supported by this binary.";
    std::cerr << std::endl;

    const int failure = -1;
    return failure;
}
820 
//! Pick a network backend automatically when THRILL_NET is not set.
//!
//! If an Open MPI or Intel MPI environment variable is present, the best
//! compiled-in MPI-based backend is chosen ("ib" before "mpi"); without such a
//! backend an error is printed and nullptr is returned. Otherwise MSVC builds
//! use "mock", and all other builds use "tcp" if THRILL_RANK or
//! THRILL_HOSTLIST is set and "local" if neither is.
//!
//! \return name of the backend as a string literal, or nullptr on error
static inline
const char * DetectNetBackend() {
    // detect openmpi and intel mpi run, add others as well.
    const bool launched_by_mpi =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (launched_by_mpi) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    // either variable indicates a manually configured tcp cluster
    const bool has_rank = getenv("THRILL_RANK") != nullptr;
    const bool has_hostlist = getenv("THRILL_HOSTLIST") != nullptr;

    return (has_rank || has_hostlist) ? "tcp" : "local";
#endif
}
848 
850 
851  const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
852  if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
853 
854  char* endptr;
855 
856  long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
857  if (endptr == nullptr || *endptr != 0 ||
858  (die_with_parent != 0 && die_with_parent != 1)) {
859  std::cerr << "Thrill: environment variable"
860  << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
861  << " is not either 0 or 1."
862  << std::endl;
863  return -1;
864  }
865 
866  if (die_with_parent == 0) return 0;
867 
868 #if __linux__
869  if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
870  throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
871  return 1;
872 #else
873  std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
874  << "Please submit a patch."
875  << std::endl;
876  return 0;
877 #endif
878 }
879 
881 
882  const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
883  if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
884 
885  if (unlink(env_unlink_binary) != 0) {
887  "Error calling unlink binary \""
888  + std::string(env_unlink_binary) + "\"");
889  }
890 
891  return 0;
892 }
893 
894 /*----------------------------------------------------------------------------*/
895 // Customized FOXXLL Disk Config
896 
897 //! config class to override foxxll's default config
898 class FoxxllConfig : public foxxll::config
899 {
900 public:
901  //! override load_default_config()
902  void load_default_config() override;
903 
904  //! Returns default path of disk.
905  std::string default_disk_path() override;
906 
907  //! returns the name of the default config file prefix
908  std::string default_config_file_name() override;
909 };
910 
911 void FoxxllConfig::load_default_config() {
912  TLX_LOG1 << "foxxll: Using default disk configuration.";
913  foxxll::disk_config entry1(
914  default_disk_path(), 1000 * 1024 * 1024, default_disk_io_impl());
915  entry1.unlink_on_open = true;
916  entry1.autogrow = true;
917  add_disk(entry1);
918 }
919 
920 std::string FoxxllConfig::default_disk_path() {
921 #if !FOXXLL_WINDOWS
922  int pid = getpid();
923  return "/var/tmp/thrill." + common::to_str(pid) + ".tmp";
924 #else
925  DWORD pid = GetCurrentProcessId();
926  char* tmpstr = new char[255];
927  if (GetTempPathA(255, tmpstr) == 0)
928  FOXXLL_THROW_WIN_LASTERROR(resource_error, "GetTempPathA()");
929  std::string result = tmpstr;
930  result += "thrill." + common::to_str(pid) + ".tmp";
931  delete[] tmpstr;
932  return result;
933 #endif
934 }
935 
936 std::string FoxxllConfig::default_config_file_name() {
937  return ".thrill";
938 }
939 
941  // install derived config instance in foxxll's singleton
942  foxxll::config::create_instance<FoxxllConfig>();
943 }
944 
945 /*----------------------------------------------------------------------------*/
946 
947 int Run(const std::function<void(Context&)>& job_startpoint) {
948 
949  common::NameThisThread("main");
950 
951  if (RunCheckDieWithParent() < 0)
952  return -1;
953 
954  if (RunCheckUnlinkBinary() < 0)
955  return -1;
956 
957  RunSetupFoxxll();
958 
959  // parse environment: THRILL_NET
960  const char* env_net = getenv("THRILL_NET");
961 
962  // if no backend configured: automatically select one.
963  if (env_net == nullptr || *env_net == 0) {
964  env_net = DetectNetBackend();
965  if (env_net == nullptr) return -1;
966  }
967 
968  // run with selected backend
969  if (strcmp(env_net, "mock") == 0) {
970  // mock network backend
971  return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
972  }
973 
974  if (strcmp(env_net, "local") == 0) {
975 #if THRILL_HAVE_NET_TCP
976  // tcp loopback network backend
977  return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
978 #else
979  return RunNotSupported(env_net);
980 #endif
981  }
982 
983  if (strcmp(env_net, "tcp") == 0) {
984 #if THRILL_HAVE_NET_TCP
985  // real tcp network backend
986  return RunBackendTcp(job_startpoint);
987 #else
988  return RunNotSupported(env_net);
989 #endif
990  }
991 
992  if (strcmp(env_net, "mpi") == 0) {
993 #if THRILL_HAVE_NET_MPI
994  // mpi network backend
995  return RunBackendMpi(job_startpoint);
996 #else
997  return RunNotSupported(env_net);
998 #endif
999  }
1000 
1001  if (strcmp(env_net, "ib") == 0) {
1002 #if THRILL_HAVE_NET_IB
1003  // ib/mpi network backend
1004  return RunBackendIb(job_startpoint);
1005 #else
1006  return RunNotSupported(env_net);
1007 #endif
1008  }
1009 
1010  std::cerr << "Thrill: network backend " << env_net << " is unknown."
1011  << std::endl;
1012  return -1;
1013 }
1014 
1015 /******************************************************************************/
1016 // MemoryConfig
1017 
1018 void MemoryConfig::setup(size_t ram) {
1019  ram_ = ram;
1020  apply();
1021 }
1022 
1024 
1025  // determine amount of physical RAM or take user's limit
1026 
1027  const char* env_ram = getenv("THRILL_RAM");
1028 
1029  if (env_ram != nullptr && *env_ram != 0) {
1030  uint64_t ram64;
1031  if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
1032  std::cerr << "Thrill: environment variable"
1033  << " THRILL_RAM=" << env_ram
1034  << " is not a valid amount of RAM memory."
1035  << std::endl;
1036  return -1;
1037  }
1038  ram_ = static_cast<size_t>(ram64);
1039  }
1040  else {
1041  // detect amount of physical memory on system
1042 #if defined(_MSC_VER)
1043  MEMORYSTATUSEX memstx;
1044  memstx.dwLength = sizeof(memstx);
1045  GlobalMemoryStatusEx(&memstx);
1046 
1047  ram_ = memstx.ullTotalPhys;
1048 #elif __APPLE__
1049  int mib[2];
1050  int64_t physical_memory;
1051  size_t length;
1052 
1053  // Get the physical memory size
1054  mib[0] = CTL_HW;
1055  mib[1] = HW_MEMSIZE;
1056  length = sizeof(physical_memory);
1057  sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
1058  ram_ = static_cast<size_t>(physical_memory);
1059 #else
1060  ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
1061 #endif
1062 
1063 #if __linux__
1064  // use getrlimit() to check user limit on address space
1065  struct rlimit rl; // NOLINT
1066  if (getrlimit(RLIMIT_AS, &rl) == 0) {
1067  if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1068  ram_ = rl.rlim_cur * 3 / 4;
1069  }
1070  }
1071  else {
1072  sLOG1 << "getrlimit(): " << strerror(errno);
1073  }
1074 #endif
1075  }
1076 
1077  apply();
1078 
1079  return 0;
1080 }
1081 
1083  // divide up ram_
1084 
1085  ram_workers_ = ram_ / 3;
1086  ram_block_pool_hard_ = ram_ / 3;
1087  ram_block_pool_soft_ = ram_block_pool_hard_ * 9 / 10;
1088  ram_floating_ = ram_ - ram_block_pool_hard_ - ram_workers_;
1089 
1090  // set memory limit, only BlockPool is excluded from malloc tracking, as
1091  // only it uses bypassing allocators.
1092  mem::set_memory_limit_indication(ram_floating_ + ram_workers_);
1093 }
1094 
1095 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1096 
1097  MemoryConfig mc = *this;
1098  mc.ram_ /= hosts;
1099  mc.ram_block_pool_hard_ /= hosts;
1100  mc.ram_block_pool_soft_ /= hosts;
1101  mc.ram_workers_ /= hosts;
1102  // free floating memory is not divided by host, as it is measured overall
1103 
1104  return mc;
1105 }
1106 
1107 void MemoryConfig::print(size_t workers_per_host) const {
1108  if (!verbose_) return;
1109 
1110  std::cerr
1111  << "Thrill: using "
1112  << tlx::format_iec_units(ram_) << "B RAM total,"
1113  << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1114  << " workers="
1115  << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1116  << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1117  << std::endl;
1118 }
1119 
1120 /******************************************************************************/
1121 // HostContext methods
1122 
1124  size_t local_host_id,
1125  const MemoryConfig& mem_config,
1126  std::unique_ptr<net::DispatcherThread> dispatcher,
1127  std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1128  size_t workers_per_host)
1129  : mem_config_(mem_config),
1130  base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1131  logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1132  profiler_(std::make_unique<common::ProfileThread>()),
1133  local_host_id_(local_host_id),
1134  workers_per_host_(workers_per_host),
1135  dispatcher_(std::move(dispatcher)),
1136  net_manager_(std::move(groups), logger_) {
1137 
1138  // write command line parameters to json log
1140 
1143 
1144  // run memory profiler only on local host 0 (especially for test runs)
1145  if (local_host_id == 0)
1147 }
1148 
1150  // stop dispatcher _before_ stopping multiplexer
1151  dispatcher_->Terminate();
1152 }
1153 
1155  const char* env_log = getenv("THRILL_LOG");
1156  if (env_log == nullptr) {
1157  if (host_rank == 0 && mem_config().verbose_) {
1158  std::cerr << "Thrill: no THRILL_LOG was found, "
1159  << "so no json log is written."
1160  << std::endl;
1161  }
1162  return std::string();
1163  }
1164 
1165  std::string output = env_log;
1166  if (output == "" || output == "-")
1167  return std::string();
1168  if (output == "/dev/stdout")
1169  return output;
1170  if (output == "stdout")
1171  return "/dev/stdout";
1172 
1173  return output + "-host-" + std::to_string(host_rank) + ".json";
1174 }
1175 
1176 /******************************************************************************/
1177 // Context methods
1178 
1179 Context::Context(HostContext& host_context, size_t local_worker_id)
1180  : local_host_id_(host_context.local_host_id()),
1181  local_worker_id_(local_worker_id),
1182  workers_per_host_(host_context.workers_per_host()),
1183  mem_limit_(host_context.worker_mem_limit()),
1184  mem_config_(host_context.mem_config()),
1185  mem_manager_(host_context.mem_manager()),
1186  net_manager_(host_context.net_manager()),
1187  flow_manager_(host_context.flow_manager()),
1188  block_pool_(host_context.block_pool()),
1189  multiplexer_(host_context.data_multiplexer()),
1190  rng_(std::random_device { }
1191  () + (local_worker_id_ << 16)),
1192  base_logger_(&host_context.base_logger_) {
1193  assert(local_worker_id < workers_per_host());
1194 }
1195 
1197  return GetFile(dia != nullptr ? dia->dia_id() : 0);
1198 }
1199 
1201  return tlx::make_counting<data::File>(
1202  block_pool_, local_worker_id_, dia_id);
1203 }
1204 
1206  return GetFilePtr(dia != nullptr ? dia->dia_id() : 0);
1207 }
1208 
1211 }
1212 
1214  return GetNewCatStream(dia != nullptr ? dia->dia_id() : 0);
1215 }
1216 
1219 }
1220 
1222  return GetNewMixStream(dia != nullptr ? dia->dia_id() : 0);
1223 }
1224 
1225 template <>
1226 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1227  return GetNewCatStream(dia_id);
1228 }
1229 
1230 template <>
1231 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1232  return GetNewMixStream(dia_id);
1233 }
1234 
//! Summary statistics of one job run. Two instances are merged with
//! operator +.
//!
//! NOTE(review): instances are handed to net.Reduce(); presumably the struct
//! has to stay a plain aggregate for that -- confirm before adding
//! constructors or default member initializers.
struct OverallStats {

    //! overall run time
    double runtime;

    //! maximum ByteBlock allocation on all workers
    size_t max_block_bytes;

    //! network traffic transmitted by net layer
    size_t net_traffic_tx;

    //! network traffic received by net layer
    size_t net_traffic_rx;

    //! I/O volume performed by io layer
    size_t io_volume;

    //! maximum external memory allocation
    size_t io_max_allocation;

    //! Print all fields as "[OverallStats name=value ...]".
    friend std::ostream& operator << (std::ostream& os, const OverallStats& c) {
        os << "[OverallStats";
        os << " runtime=" << c.runtime;
        os << " max_block_bytes=" << c.max_block_bytes;
        os << " net_traffic_tx=" << c.net_traffic_tx;
        os << " net_traffic_rx=" << c.net_traffic_rx;
        os << " io_volume=" << c.io_volume;
        os << " io_max_allocation=" << c.io_max_allocation;
        return os << "]";
    }

    //! Merge two statistics records: runtime and io_max_allocation take the
    //! larger value, every other field is added up.
    OverallStats operator + (const OverallStats& b) const {
        OverallStats merged = *this;

        // fields combined by maximum
        merged.runtime = std::max(merged.runtime, b.runtime);
        merged.io_max_allocation =
            std::max(merged.io_max_allocation, b.io_max_allocation);

        // fields combined by sum
        merged.max_block_bytes += b.max_block_bytes;
        merged.net_traffic_tx += b.net_traffic_tx;
        merged.net_traffic_rx += b.net_traffic_rx;
        merged.io_volume += b.io_volume;

        return merged;
    }
};
1274 
//! Run the job's entry function on this worker, wrapped in log output, and
//! afterwards gather run statistics from all workers.
//!
//! \param job_startpoint function to run; it is called with *this.
//!
//! A std::exception escaping from job_startpoint is logged and rethrown.
//!
//! NOTE(review): this listing lacks original source lines 1306, 1312 and
//! 1315: the right-hand side assigned to stats.max_block_bytes, the
//! declaration of io_stats, and the right-hand side assigned to
//! stats.io_max_allocation. Restore them from the real file before relying
//! on this text.
1275 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1276  logger_ << "class" << "Context"
1277  << "event" << "job-start";
1278 
1279  common::StatsTimerStart overall_timer;
1280 
1281  try {
1282  job_startpoint(*this);
1283  }
1284  catch (std::exception& e) {
// report the failure on the debug log and in the json log, then let the
// exception propagate to the caller
1285  LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1286  LOG1 << " what(): " << e.what();
1287 
1288  logger_ << "class" << "Context"
1289  << "event" << "job-exception"
1290  << "exception" << typeid(e).name()
1291  << "what" << e.what();
1292  throw;
1293  }
1294 
// the "job-done" event is written before the timer is stopped
1295  logger_ << "class" << "Context"
1296  << "event" << "job-done"
1297  << "elapsed" << overall_timer;
1298 
1299  overall_timer.Stop();
1300 
1301  // collect overall statistics
1302  OverallStats stats;
1303  stats.runtime = overall_timer.SecondsDouble();
1304 
1305  stats.max_block_bytes =
1307 
// only local worker 0 contributes the net_manager_ traffic counters, all
// other local workers contribute zero
1308  stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1309  stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1310 
// I/O statistics are contributed only by local worker 0 of local host 0
1311  if (local_host_id_ == 0 && local_worker_id_ == 0) {
1313  stats.io_volume = io_stats.get_read_bytes() + io_stats.get_write_bytes();
1314  stats.io_max_allocation =
1316  }
1317  else {
1318  stats.io_volume = 0;
1319  stats.io_max_allocation = 0;
1320  }
1321 
1322  LOG0 << stats;
1323 
// combine the statistics of all workers; presumably the default reduction
// uses OverallStats::operator + and delivers the result to rank 0 -- confirm
1324  stats = net.Reduce(stats);
1325 
// only the worker with rank 0 prints and logs the summary
1326  if (my_rank() == 0) {
1327  using tlx::format_iec_units;
1328 
// sent and received byte counts differ: report the absolute difference
1329  if (stats.net_traffic_rx != stats.net_traffic_tx)
1330  LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1331  << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1332 
1333  if (mem_config().verbose_) {
1334  std::cerr
1335  << "Thrill:"
1336  << " ran " << stats.runtime << "s with max "
1337  << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1338  << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1339  << format_iec_units(stats.io_volume) << "B disk I/O, and "
1340  << format_iec_units(stats.io_max_allocation) << "B max disk use."
1341  << std::endl;
1342  }
1343 
// the json summary records the transmitted byte count as "net_traffic"
1344  logger_ << "class" << "Context"
1345  << "event" << "summary"
1346  << "runtime" << stats.runtime
1347  << "net_traffic" << stats.net_traffic_tx
1348  << "io_volume" << stats.io_volume
1349  << "io_max_allocation" << stats.io_max_allocation;
1350  }
1351 }
1352 
1353 } // namespace api
1354 } // namespace thrill
1355 
1356 /******************************************************************************/
std::thread CreateThread(Args &&... args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:44
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
size_t ram_
total amount of physical ram detected or THRILL_RAM
Definition: context.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:395
net::FlowControlChannel & net
Definition: context.hpp:446
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
Definition: context.hpp:238
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
Definition: block_pool.cpp:896
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
void LogCmdlineParams(JsonLogger &logger)
Definition: porting.cpp:68
size_t worker_mem_limit() const
memory limit of each worker Context for local data structures
Definition: context.hpp:118
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
Definition: context.cpp:1123
void StartMemProfiler(common::ProfileThread &sched, common::JsonLogger &logger)
launch profiler task
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:35
net::Manager & net_manager_
net::Manager instance that is shared among workers
Definition: context.hpp:413
std::string MakeHostLogPath(size_t host_rank)
create host log
Definition: context.cpp:1154
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
Collection of NetConnections to workers, allows point-to-point client communication and simple collec...
Definition: group.hpp:43
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
const MemoryConfig & mem_config() const
host-global memory config
Definition: context.hpp:329
net::Manager & net_manager()
net manager constructs communication groups to other hosts.
Definition: context.hpp:129
An Exception which is thrown on system errors and contains errno information.
external_size_type get_read_bytes() const
Definition: iostats.cpp:527
STL namespace.
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:139
void set_memory_limit_indication(ssize_t size)
common::JsonLogger logger_
Definition: context.hpp:158
bool unlink_on_open
unlink file immediately after opening (available on most Unix)
Definition: config.hpp:103
data::Multiplexer & data_multiplexer()
data multiplexer transmits large amounts of data asynchronously.
Definition: context.hpp:142
MemoryConfig divide(size_t hosts) const
Definition: context.cpp:1095
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
Definition: context.cpp:849
friend std::ostream & operator<<(std::ostream &os, const Context &ctx)
Outputs the context as [host id]:[local worker id] to an std::ostream.
Definition: context.hpp:272
int setup_detect()
detect memory configuration from environment
Definition: context.cpp:1023
#define sLOG1
Definition: logger.hpp:38
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
Definition: logger.cpp:40
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
static by_string to_string(int val)
convert to string
void print(size_t workers_per_host) const
Definition: context.cpp:1107
uint64_t maximum_allocation() const
return maximum number of bytes allocated during program run.
static bool Deinitialize()
Definition: context.cpp:283
static std::vector< std::unique_ptr< HostContext > > ConstructLoopbackHostContexts(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host)
Generic network constructor for net backends supporting loopback tests.
Definition: context.cpp:95
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
Definition: context.cpp:880
data::BlockPool & block_pool_
data block pool
Definition: context.hpp:419
static void RunLoopbackThreads(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, size_t core_offset, const std::function< void(Context &)> &job_startpoint)
Generic runner for backends supporting loopback tests.
Definition: context.cpp:136
mem::Manager & mem_manager()
host-global memory manager
Definition: context.hpp:126
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1209
size_t start_block_size
starting size of blocks in BlockWriter.
Definition: byte_block.cpp:24
int setenv(const char *name, const char *value, int overwrite)
setenv - change or add an environment variable Windows porting madness because setenv() is apparently...
Definition: setenv.cpp:31
~HostContext()
destructor
Definition: context.cpp:1149
std::string GetHostname()
get hostname
Definition: porting.cpp:143
common::JsonLogger base_logger_
base logger exclusive for this host context
Definition: context.hpp:153
external_size_type get_write_bytes() const
Definition: iostats.cpp:543
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
size_t workers_per_host() const
number of workers per host (all have the same).
Definition: context.hpp:115
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network –> run test with one host a...
Definition: context.cpp:376
size_t local_worker_id_
number of this host context, 0..p-1, within this host
Definition: context.hpp:398
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
Definition: context.hpp:66
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
Definition: allocator.hpp:212
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
Definition: context.cpp:299
void StartLinuxProcStatsProfiler(ProfileThread &, JsonLogger &)
launch profiler task
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
Definition: abs_diff.hpp:24
static bool SetupBlockSize()
Definition: context.cpp:179
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:40
common::JsonLogger logger_
Definition: context.hpp:462
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
int RunNotSupported(const char *env_net)
Definition: context.cpp:815
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static instance_pointer get_instance()
return instance or create base instance if empty
Definition: singleton.hpp:41
size_t local_worker_id() const
Definition: context.hpp:263
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1200
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
Definition: context.cpp:319
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
Definition: group.cpp:714
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
static size_t FindWorkersPerHost(const char *&str_workers_per_host, const char *&env_workers_per_host)
Definition: context.cpp:204
Context(HostContext &host_context, size_t local_worker_id)
Definition: context.cpp:1179
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1217
static std::string to_str(const Type &t)
Definition: string.hpp:36
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
net::FlowControlChannelManager & flow_manager()
the flow control group is used for collective communication.
Definition: context.hpp:136
void Launch(const std::function< void(Context &)> &job_startpoint)
method used to launch a job's main procedure. it wraps it in log output.
Definition: context.cpp:1275
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
Definition: context.hpp:422
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Definition: context.hpp:69
size_t host_rank() const
Definition: context.hpp:257
void RunSetupFoxxll()
Definition: context.cpp:940
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
net::FlowControlChannelManager flow_manager_
the flow control group is used for collective communication.
Definition: context.hpp:189
mem::Manager mem_manager_
host-global memory manager for internal memory only
Definition: context.hpp:173
The HostContext contains all data structures shared among workers on the same host.
Definition: context.hpp:90
void setup(size_t ram)
setup memory size
Definition: context.cpp:1018
static int RunBackendLoopback(const char *backend, const std::function< void(Context &)> &job_startpoint)
Run() implementation which uses a loopback net backend ("mock" or "tcp").
Definition: context.cpp:419
size_t local_host_id() const
Returns local_host_id_.
Definition: context.hpp:112
#define TLX_LOG1
Definition: core.hpp:145
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
Definition: context.cpp:308
MemoryConfig & mem_config()
host-global memory config
Definition: context.hpp:123
size_t rx
received bytes
Definition: manager.hpp:37
size_t NumMpiProcesses()
Return the number of MPI processes.
Definition: group.cpp:701
static bool Initialize()
Definition: context.cpp:274
MemoryConfig mem_config_
memory configuration
Definition: context.hpp:146
data::BlockPool block_pool_
data block pool
Definition: context.hpp:194
bool autogrow
autogrow file if more disk space is needed, automatically set if size == 0.
Definition: config.hpp:78
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
Definition: context.hpp:161
bool enable_proc_profiler_
enable Linux /proc stats profiler (default: on)
Definition: context.hpp:82
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
Definition: context.hpp:176
common::JsonLogger base_logger_
base logger exclusive for this worker
Definition: context.hpp:457
size_t host_rank() const
Definition: context.hpp:133
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:111
net::Manager net_manager_
net manager constructs communication groups to other hosts.
Definition: context.hpp:179
std::string format_iec_units(uint64_t number, int precision=3)
Format number as something like 1 TiB.
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:167
size_t workers_per_host_
number of workers per host (all have the same).
Definition: context.hpp:170
static const char * DetectNetBackend()
Definition: context.cpp:822
bool verbose_
StageBuilder verbosity flag.
Definition: context.hpp:79
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
Definition: group.cpp:675
size_t tx
transmitted bytes
Definition: manager.hpp:35