35 #if THRILL_HAVE_NET_TCP 40 #if THRILL_HAVE_NET_MPI 45 #if THRILL_HAVE_NET_IB 46 #include <thrill/net/ib/group.hpp> 52 #include <sys/prctl.h> 55 #include <sys/resource.h> 63 #include <sys/sysctl.h> 64 #include <sys/types.h> 92 template <
typename NetGroup>
94 std::vector<std::unique_ptr<HostContext> >
97 size_t num_hosts,
size_t workers_per_host) {
102 std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
104 for (
size_t g = 0; g < kGroupCount; ++g) {
105 group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
108 std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
109 for (
size_t h = 0; h < num_hosts; ++h) {
110 dispatcher.emplace_back(
111 std::make_unique<net::DispatcherThread>(
112 std::make_unique<typename NetGroup::Dispatcher>(), h));
117 std::vector<std::unique_ptr<HostContext> > host_context;
119 for (
size_t h = 0; h < num_hosts; h++) {
120 std::array<net::GroupPtr, kGroupCount> host_group = {
121 { std::move(group[0][h]), std::move(group[1][h]) }
124 host_context.emplace_back(
125 std::make_unique<HostContext>(
126 h, mem_config, std::move(dispatcher[h]),
127 std::move(host_group), workers_per_host));
134 template <
typename NetGroup>
138 size_t num_hosts,
size_t workers_per_host,
size_t core_offset,
139 const std::function<
void(
Context&)>& job_startpoint) {
142 mem_config.
print(workers_per_host);
145 typename NetGroup::Dispatcher dispatcher;
147 std::vector<std::unique_ptr<HostContext> > host_contexts =
148 ConstructLoopbackHostContexts<NetGroup>(
149 host_mem_config, num_hosts, workers_per_host);
152 std::vector<std::thread> threads(num_hosts * workers_per_host);
154 for (
size_t host = 0; host < num_hosts; ++host) {
156 for (
size_t worker = 0; worker < workers_per_host; ++worker) {
157 size_t id = host * workers_per_host + worker;
159 [&host_contexts, &job_startpoint, host, worker, log_prefix] {
160 Context ctx(*host_contexts[host], worker);
164 ctx.
Launch(job_startpoint);
171 for (
size_t i = 0; i < num_hosts * workers_per_host; i++) {
181 const char* env_block_size = getenv(
"THRILL_BLOCK_SIZE");
182 if (env_block_size ==
nullptr || *env_block_size == 0)
return true;
188 std::cerr <<
"Thrill: environment variable" 189 <<
" THRILL_BLOCK_SIZE=" << env_block_size
190 <<
" is not a valid number." 197 std::cerr <<
"Thrill: setting default_block_size = " 205 const char*& str_workers_per_host,
const char*& env_workers_per_host) {
211 str_workers_per_host =
"THRILL_WORKERS_PER_HOST";
212 env_workers_per_host = getenv(str_workers_per_host);
214 if (env_workers_per_host && *env_workers_per_host) {
215 size_t result = std::strtoul(env_workers_per_host, &endptr, 10);
216 if (!endptr || *endptr != 0 || result == 0) {
217 std::cerr <<
"Thrill: environment variable" 218 <<
' ' << str_workers_per_host
219 <<
'=' << env_workers_per_host
220 <<
" is not a valid number of workers per host." 231 str_workers_per_host =
"OMP_NUM_THREADS";
232 env_workers_per_host = getenv(str_workers_per_host);
234 if (env_workers_per_host && *env_workers_per_host) {
235 size_t result = std::strtoul(env_workers_per_host, &endptr, 10);
236 if (!endptr || *endptr != 0 || result == 0) {
237 std::cerr <<
"Thrill: environment variable" 238 <<
' ' << str_workers_per_host
239 <<
'=' << env_workers_per_host
240 <<
" is not a valid number of workers per host." 251 str_workers_per_host =
"SLURM_CPUS_ON_NODE";
252 env_workers_per_host = getenv(str_workers_per_host);
254 if (env_workers_per_host && *env_workers_per_host) {
255 size_t result = std::strtoul(env_workers_per_host, &endptr, 10);
256 if (!endptr || *endptr != 0 || result == 0) {
257 std::cerr <<
"Thrill: environment variable" 258 <<
' ' << str_workers_per_host
259 <<
'=' << env_workers_per_host
260 <<
" is not a valid number of workers per host." 271 return std::thread::hardware_concurrency();
293 #if defined(_MSC_VER) 300 size_t num_hosts,
size_t workers_per_host,
301 const std::function<
void(
Context&)>& job_startpoint) {
303 return RunLoopbackThreads<TestGroup>(
304 mem_config, num_hosts, workers_per_host, 0, job_startpoint);
307 std::vector<std::unique_ptr<HostContext> >
312 mem_config.
setup(4 * 1024 * 1024 * 1024llu);
315 return ConstructLoopbackHostContexts<TestGroup>(
325 size_t ram,
const std::function<
void(
Context&)>& job_startpoint) {
334 mem_config.
setup(ram);
336 static constexpr
size_t num_hosts_list[] = { 1, 2, 5, 8 };
337 static constexpr
size_t num_workers_list[] = { 1, 3 };
339 size_t max_mock_workers = 1000000;
341 const char* env_max_mock_workers = getenv(
"THRILL_MAX_MOCK_WORKERS");
342 if (env_max_mock_workers && *env_max_mock_workers) {
345 max_mock_workers = std::strtoul(env_max_mock_workers, &endptr, 10);
347 if (!endptr || *endptr != 0 || max_mock_workers == 0) {
348 std::cerr <<
"Thrill: environment variable" 349 <<
" THRILL_MAX_MOCK_WORKERS=" << env_max_mock_workers
350 <<
" is not a valid maximum number of mock hosts." 356 for (
const size_t& num_hosts : num_hosts_list) {
357 for (
const size_t& workers_per_host : num_workers_list) {
358 if (num_hosts * workers_per_host > max_mock_workers) {
359 std::cerr <<
"Thrill: skipping test with " 360 << num_hosts * workers_per_host
361 <<
" workers > max workers " << max_mock_workers
366 LOG0 <<
"Thrill: running local test with " 367 << num_hosts <<
" hosts and " << workers_per_host
368 <<
" workers per host";
378 size_t my_host_rank = 0;
379 size_t workers_per_host = 1;
380 size_t num_hosts = 1;
386 mem_config.
setup(4 * 1024 * 1024 * 1024llu);
387 mem_config.
print(workers_per_host);
390 std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
392 for (
size_t g = 0; g < kGroupCount; ++g) {
396 std::array<net::GroupPtr, kGroupCount> host_group = {
397 { std::move(group[0][0]), std::move(group[1][0]) }
400 auto dispatcher = std::make_unique<net::DispatcherThread>(
401 std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
405 std::move(dispatcher), std::move(host_group), workers_per_host);
417 template <
typename NetGroup>
420 const char* backend,
const std::function<
void(
Context&)>& job_startpoint) {
426 size_t num_hosts = 2;
428 const char* env_local = getenv(
"THRILL_LOCAL");
429 if (env_local && *env_local) {
431 num_hosts = std::strtoul(env_local, &endptr, 10);
433 if (!endptr || *endptr != 0 || num_hosts == 0) {
434 std::cerr <<
"Thrill: environment variable" 435 <<
" THRILL_LOCAL=" << env_local
436 <<
" is not a valid number of local loopback hosts." 444 const char* str_workers_per_host;
445 const char* env_workers_per_host;
448 str_workers_per_host, env_workers_per_host);
450 if (workers_per_host == 0)
455 const char* env_core_offset = getenv(
"THRILL_CORE_OFFSET");
456 size_t core_offset = 0;
457 if (env_core_offset && *env_core_offset) {
458 core_offset = std::strtoul(env_core_offset, &endptr, 10);
461 if (!endptr || *endptr != 0 ||
462 last_core > std::thread::hardware_concurrency())
464 std::cerr <<
"Thrill: environment variable" 465 <<
" THRILL_CORE_OFFSET=" << env_core_offset
466 <<
" is not a valid number of cores to skip for pinning." 476 mem_config.
print(workers_per_host);
480 std::cerr <<
"Thrill: running locally with " << num_hosts
481 <<
" test hosts and " << workers_per_host <<
" workers per host" 482 <<
" in a local " << backend <<
" network." << std::endl;
486 RunLoopbackThreads<NetGroup>(
494 #if THRILL_HAVE_NET_TCP 496 int RunBackendTcp(
const std::function<
void(
Context&)>& job_startpoint) {
502 const char* str_rank =
"THRILL_RANK";
503 const char* env_rank = getenv(str_rank);
505 if (env_rank ==
nullptr) {
507 str_rank =
"SLURM_PROCID";
508 env_rank = getenv(str_rank);
511 const char* env_hostlist = getenv(
"THRILL_HOSTLIST");
515 size_t my_host_rank = 0;
517 if (env_rank !=
nullptr && *env_rank != 0) {
518 my_host_rank = std::strtoul(env_rank, &endptr, 10);
520 if (endptr ==
nullptr || *endptr != 0) {
521 std::cerr <<
"Thrill: environment variable " 522 << str_rank <<
'=' << env_rank
523 <<
" is not a valid number." 529 std::cerr <<
"Thrill: environment variable THRILL_RANK" 530 <<
" is required for tcp network backend." 535 std::vector<std::string> hostlist;
537 if (env_hostlist !=
nullptr && *env_hostlist != 0) {
539 std::vector<std::string> list =
tlx::split(
' ', env_hostlist);
541 if (list.size() == 1) {
547 if (host.empty())
continue;
549 if (host.find(
':') == std::string::npos) {
550 std::cerr <<
"Thrill: invalid address \"" << host <<
"\"" 551 <<
"in THRILL_HOSTLIST. It must contain a port number." 556 hostlist.push_back(host);
559 if (my_host_rank >= hostlist.size()) {
560 std::cerr <<
"Thrill: endpoint list (" << list.size() <<
" entries) " 561 <<
"does not include my host_rank (" << my_host_rank <<
")" 567 std::cerr <<
"Thrill: environment variable THRILL_HOSTLIST" 568 <<
" is required for tcp network backend." 575 const char* str_workers_per_host;
576 const char* env_workers_per_host;
579 str_workers_per_host, env_workers_per_host);
581 if (workers_per_host == 0)
588 mem_config.
print(workers_per_host);
592 std::cerr <<
"Thrill: running in tcp network with " << hostlist.size()
593 <<
" hosts and " << workers_per_host <<
" workers per host" 595 <<
" as rank " << my_host_rank <<
" and endpoints";
597 std::cerr <<
' ' << ep;
598 std::cerr << std::endl;
605 auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();
607 std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
609 *select_dispatcher, my_host_rank, hostlist,
612 std::array<net::GroupPtr, kGroupCount> host_groups = {
613 { std::move(groups[0]), std::move(groups[1]) }
618 auto dispatcher = std::make_unique<net::DispatcherThread>(
619 std::move(select_dispatcher), my_host_rank);
623 std::move(dispatcher), std::move(host_groups), workers_per_host);
625 std::vector<std::thread> threads(workers_per_host);
629 [&host_context, &job_startpoint, worker] {
630 Context ctx(host_context, worker);
633 ctx.
Launch(job_startpoint);
649 #if THRILL_HAVE_NET_MPI 651 int RunBackendMpi(
const std::function<
void(
Context&)>& job_startpoint) {
655 const char* str_workers_per_host;
656 const char* env_workers_per_host;
659 str_workers_per_host, env_workers_per_host);
661 if (workers_per_host == 0)
666 if (workers_per_host == 1) {
667 std::cerr <<
"Thrill: environment variable" 668 <<
' ' << str_workers_per_host
669 <<
'=' << env_workers_per_host
670 <<
" is not recommended, as one thread is used exclusively" 671 <<
" for MPI communication." 682 mem_config.
print(workers_per_host);
690 std::cerr <<
"Thrill: running in MPI network with " << num_hosts
691 <<
" hosts and " << workers_per_host <<
"+1 workers per host" 692 <<
" with " << hostname <<
" as rank " << mpi_rank <<
"." 700 auto dispatcher = std::make_unique<net::DispatcherThread>(
701 std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);
703 std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
706 std::array<net::GroupPtr, kGroupCount> host_groups = {
707 { std::move(groups[0]), std::move(groups[1]) }
713 std::move(dispatcher), std::move(host_groups), workers_per_host);
716 std::vector<std::thread> threads(workers_per_host);
720 [&host_context, &job_startpoint, worker] {
721 Context ctx(host_context, worker);
725 ctx.
Launch(job_startpoint);
741 #if THRILL_HAVE_NET_IB 743 int RunBackendIb(
const std::function<
void(
Context&)>& job_startpoint) {
747 const char* str_workers_per_host;
748 const char* env_workers_per_host;
751 str_workers_per_host, env_workers_per_host);
753 if (workers_per_host == 0)
760 mem_config.
print(workers_per_host);
767 std::cerr <<
"Thrill: running in IB/MPI network with " << num_hosts
768 <<
" hosts and " << workers_per_host <<
" workers per host" 770 <<
" as rank " << mpi_rank <<
"." 778 std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
781 std::array<net::GroupPtr, kGroupCount> host_groups = {
782 { std::move(groups[0]), std::move(groups[1]) }
787 0, mem_config, std::move(host_groups), workers_per_host);
790 std::vector<std::thread> threads(workers_per_host);
794 [&host_context, &job_startpoint, worker] {
795 Context ctx(host_context, worker);
799 ctx.
Launch(job_startpoint);
816 std::cerr <<
"Thrill: network backend " << env_net
817 <<
" is not supported by this binary." << std::endl;
824 if (getenv(
"OMPI_COMM_WORLD_SIZE") !=
nullptr ||
825 getenv(
"I_MPI_INFO_NP") !=
nullptr) {
826 #if THRILL_HAVE_NET_IB 828 #elif THRILL_HAVE_NET_MPI 831 std::cerr <<
"Thrill: MPI environment detected, but network backend mpi" 832 <<
" is not supported by this binary." << std::endl;
836 #if defined(_MSC_VER) 839 const char* env_rank = getenv(
"THRILL_RANK");
840 const char* env_hostlist = getenv(
"THRILL_HOSTLIST");
842 if (env_rank !=
nullptr || env_hostlist !=
nullptr)
851 const char* env_die_with_parent = getenv(
"THRILL_DIE_WITH_PARENT");
852 if (env_die_with_parent ==
nullptr || *env_die_with_parent == 0)
return 0;
856 long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
857 if (endptr ==
nullptr || *endptr != 0 ||
858 (die_with_parent != 0 && die_with_parent != 1)) {
859 std::cerr <<
"Thrill: environment variable" 860 <<
" THRILL_DIE_WITH_PARENT=" << env_die_with_parent
861 <<
" is not either 0 or 1." 866 if (die_with_parent == 0)
return 0;
869 if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0)
873 std::cerr <<
"Thrill: DIE_WITH_PARENT is not supported on this platform.\n" 874 <<
"Please submit a patch." 882 const char* env_unlink_binary = getenv(
"THRILL_UNLINK_BINARY");
883 if (env_unlink_binary ==
nullptr || *env_unlink_binary == 0)
return 0;
885 if (unlink(env_unlink_binary) != 0) {
887 "Error calling unlink binary \"" 902 void load_default_config()
override;
911 void FoxxllConfig::load_default_config() {
912 TLX_LOG1 <<
"foxxll: Using default disk configuration.";
914 default_disk_path(), 1000 * 1024 * 1024, default_disk_io_impl());
925 DWORD pid = GetCurrentProcessId();
926 char* tmpstr =
new char[255];
927 if (GetTempPathA(255, tmpstr) == 0)
928 FOXXLL_THROW_WIN_LASTERROR(resource_error,
"GetTempPathA()");
936 std::string FoxxllConfig::default_config_file_name() {
942 foxxll::config::create_instance<FoxxllConfig>();
947 int Run(
const std::function<
void(
Context&)>& job_startpoint) {
960 const char* env_net = getenv(
"THRILL_NET");
963 if (env_net ==
nullptr || *env_net == 0) {
965 if (env_net ==
nullptr)
return -1;
969 if (strcmp(env_net,
"mock") == 0) {
971 return RunBackendLoopback<net::mock::Group>(
"mock", job_startpoint);
974 if (strcmp(env_net,
"local") == 0) {
975 #if THRILL_HAVE_NET_TCP 977 return RunBackendLoopback<net::tcp::Group>(
"tcp", job_startpoint);
983 if (strcmp(env_net,
"tcp") == 0) {
984 #if THRILL_HAVE_NET_TCP 986 return RunBackendTcp(job_startpoint);
992 if (strcmp(env_net,
"mpi") == 0) {
993 #if THRILL_HAVE_NET_MPI 995 return RunBackendMpi(job_startpoint);
1001 if (strcmp(env_net,
"ib") == 0) {
1002 #if THRILL_HAVE_NET_IB 1004 return RunBackendIb(job_startpoint);
1010 std::cerr <<
"Thrill: network backend " << env_net <<
" is unknown." 1027 const char* env_ram = getenv(
"THRILL_RAM");
1029 if (env_ram !=
nullptr && *env_ram != 0) {
1032 std::cerr <<
"Thrill: environment variable" 1033 <<
" THRILL_RAM=" << env_ram
1034 <<
" is not a valid amount of RAM memory." 1038 ram_ =
static_cast<size_t>(ram64);
1042 #if defined(_MSC_VER) 1043 MEMORYSTATUSEX memstx;
1044 memstx.dwLength =
sizeof(memstx);
1045 GlobalMemoryStatusEx(&memstx);
1047 ram_ = memstx.ullTotalPhys;
1050 int64_t physical_memory;
1055 mib[1] = HW_MEMSIZE;
1056 length =
sizeof(physical_memory);
1057 sysctl(mib, 2, &physical_memory, &length,
nullptr, 0);
1058 ram_ =
static_cast<size_t>(physical_memory);
1060 ram_ = sysconf(_SC_PHYS_PAGES) *
static_cast<size_t>(sysconf(_SC_PAGESIZE));
1066 if (getrlimit(RLIMIT_AS, &rl) == 0) {
1067 if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1068 ram_ = rl.rlim_cur * 3 / 4;
1072 sLOG1 <<
"getrlimit(): " << strerror(errno);
1085 ram_workers_ = ram_ / 3;
1086 ram_block_pool_hard_ = ram_ / 3;
1087 ram_block_pool_soft_ = ram_block_pool_hard_ * 9 / 10;
1088 ram_floating_ = ram_ - ram_block_pool_hard_ - ram_workers_;
1108 if (!verbose_)
return;
1126 std::unique_ptr<net::DispatcherThread> dispatcher,
1127 std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1128 size_t workers_per_host)
1145 if (local_host_id == 0)
1155 const char* env_log = getenv(
"THRILL_LOG");
1156 if (env_log ==
nullptr) {
1157 if (host_rank == 0 &&
mem_config().verbose_) {
1158 std::cerr <<
"Thrill: no THRILL_LOG was found, " 1159 <<
"so no json log is written." 1166 if (output ==
"" || output ==
"-")
1168 if (output ==
"/dev/stdout")
1170 if (output ==
"stdout")
1171 return "/dev/stdout";
1181 local_worker_id_(local_worker_id),
1190 rng_(
std::random_device { }
1201 return tlx::make_counting<data::File>(
1235 struct OverallStats {
1241 size_t max_block_bytes;
1244 size_t net_traffic_tx, net_traffic_rx;
1250 size_t io_max_allocation;
1252 friend std::ostream&
operator << (std::ostream& os,
const OverallStats& c) {
1253 return os <<
"[OverallStats" 1254 <<
" runtime=" << c.runtime
1255 <<
" max_block_bytes=" << c.max_block_bytes
1256 <<
" net_traffic_tx=" << c.net_traffic_tx
1257 <<
" net_traffic_rx=" << c.net_traffic_rx
1258 <<
" io_volume=" << c.io_volume
1259 <<
" io_max_allocation=" << c.io_max_allocation
1263 OverallStats operator + (
const OverallStats& b)
const {
1265 r.runtime =
std::max(runtime, b.runtime);
1266 r.max_block_bytes = max_block_bytes + b.max_block_bytes;
1267 r.net_traffic_tx = net_traffic_tx + b.net_traffic_tx;
1268 r.net_traffic_rx = net_traffic_rx + b.net_traffic_rx;
1269 r.io_volume = io_volume + b.io_volume;
1270 r.io_max_allocation =
std::max(io_max_allocation, b.io_max_allocation);
1276 logger_ <<
"class" <<
"Context" 1277 <<
"event" <<
"job-start";
1282 job_startpoint(*
this);
1284 catch (std::exception& e) {
1285 LOG1 <<
"worker " <<
my_rank() <<
" threw " <<
typeid(e).name();
1286 LOG1 <<
" what(): " << e.what();
1288 logger_ <<
"class" <<
"Context" 1289 <<
"event" <<
"job-exception" 1290 <<
"exception" <<
typeid(e).name()
1291 <<
"what" << e.what();
1295 logger_ <<
"class" <<
"Context" 1296 <<
"event" <<
"job-done" 1297 <<
"elapsed" << overall_timer;
1299 overall_timer.Stop();
1303 stats.runtime = overall_timer.SecondsDouble();
1305 stats.max_block_bytes =
1314 stats.io_max_allocation =
1318 stats.io_volume = 0;
1319 stats.io_max_allocation = 0;
1329 if (stats.net_traffic_rx != stats.net_traffic_tx)
1330 LOG1 <<
"Manager::Traffic() tx/rx asymmetry = " 1331 <<
tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1336 <<
" ran " << stats.runtime <<
"s with max " 1344 logger_ <<
"class" <<
"Context" 1345 <<
"event" <<
"summary" 1346 <<
"runtime" << stats.runtime
1347 <<
"net_traffic" << stats.net_traffic_tx
1348 <<
"io_volume" << stats.io_volume
1349 <<
"io_max_allocation" << stats.io_max_allocation;
std::thread CreateThread(Args &&... args)
create a std::thread and repeat creation if it fails
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
size_t ram_
total amount of physical ram detected or THRILL_RAM
size_t local_host_id_
id among all local hosts (in test program runs)
net::FlowControlChannel & net
static uint_pair max()
return an uint_pair instance containing the largest value possible
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
size_t max_total_bytes() noexcept
Maximum total number of bytes allocated in blocks of this block pool.
The central object of a mock network: the Group containing links to other mock Groups forming the network.
void LogCmdlineParams(JsonLogger &logger)
size_t worker_mem_limit() const
memory limit of each worker Context for local data structures
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
void StartMemProfiler(common::ProfileThread &sched, common::JsonLogger &logger)
launch profiler task
void Initialize()
Initialize VFS layer.
net::Manager & net_manager_
net::Manager instance that is shared among workers
std::string MakeHostLogPath(size_t host_rank)
create host log
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Collection of NetConnections to workers, allows point-to-point client communication and simple collec...
A File is an ordered sequence of Block objects for storing items.
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
#define LOG0
Override default output: never or always output log.
const MemoryConfig & mem_config() const
host-global memory config
net::Manager & net_manager()
net manager constructs communication groups to other hosts.
An Exception which is thrown on system errors and contains errno information.
external_size_type get_read_bytes() const
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
void set_memory_limit_indication(ssize_t size)
common::JsonLogger logger_
bool unlink_on_open
unlink file immediately after opening (available on most Unix)
data::Multiplexer & data_multiplexer()
data multiplexer transmits large amounts of data asynchronously.
MemoryConfig divide(size_t hosts) const
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
friend std::ostream & operator<<(std::ostream &os, const Context &ctx)
Outputs the context as [host id]:[local worker id] to an std::ostream.
int setup_detect()
detect memory configuration from environment
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
The DIABase is the untyped super class of DIANode.
static by_string to_string(int val)
convert to string
void print(size_t workers_per_host) const
uint64_t maximum_allocation() const
return maximum number of bytes allocated during program run.
static bool Deinitialize()
static std::vector< std::unique_ptr< HostContext > > ConstructLoopbackHostContexts(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host)
Generic network constructor for net backends supporting loopback tests.
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
data::BlockPool & block_pool_
data block pool
static void RunLoopbackThreads(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, size_t core_offset, const std::function< void(Context &)> &job_startpoint)
Generic runner for backends supporting loopback tests.
mem::Manager & mem_manager()
host-global memory manager
data::CatStreamPtr GetNewCatStream(size_t dia_id)
size_t start_block_size
starting size of blocks in BlockWriter.
int setenv(const char *name, const char *value, int overwrite)
setenv - change or add an environment variable. Windows porting madness because setenv() is apparently...
std::string GetHostname()
get hostname
common::JsonLogger base_logger_
base logger exclusive for this host context
external_size_type get_write_bytes() const
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
size_t workers_per_host() const
number of workers per host (all have the same).
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network, i.e. runs the test with one host and one worker.
size_t local_worker_id_
number of this worker context, 0..p-1, within this host
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
void StartLinuxProcStatsProfiler(ProfileThread &, JsonLogger &)
launch profiler task
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
static bool SetupBlockSize()
void Deinitialize()
Deinitialize VFS layer.
common::JsonLogger logger_
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
int RunNotSupported(const char *env_net)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
static instance_pointer get_instance()
return instance or create base instance if empty
size_t local_worker_id() const
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
data::FilePtr GetFilePtr(size_t dia_id)
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
net::Traffic Traffic() const
calculate overall traffic for final stats
size_t my_rank() const
Global rank of this worker among all other workers in the system.
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
static size_t FindWorkersPerHost(const char *&str_workers_per_host, const char *&env_workers_per_host)
Context(HostContext &host_context, size_t local_worker_id)
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
data::MixStreamPtr GetNewMixStream(size_t dia_id)
static std::string to_str(const Type &t)
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
net::FlowControlChannelManager & flow_manager()
the flow control group is used for collective communication.
void Launch(const std::function< void(Context &)> &job_startpoint)
method used to launch a job's main procedure. it wraps it in log output.
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
net::FlowControlChannelManager flow_manager_
the flow control group is used for collective communication.
mem::Manager mem_manager_
host-global memory manager for internal memory only
The HostContext contains all data structures shared among workers on the same host.
void setup(size_t ram)
setup memory size
static int RunBackendLoopback(const char *backend, const std::function< void(Context &)> &job_startpoint)
Run() implementation which uses a loopback net backend ("mock" or "tcp").
size_t local_host_id() const
Returns local_host_id_.
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
MemoryConfig & mem_config()
host-global memory config
size_t NumMpiProcesses()
Return the number of MPI processes.
MemoryConfig mem_config_
memory configuration
data::BlockPool block_pool_
data block pool
bool autogrow
autogrow file if more disk space is needed, automatically set if size == 0.
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
bool enable_proc_profiler_
enable Linux /proc stats profiler (default: on)
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
common::JsonLogger base_logger_
base logger exclusive for this worker
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
net::Manager net_manager_
net manager constructs communication groups to other hosts.
std::string format_iec_units(uint64_t number, int precision=3)
Format number as something like 1 TiB.
size_t local_host_id_
id among all local hosts (in test program runs)
size_t workers_per_host_
number of workers per host (all have the same).
static const char * DetectNetBackend()
bool verbose_
StageBuilder verbosity flag.
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
size_t tx
transmitted bytes