Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
context.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/context.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  * Copyright (C) 2015 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_CONTEXT_HEADER
15 #define THRILL_API_CONTEXT_HEADER
16 
17 #include <thrill/common/config.hpp>
23 #include <thrill/data/file.hpp>
28 #include <thrill/net/manager.hpp>
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <functional>
33 #include <numeric>
34 #include <random>
35 #include <string>
36 #include <tuple>
37 #include <vector>
38 
39 namespace thrill {
40 namespace api {
41 
42 //! \ingroup api_layer
43 //! \{
44 
45 // forward declarations
46 class DIABase;
47 
49 {
50 public:
51  //! detect memory configuration from environment
52  int setup_detect();
53 
54  //! setup memory size
55  void setup(size_t ram);
56 
57  MemoryConfig divide(size_t hosts) const;
58  void apply();
59 
60  void print(size_t workers_per_host) const;
61 
62  //! total amount of physical ram detected or THRILL_RAM
63  size_t ram_;
64 
65  //! amount of RAM dedicated to data::BlockPool -- hard limit
67 
68  //! amount of RAM dedicated to data::BlockPool -- soft limit
70 
71  //! total amount of RAM for DIANode data structures such as the reduce
72  //! tables. divide by the number of worker threads before use.
73  size_t ram_workers_;
74 
75  //! remaining free-floating RAM used for user and Thrill data structures.
76  size_t ram_floating_;
77 
78  //! StageBuilder verbosity flag
79  bool verbose_ = true;
80 };
81 
82 /*!
83  * The HostContext contains all data structures shared among workers on the same
84  * host. It is used to construct and destroy them. For testing multiple
85  * instances are run in the same process.
86  */
88 {
89 public:
90 #ifndef SWIG
91  //! constructor from existing net Groups. Used by the construction methods.
93  std::unique_ptr<net::DispatcherThread> dispatcher,
94  std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
95  size_t workers_per_host);
96 
97  //! destructor
98  ~HostContext();
99 
100  //! Construct a number of mock hosts running in this process.
101  static std::vector<std::unique_ptr<HostContext> >
102  ConstructLoopback(size_t num_hosts, size_t workers_per_host);
103 #endif
104 
105  //! create host log
107 
108  //! Returns local_host_id_
109  size_t local_host_id() const { return local_host_id_; }
110 
111  //! number of workers per host (all have the same).
112  size_t workers_per_host() const { return workers_per_host_; }
113 
114  //! memory limit of each worker Context for local data structures
115  size_t worker_mem_limit() const {
117  }
118 
119  //! host-global memory config
121 
122  //! host-global memory manager
124 
125  //! net manager constructs communication groups to other hosts.
127 
128  //! Returns id of this host in the cluster. A host is a machine in the
129  //! cluster that hosts multiple workers
130  size_t host_rank() const { return net_manager_.my_host_rank(); }
131 
132  //! the flow control group is used for collective communication.
134 
135  //! the block manager keeps all data blocks moving through the system.
137 
138  //! data multiplexer transmits large amounts of data asynchronously.
140 
141 private:
142  //! memory configuration
144 
145 public:
146  //! \name Logging System
147  //! \{
148 
149  //! base logger exclusive for this host context
151 
152  //! public member which delivers key:value pairs as JSON log lines. this
153  //! logger is local to this Context which is exclusive for one worker
154  //! thread.
156 
157  //! thread for scheduling profiling methods for statistical output
158  std::unique_ptr<common::ProfileThread> profiler_;
159 
160  //! \}
161 
162 private:
163  //! id among all _local_ hosts (in test program runs)
165 
166  //! number of workers per host (all have the same).
168 
169  //! host-global memory manager for internal memory only
170  mem::Manager mem_manager_ { nullptr, "HostContext" };
171 
172  //! main host network dispatcher thread backend
173  std::unique_ptr<net::DispatcherThread> dispatcher_;
174 
175  //! net manager constructs communication groups to other hosts.
177 
178 #if !THRILL_HAVE_THREAD_SANITIZER
179  //! register net_manager_'s profiling method
181  std::chrono::milliseconds(500), *profiler_, &net_manager_
182  };
183 #endif
184 
185  //! the flow control group is used for collective communication.
188  };
189 
190  //! data block pool
194  };
195 
196 #if !THRILL_HAVE_THREAD_SANITIZER
197  //! register BlockPool's profiling method
199  std::chrono::milliseconds(500), *profiler_, &block_pool_
200  };
201 #endif
202 
203  //! data multiplexer transmits large amounts of data asynchronously.
207  };
208 };
209 
210 /*!
211  * The Context of a job is a unique instance per worker which holds references
212  * to all underlying parts of Thrill. The context is able to give references to
213  * the \ref data::Multiplexer "stream multiplexer", the \ref net::Group "net
214  * group" \ref common::Stats "stats" and \ref common::StatsGraph "stats graph".
215  * Threads share the stream multiplexer and the net group via the context
216  * object.
217  */
218 class Context
219 {
220 public:
221  Context(HostContext& host_context, size_t local_worker_id);
222 
223  //! method used to launch a job's main procedure. it wraps it in log output.
224  void Launch(const std::function<void(Context&)>& job_startpoint);
225 
226  //! \name System Information
227  //! \{
228 
229  //! Returns the total number of hosts.
230  size_t num_hosts() const {
231  return net_manager_.num_hosts();
232  }
233 
234  //! Returns the number of workers that is hosted on each host
235  size_t workers_per_host() const {
236  return workers_per_host_;
237  }
238 
239  //! Global rank of this worker among all other workers in the system.
240  size_t my_rank() const {
241  return workers_per_host() * host_rank() + local_worker_id();
242  }
243 
244  //! memory limit of this worker Context for local data structures
245  size_t mem_limit() const { return mem_limit_; }
246 
247  //! Global number of workers in the system.
248  size_t num_workers() const {
249  return num_hosts() * workers_per_host();
250  }
251 
252  //! Returns the id (rank) of this host in the cluster, taken from net_manager_.
253  //! A host is a machine in the cluster that hosts multiple workers
254  size_t host_rank() const {
255  return net_manager_.my_host_rank();
256  }
257 
258  //! Returns the local id of this worker on the host
259  //! A worker is _locally_ identified by this id
260  size_t local_worker_id() const {
261  return local_worker_id_;
262  }
263 
264  //! id among all _local_ hosts (in test program runs)
265  size_t local_host_id() const { return local_host_id_; }
266 
267 #ifndef SWIG
268  //! Outputs the context as [host id]:[local worker id] to an std::ostream
269  friend std ::ostream& operator << (std::ostream& os, const Context& ctx) {
270  return os << ctx.host_rank() << ":" << ctx.local_worker_id();
271  }
272 #endif
273 
274  //! \}
275 
276  //! \name Data Subsystem
277  //! \{
278 
279  //! Returns a new File object containing a sequence of local Blocks.
280  data::File GetFile(size_t dia_id) {
281  return data::File(block_pool_, local_worker_id_, dia_id);
282  }
283 
284  //! Returns a new File object containing a sequence of local Blocks.
285  data::File GetFile(DIABase* dia);
286 
287  //! Returns a new File, wrapped in a CountingPtr, containing a sequence of
288  //! local Blocks.
289  data::FilePtr GetFilePtr(size_t dia_id);
290 
291  //! Returns a new File, wrapped in a CountingPtr, containing a sequence of
292  //! local Blocks.
294 
295  //! Returns a reference to a new CatStream. This method alters the state of
296  //! the context and must be called on all Workers to ensure correct
297  //! communication coordination.
298  data::CatStreamPtr GetNewCatStream(size_t dia_id);
299 
300  //! Returns a reference to a new CatStream. This method alters the state of
301  //! the context and must be called on all Workers to ensure correct
302  //! communication coordination.
304 
305  //! Returns a reference to a new MixStream. This method alters the state
306  //! of the context and must be called on all Workers to ensure correct
307  //! communication coordination.
308  data::MixStreamPtr GetNewMixStream(size_t dia_id);
309 
310  //! Returns a reference to a new MixStream. This method alters the state
311  //! of the context and must be called on all Workers to ensure correct
312  //! communication coordination.
314 
315  //! Returns a reference to a new CatStream or MixStream, selectable via
316  //! template parameter.
317  template <typename Stream>
318  tlx::CountingPtr<Stream> GetNewStream(size_t dia_id);
319 
320  //! the block manager keeps all data blocks moving through the system.
322 
323  //! \}
324 
325  //! host-global memory config
326  const MemoryConfig& mem_config() const { return mem_config_; }
327 
328  //! returns the host-global memory manager
330 
332 
333  //! given a global range [0,global_size) and p PEs to split the range, calculate
334  //! the [local_begin,local_end) index range assigned to the PE i. Takes the
335  //! information from the Context.
336  common::Range CalculateLocalRange(size_t global_size) const {
338  global_size, num_workers(), my_rank());
339  }
340 
341  common::Range CalculateLocalRangeOnHost(size_t global_size) const {
343  global_size, workers_per_host(), local_worker_id());
344  }
345 
346  //! Gathers one value per worker on worker 0 and logs mean, max, stdev, and
347  //! spread percentages plus all values. text: label prefixing the log line.
348  template <typename Type>
349  void PrintCollectiveMeanStdev(const char* text, const Type& local) {
350  std::vector<Type> svec = { local };
351  svec = net.Reduce(svec, 0, common::VectorConcat<Type>());
352  if (my_rank() == 0) {
353  double sum = std::accumulate(svec.begin(), svec.end(), 0.0);
354  double mean = sum / svec.size();
355  // clamp variance at 0: rounding can make E[x^2] - mean^2 slightly negative
356  double sq_sum = std::inner_product(
357  svec.begin(), svec.end(), svec.begin(), 0.0);
358  double stdev = std::sqrt(std::max(0.0, sq_sum / svec.size() - mean * mean));
359 
360  double min = *std::min_element(svec.begin(), svec.end());
361  double max = *std::max_element(svec.begin(), svec.end());
362 
363  LOG1 << text << " mean " << mean
364  << " max " << max << " stdev " << stdev
365  << " = " << (stdev / mean * 100.0) << "%"
366  << " max-min " << max - min
367  << " = " << ((max - min) / min * 100.0) << "%"
368  << " max-mean " << max - mean
369  << " = " << ((max - mean) / mean * 100.0) << "%"
370  << " svec " << svec;
371  }
372  }
373 
374  //! return value of consume flag.
375  bool consume() const { return consume_; }
376 
377  /*!
378  * Sets consume-mode flag such that DIA contents may be consumed during
379  * PushData(). When in consume mode the DIA contents are destroyed online
380  * when they are transmitted to the next operation. This enables reusing the
381  * space of the consumed operations, and thus processing more data with
382  * less space. However, by default this mode is DISABLED, because it
383  * requires deliberate insertion of .Keep() calls.
384  */
385  void enable_consume(bool consume = true) { consume_ = consume; }
386 
387  //! Increments last_dia_id_ and returns it; used to generate DIA::id_ serial.
388  size_t next_dia_id() { return ++last_dia_id_; }
389 
390 private:
391  //! id among all _local_ hosts (in test program runs)
393 
394  //! number of this worker context, 0..p-1, within this host
396 
397  //! number of workers hosted per host
399 
400  //! memory limit of this worker Context for local data structures
401  size_t mem_limit_;
402 
403  //! memory configuration in HostContext
405 
406  //! host-global memory manager
408 
409  //! net::Manager instance that is shared among workers
411 
412  //! net::FlowControlChannelManager instance that is shared among workers
414 
415  //! data block pool
417 
418  //! data::Multiplexer instance that is shared among workers
420 
421  //! flag to set which enables selective consumption of DIA contents!
422  bool consume_ = false;
423 
424  //! the number of valid DIA ids. 0 is reserved for invalid.
425  size_t last_dia_id_ = 0;
426 
427 public:
428  //! \name Shared Objects
429  //! \{
430 
431  //! a random generator
432  std::default_random_engine rng_;
433 
434  //! \}
435 
436 public:
437  //! \name Network Subsystem
438  //! \{
439 
440  //! public member which exposes all network primitives from
441  //! FlowControlChannel for DOp implementations. Use it as
442  //! `context_.net.Method()`.
445  };
446 
447  //! \}
448 
449 public:
450  //! \name Logging System
451  //! \{
452 
453  //! base logger exclusive for this worker
455 
456  //! public member which delivers key:value pairs as JSON log lines. this
457  //! logger is local to this Context which is exclusive for one worker
458  //! thread.
460  &base_logger_, "host_rank", host_rank(), "worker_rank", my_rank()
461  };
462 
463  //! \}
464 };
465 
466 //! \name Run Methods with Internal Networks for Testing
467 //! \{
468 
469 /*!
470  * Function to run a number of mock hosts as locally independent threads, which
471  * communicate via internal stream sockets.
472  */
473 void RunLocalMock(const MemoryConfig& mem_config,
474  size_t num_hosts, size_t workers_per_host,
475  const std::function<void(Context&)>& job_startpoint);
476 
477 /*!
478  * Helper Function to execute RunLocalMock() tests using mock networks in test
479  * suite for many different numbers of workers and hosts as independent threads
480  * in one program. Use this function in most test cases.
481  */
482 void RunLocalTests(const std::function<void(Context&)>& job_startpoint);
483 
484 /*!
485  * Helper Function to execute RunLocalMock() tests using mock networks in test
486  * suite for many different numbers of workers and hosts as independent threads
487  * in one program. Use this function in most test cases.
488  */
489 void RunLocalTests(
490  size_t ram, const std::function<void(Context&)>& job_startpoint);
491 
492 /*!
493  * Runs the given job_startpoint within the same thread with a test network -->
494  * run test with one host and one thread.
495  */
496 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint);
497 
498 /*!
499  * Check environment variable THRILL_DIE_WITH_PARENT and enable process flag:
500  * this is useful for ssh/invoke.sh: it kills spawned processes when the ssh
501  * connection breaks. Hence: no more zombies.
502  */
504 
505 /*!
506  * Check environment variable THRILL_UNLINK_BINARY and unlink given program
507  * path: this is useful for ssh/invoke.sh: it removes the copied program files
508  * _while_ it is running, hence it is gone even if the program crashes.
509  */
511 
512 //! \}
513 
514 /*!
515  * Runs the given job startpoint with a Context instance. Startpoints may be
516  * called multiple times with concurrent threads and different context instances
517  * across different workers. The Thrill configuration is taken from environment
518  * variables starting with THRILL_.
519  *
520  * THRILL_NET is the network backend to use, e.g.: mock, local, tcp, or mpi.
521  *
522  * THRILL_RANK contains the rank of this worker
523  *
524  * THRILL_HOSTLIST contains a space- or comma-separated list of host:ports to
525  * connect to.
526  *
527  * THRILL_WORKERS_PER_HOST is the number of workers (threads) per host.
528  *
529  * Additional variables:
530  *
531  * THRILL_DIE_WITH_PARENT sets a flag which terminates the program if the caller
532  * terminates (this is automatically set by ssh/invoke.sh). No more zombies.
533  *
534  * THRILL_UNLINK_BINARY deletes a file. Used by ssh/invoke.sh to unlink a copied
535  * program binary while it is running. Hence, it can keep /tmp clean.
536  *
537  * \returns 0 if execution was fine on all threads.
538  */
539 int Run(const std::function<void(Context&)>& job_startpoint);
540 
541 //! \}
542 
543 } // namespace api
544 
545 //! imported from api namespace
546 using api::HostContext;
547 
548 //! imported from api namespace
549 using api::Context;
550 
551 //! imported from api namespace
552 using api::Run;
553 
554 } // namespace thrill
555 
556 #endif // !THRILL_API_CONTEXT_HEADER
557 
558 /******************************************************************************/
mem::Manager & mem_manager()
returns the host-global memory manager
Definition: context.hpp:329
size_t mem_limit_
memory limit of this worker Context for local data structures
Definition: context.hpp:401
size_t ram_
total amount of physical ram detected or THRILL_RAM
Definition: context.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:392
net::FlowControlChannel & net
Definition: context.hpp:443
size_t ram_floating_
remaining free-floating RAM used for user and Thrill data structures.
Definition: context.hpp:76
MemoryConfig divide(size_t hosts) const
Definition: context.cpp:1007
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t num_hosts() const
Returns the total number of hosts.
Definition: context.hpp:230
size_t worker_mem_limit() const
memory limit of each worker Context for local data structures
Definition: context.hpp:115
Group & GetDataGroup()
Returns the net::Group for the data manager.
Definition: manager.hpp:90
size_t host_rank() const
Definition: context.hpp:254
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:349
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
bool consume() const
return value of consume flag.
Definition: context.hpp:375
HostContext(size_t local_host_id, const MemoryConfig &mem_config, std::unique_ptr< net::DispatcherThread > dispatcher, std::array< net::GroupPtr, net::Manager::kGroupCount > &&groups, size_t workers_per_host)
constructor from existing net Groups. Used by the construction methods.
Definition: context.cpp:1035
net::Manager & net_manager_
net::Manager instance that is shared among workers
Definition: context.hpp:410
std::string MakeHostLogPath(size_t host_rank)
create host log
Definition: context.cpp:1065
#define LOG1
Definition: logger.hpp:28
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:863
size_t workers_per_host_
number of workers hosted per host
Definition: context.hpp:398
Type
VFS object type.
Definition: file_io.hpp:52
net::Manager & net_manager()
net manager constructs communication groups to other hosts.
Definition: context.hpp:126
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:136
common::JsonLogger logger_
Definition: context.hpp:155
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
const MemoryConfig & mem_config() const
host-global memory config
Definition: context.hpp:326
data::Multiplexer & data_multiplexer()
data multiplexer transmits large amounts of data asynchronously.
Definition: context.hpp:139
size_t num_hosts() const
Definition: manager.hpp:67
int RunCheckDieWithParent()
Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: this is useful for ssh/inv...
Definition: context.cpp:818
std::default_random_engine rng_
a random generator
Definition: context.hpp:432
Compute the concatenation of two std::vector<T>s.
Definition: functional.hpp:159
int setup_detect()
detect memory configuration from environment
Definition: context.cpp:935
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
size_t workers_per_host() const
number of workers per host (all have the same).
Definition: context.hpp:112
mem::Manager & mem_manager_
host-global memory manager
Definition: context.hpp:407
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
Definition: context.hpp:385
friend std::ostream & operator<<(std::ostream &os, const Context &ctx)
Outputs the context as [host id]:[local worker id] to an std::ostream.
Definition: context.hpp:269
FlowControlChannel & GetFlowControlChannel(size_t thread_id)
Gets the flow control channel for a certain thread.
size_t host_rank() const
Definition: context.hpp:130
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
int RunCheckUnlinkBinary()
Check environment variable THRILL_UNLINK_BINARY and unlink given program path: this is useful for ssh...
Definition: context.cpp:849
data::BlockPool & block_pool_
data block pool
Definition: context.hpp:416
size_t workers_per_host() const
Returns the number of workers that is hosted on each host.
Definition: context.hpp:235
mem::Manager & mem_manager()
host-global memory manager
Definition: context.hpp:123
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
const MemoryConfig & mem_config_
memory configuration in HostContext
Definition: context.hpp:404
~HostContext()
destructor
Definition: context.cpp:1060
Group & GetFlowGroup()
Returns the net::Group for the flow control channel.
Definition: manager.hpp:85
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
data::Multiplexer data_multiplexer_
data multiplexer transmits large amounts of data asynchronously.
Definition: context.hpp:204
common::JsonLogger base_logger_
base logger exclusive for this host context
Definition: context.hpp:150
void print(size_t workers_per_host) const
Definition: context.cpp:1019
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
void RunLocalSameThread(const std::function< void(Context &)> &job_startpoint)
Runs the given job_startpoint within the same thread with a test network –> run test with one host an...
Definition: context.cpp:345
size_t next_dia_id()
Returns next_dia_id_ to generate DIA::id_ serial.
Definition: context.hpp:388
size_t local_worker_id_
number of this host context, 0..p-1, within this host
Definition: context.hpp:395
size_t local_host_id() const
id among all local hosts (in test program runs)
Definition: context.hpp:265
Provides a blocking collection for communication.
size_t ram_block_pool_hard_
amount of RAM dedicated to data::BlockPool – hard limit
Definition: context.hpp:66
void RunLocalMock(const MemoryConfig &mem_config, size_t num_hosts, size_t workers_per_host, const std::function< void(Context &)> &job_startpoint)
Function to run a number of mock hosts as locally independent threads, which communicate via internal...
Definition: context.cpp:298
common::JsonLogger logger_
Definition: context.hpp:459
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers to the given worker, provided a certain red...
common::Range CalculateLocalRange(size_t global_size) const
Definition: context.hpp:336
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1111
void RunLocalTests(const std::function< void(Context &)> &job_startpoint)
Helper Function to execute RunLocalMock() tests using mock networks in test suite for many different ...
Definition: context.cpp:318
common::Range CalculateLocalRangeOnHost(size_t global_size) const
Definition: context.hpp:341
Context(HostContext &host_context, size_t local_worker_id)
Definition: context.cpp:1090
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1128
common::ProfileTaskRegistration net_manager_profiler_
register net_manager_'s profiling method
Definition: context.hpp:180
net::FlowControlChannelManager & flow_manager()
the flow control group is used for collective communication.
Definition: context.hpp:133
bool consume_
flag to set which enables selective consumption of DIA contents!
Definition: context.hpp:422
void Launch(const std::function< void(Context &)> &job_startpoint)
method used to launch a job's main procedure. it wraps it in log output.
Definition: context.cpp:1186
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers
Definition: context.hpp:419
size_t ram_block_pool_soft_
amount of RAM dedicated to data::BlockPool – soft limit
Definition: context.hpp:69
Object shared by allocators and other classes to track memory allocations.
Definition: manager.hpp:28
net::FlowControlChannelManager flow_manager_
the flow control group is used for collective communication.
Definition: context.hpp:186
mem::Manager mem_manager_
host-global memory manager for internal memory only
Definition: context.hpp:170
The HostContext contains all data structures shared among workers on the same host.
Definition: context.hpp:87
size_t local_worker_id() const
Definition: context.hpp:260
net::Manager & net_manager()
Definition: context.hpp:331
void setup(size_t ram)
setup memory size
Definition: context.cpp:930
common::ProfileTaskRegistration block_pool_profiler_
register BlockPool's profiling method
Definition: context.hpp:198
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
static std::vector< std::unique_ptr< HostContext > > ConstructLoopback(size_t num_hosts, size_t workers_per_host)
Construct a number of mock hosts running in this process.
Definition: context.cpp:307
MemoryConfig & mem_config()
host-global memory config
Definition: context.hpp:120
net::FlowControlChannelManager & flow_manager_
net::FlowControlChannelManager instance that is shared among workers
Definition: context.hpp:413
MemoryConfig mem_config_
memory configuration
Definition: context.hpp:143
data::BlockPool block_pool_
data block pool
Definition: context.hpp:191
size_t last_dia_id_
the number of valid DIA ids. 0 is reserved for invalid.
Definition: context.hpp:425
std::unique_ptr< common::ProfileThread > profiler_
thread for scheduling profiling methods for statistical output
Definition: context.hpp:158
size_t local_host_id() const
Returns local_host_id_.
Definition: context.hpp:109
Initializes communication channels, manages communication channels and handles errors.
Definition: manager.hpp:53
std::unique_ptr< net::DispatcherThread > dispatcher_
main host network dispatcher thread backend
Definition: context.hpp:173
common::JsonLogger base_logger_
base logger exclusive for this worker
Definition: context.hpp:454
tlx::CountingPtr< Stream > GetNewStream(size_t dia_id)
net::Manager net_manager_
net manager constructs communication groups to other hosts.
Definition: context.hpp:176
size_t my_host_rank() const
Definition: manager.hpp:63
size_t local_host_id_
id among all local hosts (in test program runs)
Definition: context.hpp:164
size_t workers_per_host_
number of workers per host (all have the same).
Definition: context.hpp:167
size_t mem_limit() const
memory limit of this worker Context for local data structures
Definition: context.hpp:245
bool verbose_
StageBuilder verbosity flag.
Definition: context.hpp:79
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:321