14 #ifndef THRILL_API_CONTEXT_HEADER 15 #define THRILL_API_CONTEXT_HEADER 55 void setup(
size_t ram);
60 void print(
size_t workers_per_host)
const;
96 std::unique_ptr<net::DispatcherThread> dispatcher,
97 std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
98 size_t workers_per_host);
104 static std::vector<std::unique_ptr<HostContext> >
105 ConstructLoopback(
size_t num_hosts,
size_t workers_per_host);
119 return mem_config_.ram_workers_ / workers_per_host_;
133 size_t host_rank()
const {
return net_manager_.my_host_rank(); }
181 #if !THRILL_HAVE_THREAD_SANITIZER 184 std::chrono::milliseconds(500), *profiler_, &net_manager_
190 net_manager_.GetFlowGroup(), workers_per_host_
196 &logger_, &mem_manager_, workers_per_host_
199 #if !THRILL_HAVE_THREAD_SANITIZER 202 std::chrono::milliseconds(500), *profiler_, &block_pool_
208 mem_manager_, block_pool_,
209 *dispatcher_, net_manager_.GetDataGroup(), workers_per_host_
227 void Launch(
const std::function<
void(
Context&)>& job_startpoint);
234 return net_manager_.num_hosts();
239 return workers_per_host_;
244 return workers_per_host() * host_rank() + local_worker_id();
252 return num_hosts() * workers_per_host();
258 return net_manager_.my_host_rank();
264 return local_worker_id_;
284 return data::File(block_pool_, local_worker_id_, dia_id);
320 template <
typename Stream>
341 global_size, num_workers(), my_rank());
346 global_size, workers_per_host(), local_worker_id());
351 template <
typename Type>
353 std::vector<Type> svec = { local };
355 if (my_rank() == 0) {
356 double sum = std::accumulate(svec.begin(), svec.end(), 0.0);
357 double mean = sum / svec.size();
359 double sq_sum = std::inner_product(
360 svec.begin(), svec.end(), svec.begin(), 0.0);
361 double stdev = std::sqrt(sq_sum / svec.size() - mean * mean);
363 double min = *std::min_element(svec.begin(), svec.end());
364 double max = *std::max_element(svec.begin(), svec.end());
366 LOG1 << text <<
" mean " << mean
367 <<
" max " << max <<
" stdev " << stdev
368 <<
" = " << (stdev / mean * 100.0) <<
"%" 369 <<
" max-min " << max - min
370 <<
" = " << ((max -
min) / min * 100.0) <<
"%" 371 <<
" max-mean " << max - mean
372 <<
" = " << ((max - mean) / mean * 100.0) <<
"%" 425 bool consume_ =
false;
428 size_t last_dia_id_ = 0;
435 std::default_random_engine
rng_;
463 &base_logger_,
"host_rank", host_rank(),
"worker_rank", my_rank()
477 size_t num_hosts,
size_t workers_per_host,
478 const std::function<
void(
Context&)>& job_startpoint);
493 size_t ram,
const std::function<
void(
Context&)>& job_startpoint);
542 int Run(
const std::function<
void(
Context&)>& job_startpoint);
559 #endif // !THRILL_API_CONTEXT_HEADER mem::Manager & mem_manager()
returns the host-global memory manager
size_t mem_limit_
memory limit of this worker Context for local data structures
size_t ram_
total amount of physical RAM detected, or as given by THRILL_RAM
size_t local_host_id_
id among all local hosts (in test program runs)
size_t ram_floating_
remaining free-floating RAM used for user and Thrill data structures.
static uint_pair max()
Returns a uint_pair instance containing the largest possible value.
size_t workers_per_host() const
Returns the number of workers hosted on each host.
void PrintCollectiveMeanStdev(const char *text, const Type &local)
common::Range CalculateLocalRange(size_t global_size) const
size_t worker_mem_limit() const
memory limit of each worker Context for local data structures
size_t num_hosts() const
Returns the total number of hosts.
net::Manager & net_manager_
net::Manager instance that is shared among workers
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
size_t workers_per_host_
number of workers hosted per host
const MemoryConfig & mem_config() const
host-global memory config
net::Manager & net_manager()
net manager constructs communication groups to other hosts.
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
size_t local_host_id() const
id among all local hosts (in test program runs)
common::JsonLogger logger_
Represents a one-dimensional range, the half-open interval [begin, end).
data::Multiplexer & data_multiplexer()
data multiplexer transmits large amounts of data asynchronously.
MemoryConfig divide(size_t hosts) const
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
std::default_random_engine rng_
a random generator
Compute the concatenation of two std::vector<T>s.
int setup_detect()
detect memory configuration from environment
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
mem::Manager & mem_manager_
host-global memory manager
Multiplexes virtual Connections on Dispatcher.
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
FlowControlChannel & GetFlowControlChannel(size_t thread_id)
Gets the flow control channel for a certain thread.
The DIABase is the untyped super class of DIANode.
void print(size_t workers_per_host) const
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
data::BlockPool & block_pool_
data block pool
mem::Manager & mem_manager()
host-global memory manager
const MemoryConfig & mem_config_
memory configuration in HostContext
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
common::JsonLogger base_logger_
base logger exclusive for this host context
size_t workers_per_host() const
number of workers per host (all have the same).
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network → run test with one host a...
size_t next_dia_id()
Returns the next serial id, used to assign DIA::id_ values.
size_t local_worker_id_
index of this worker Context within its host, 0..p-1
Provides a blocking collection for communication.
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
bool consume() const
return value of consume flag.
size_t local_worker_id() const
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
size_t my_rank() const
Global rank of this worker among all other workers in the system.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
net::FlowControlChannelManager & flow_manager()
the flow control group is used for collective communication.
static uint_pair min()
Returns a uint_pair instance containing the smallest possible value.
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Object shared by allocators and other classes to track memory allocations.
common::Range CalculateLocalRangeOnHost(size_t global_size) const
The HostContext contains all data structures shared among workers on the same host.
net::Manager & net_manager()
void setup(size_t ram)
setup memory size
size_t local_host_id() const
Returns local_host_id_.
JsonLogger is a receiver of JSON output objects for logging.
MemoryConfig & mem_config()
host-global memory config
net::FlowControlChannelManager & flow_manager_
net::FlowControlChannelManager instance that is shared among workers
MemoryConfig mem_config_
memory configuration
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
bool enable_proc_profiler_
enable Linux /proc stats profiler (default: on)
Initializes communication channels, manages communication channels and handles errors.
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
common::JsonLogger base_logger_
base logger exclusive for this worker
net::Manager net_manager_
net manager constructs communication groups to other hosts.
size_t mem_limit() const
memory limit of this worker Context for local data structures
size_t local_host_id_
id among all local hosts (in test program runs)
size_t workers_per_host_
number of workers per host (all have the same).
bool verbose_
StageBuilder verbosity flag.
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
std::ostream & operator<<(std::ostream &os, const DIABase &d)
Makes DIABase printable to a std::ostream.