Thrill
0.1
The Context of a job is a unique instance per worker which holds references to all underlying parts of Thrill.
The context provides references to the stream multiplexer, the net group, the stats (common::Stats), and the stats graph (common::StatsGraph). Threads share the stream multiplexer and the net group via the context object.
Definition at line 221 of file context.hpp.
#include <context.hpp>
Public Member Functions
Context (HostContext &host_context, size_t local_worker_id)
common::Range CalculateLocalRange (size_t global_size) const
common::Range CalculateLocalRangeOnHost (size_t global_size) const
bool consume () const
Returns the value of the consume flag.
void enable_consume (bool consume=true)
Sets the consume-mode flag so that DIA contents may be consumed during PushData().
void Launch (const std::function< void(Context &)> &job_startpoint)
Launches a job's main procedure, wrapping it in log output.
const MemoryConfig & mem_config () const
Host-global memory config.
mem::Manager & mem_manager ()
Returns the host-global memory manager.
net::Manager & net_manager ()
size_t next_dia_id ()
Returns the next DIA id, used to generate the DIA::id_ serial.
template<typename Type >
void PrintCollectiveMeanStdev (const char *text, const Type &local)
Data Subsystem
data::File GetFile (size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
data::File GetFile (DIABase *dia)
Returns a new File object containing a sequence of local Blocks.
data::FilePtr GetFilePtr (size_t dia_id)
data::FilePtr GetFilePtr (DIABase *dia)
data::CatStreamPtr GetNewCatStream (size_t dia_id)
data::CatStreamPtr GetNewCatStream (DIABase *dia)
data::MixStreamPtr GetNewMixStream (size_t dia_id)
data::MixStreamPtr GetNewMixStream (DIABase *dia)
template<typename Stream >
tlx::CountingPtr< Stream > GetNewStream (size_t dia_id)
data::BlockPool & block_pool ()
The block manager keeps all data blocks moving through the system.
Public Attributes
Shared Objects
std::default_random_engine rng_
A random generator.
Network Subsystem
net::FlowControlChannel & net
Logging System
common::JsonLogger base_logger_
Base logger exclusive to this worker.
common::JsonLogger logger_
Private Attributes
data::BlockPool & block_pool_
Data block pool.
bool consume_ = false
Flag that enables selective consumption of DIA contents.
net::FlowControlChannelManager & flow_manager_
net::FlowControlChannelManager instance that is shared among workers.
size_t last_dia_id_ = 0
The number of valid DIA ids. 0 is reserved for invalid.
size_t local_host_id_
Id among all local hosts (in test program runs).
size_t local_worker_id_
Local id of this worker context, 0..p-1, within this host.
const MemoryConfig & mem_config_
Memory configuration in HostContext.
size_t mem_limit_
Memory limit of this worker Context for local data structures.
mem::Manager & mem_manager_
Host-global memory manager.
data::Multiplexer & multiplexer_
data::Multiplexer instance that is shared among workers.
net::Manager & net_manager_
net::Manager instance that is shared among workers.
size_t workers_per_host_
Number of workers hosted per host.
System Information
size_t num_hosts () const
Returns the total number of hosts.
size_t workers_per_host () const
Returns the number of workers hosted on each host.
size_t my_rank () const
Global rank of this worker among all other workers in the system.
size_t mem_limit () const
Memory limit of this worker Context for local data structures.
size_t num_workers () const
Global number of workers in the system.
size_t host_rank () const
size_t local_worker_id () const
size_t local_host_id () const
Id among all local hosts (in test program runs).
std::ostream & operator<< (std::ostream &os, const Context &ctx)
Outputs the context as [host id]:[local worker id] to an std::ostream.
Context (HostContext &host_context, size_t local_worker_id)
Definition at line 1179 of file context.cpp.
References Context::base_logger_, Context::local_worker_id(), Context::local_worker_id_, and Context::workers_per_host().
data::BlockPool & block_pool () [inline]
The block manager keeps all data blocks moving through the system.
Definition at line 324 of file context.hpp.
Referenced by Context::Launch(), JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::MakePuller(), JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::MergeFiles(), ReadBinaryNode< ValueType >::VfsFileBlockSource::NextBlock(), GroupByNode< ValueType, KeyExtractor, GroupFunction, HashFunction, UseLocationDetection >::PushData(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::PushData(), ReadBinaryNode< ValueType >::ReadBinaryNode(), and JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::SortAndWriteToFile().
common::Range CalculateLocalRange (size_t global_size) const [inline]
Given a global range [0, global_size) and p PEs to split it, calculates the [local_begin, local_end) index range assigned to PE i. The values of p and i are taken from the Context.
Definition at line 339 of file context.hpp.
References thrill::common::CalculateLocalRange().
Referenced by ReadLinesNode::InputLineIteratorCompressed::InputLineIteratorCompressed(), ReadLinesNode::InputLineIteratorUncompressed::InputLineIteratorUncompressed(), EqualToDIANode< ValueType >::PushData(), GenerateNode< ValueType, GenerateFunction >::PushData(), and ReadBinaryNode< ValueType >::ReadBinaryNode().
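To illustrate the split described above, the following standalone sketch computes a balanced [begin, end) range for PE i out of p. It assumes a simple floor-based split (begin = i * n / p); thrill::common::CalculateLocalRange may round differently, and `Range` and `LocalRange` here are hypothetical stand-ins, not Thrill's types.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for common::Range: the half-open interval [begin, end).
struct Range {
    std::size_t begin;
    std::size_t end;
    std::size_t size() const { return end - begin; }
};

// Assumed floor-based split of [0, global_size) among p PEs: PE i receives
// [i * n / p, (i + 1) * n / p). Consecutive ranges tile [0, n) without gaps
// or overlaps, and their sizes differ by at most one.
// Caveat: i * n can overflow for extremely large global sizes.
Range LocalRange(std::size_t global_size, std::size_t p, std::size_t i) {
    assert(p > 0 && i < p);
    return Range{i * global_size / p, (i + 1) * global_size / p};
}
```

For example, splitting 10 items among 3 workers yields [0,3), [3,6), and [6,10); when there are fewer items than workers, some workers receive an empty range.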
common::Range CalculateLocalRangeOnHost (size_t global_size) const [inline]
Definition at line 344 of file context.hpp.
References thrill::common::CalculateLocalRange().
Referenced by ReadLinesNode::InputLineIteratorCompressed::InputLineIteratorCompressed(), ReadLinesNode::InputLineIteratorUncompressed::InputLineIteratorUncompressed(), and ReadBinaryNode< ValueType >::ReadBinaryNode().
bool consume () const [inline]
Returns the value of the consume flag.
Definition at line 378 of file context.hpp.
Referenced by DIANode< StackInput >::RunPushData().
void enable_consume (bool consume=true) [inline]
Sets the consume-mode flag so that DIA contents may be consumed during PushData().
In consume mode, the DIA contents are destroyed online as they are transmitted to the next operation. This allows the space of consumed DIAs to be reused, so more data can be processed with less memory. However, this mode is DISABLED by default, because it requires deliberately inserting .Keep() calls.
Definition at line 388 of file context.hpp.
Referenced by JoinTPCH4(), main(), RunHashWordCount(), RunHashWordCountGenerated(), RunJoinPageRankEdgePerLine(), RunPageRankEdgePerLine(), RunPageRankGenerated(), RunPageRankJoinGenerated(), RunWordCount(), and RunWordCountGenerated().
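The consume-mode trade-off can be pictured with a toy model. The sketch below is not Thrill's implementation: `ToyNode`, its `Keep()` counter, and its `PushData()` are hypothetical and only mimic the idea that, with consume mode on, a node releases its contents after pushing them unless a Keep() is still pending.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Toy model of a DIA node's stored contents (hypothetical, not Thrill code).
class ToyNode {
public:
    ToyNode(std::vector<int> data, bool consume)
        : data_(std::move(data)), consume_(consume) {}

    // Request that the contents survive one more PushData(), like .Keep().
    void Keep(std::size_t n = 1) { keep_count_ += n; }

    // Push all items to the next operation. In consume mode the contents
    // are released afterwards unless a Keep() is still pending.
    void PushData(const std::function<void(int)>& emit) {
        for (int x : data_) emit(x);
        if (keep_count_ > 0) {
            --keep_count_;
        } else if (consume_) {
            std::vector<int>().swap(data_);  // actually free the memory
        }
    }

    std::size_t size() const { return data_.size(); }

private:
    std::vector<int> data_;
    bool consume_;
    std::size_t keep_count_ = 0;
};
```

Without consume mode the node always retains its data; with it, a DIA that is read twice needs an explicit Keep() or its second read finds nothing, which is why the mode is opt-in.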
data::File GetFile (size_t dia_id) [inline]
Returns a new File object containing a sequence of local Blocks.
Definition at line 283 of file context.hpp.
Referenced by ConcatNode< ValueType >::ConcatNode(), GroupToIndexNode< ValueType, KeyExtractor, GroupFunction >::FlushVectorToFile(), GroupByNode< ValueType, KeyExtractor, GroupFunction, HashFunction, UseLocationDetection >::FlushVectorToFile(), Context::GetFile(), GroupByNode< ValueType, KeyExtractor, GroupFunction, HashFunction, UseLocationDetection >::GroupByNode(), JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::MergeFiles(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::PartialMultiwayMerge(), GroupByNode< ValueType, KeyExtractor, GroupFunction, HashFunction, UseLocationDetection >::PushData(), ReduceTable< TableItem, Key, Value, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig_, IndexFunction, KeyEqualFunction >::ReduceTable(), JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::SortAndWriteToFile(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::SortAndWriteToFile(), and ZipNode< ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs >::ZipNode().
data::File GetFile (DIABase *dia)
Returns a new File object containing a sequence of local Blocks.
Definition at line 1196 of file context.cpp.
References DIABase::dia_id(), and Context::GetFile().
data::FilePtr GetFilePtr (size_t dia_id)
Returns a new File, wrapped in a CountingPtr, containing a sequence of local Blocks.
Definition at line 1200 of file context.cpp.
References Context::block_pool_, and Context::local_worker_id_.
Referenced by JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::AddEqualKeysToVec(), Context::GetFilePtr(), ReduceByIndexPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, thrill::api::ReduceToIndexNode::Emitter, VolatileKey, ReduceConfig >::Initialize(), MergeNode< ValueType, Comparator, kNumInputs >::MergeNode(), and ReduceByIndexPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, thrill::api::ReduceToIndexNode::Emitter, VolatileKey, ReduceConfig >::PushData().
data::FilePtr GetFilePtr (DIABase *dia)
Returns a new File, wrapped in a CountingPtr, containing a sequence of local Blocks.
Definition at line 1205 of file context.cpp.
References DIABase::dia_id(), and Context::GetFilePtr().
data::CatStreamPtr GetNewCatStream (size_t dia_id)
Returns a reference to a new CatStream. This method alters the state of the context and must be called on all Workers to ensure correct communication coordination.
Definition at line 1209 of file context.cpp.
References Multiplexer::GetNewCatStream(), Context::local_worker_id_, and Context::multiplexer_.
Referenced by thrill::api::Context::GetNewStream< data::CatStream >(), ZipNode< ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs >::DoScatter(), ConcatNode< ValueType >::Execute(), DuplicateDetection::FindNonDuplicates(), Context::GetNewCatStream(), and MergeNode< ValueType, Comparator, kNumInputs >::MainOp().
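Why must every worker call GetNewCatStream()? A plausible mental model, assumed here rather than taken from Thrill's source, is that each call advances a per-worker stream counter and that streams on different workers are paired by that counter. The hypothetical `ToyWorkerContext` below shows that workers creating streams in the same order obtain matching ids, while a worker that makes an extra call falls out of step.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical per-worker context that hands out stream ids in call order.
struct ToyWorkerContext {
    std::size_t next_stream_id = 0;

    // Each call changes the context's state, as GetNewCatStream() does;
    // the id is what would pair this stream with its peers on other workers.
    std::size_t GetNewStreamId() { return next_stream_id++; }
};
```

If worker 0 creates one stream more than worker 1, every later stream on worker 0 is paired with the wrong peer, which is the coordination failure the note above warns about.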
data::CatStreamPtr GetNewCatStream (DIABase *dia)
Returns a reference to a new CatStream. This method alters the state of the context and must be called on all Workers to ensure correct communication coordination.
Definition at line 1213 of file context.cpp.
References DIABase::dia_id(), and Context::GetNewCatStream().
data::MixStreamPtr GetNewMixStream (size_t dia_id)
Returns a reference to a new MixStream. This method alters the state of the context and must be called on all Workers to ensure correct communication coordination.
Definition at line 1217 of file context.cpp.
References Multiplexer::GetNewMixStream(), Context::local_worker_id_, and Context::multiplexer_.
Referenced by thrill::api::Context::GetNewStream< data::MixStream >(), Context::GetNewMixStream(), and SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::MainOp().
data::MixStreamPtr GetNewMixStream (DIABase *dia)
Returns a reference to a new MixStream. This method alters the state of the context and must be called on all Workers to ensure correct communication coordination.
Definition at line 1221 of file context.cpp.
References DIABase::dia_id(), and Context::GetNewMixStream().
template<typename Stream >
tlx::CountingPtr< Stream > GetNewStream (size_t dia_id)
Returns a reference to a new CatStream or MixStream, selectable via template parameter.
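Selecting the stream type by template parameter can be sketched as a thin compile-time dispatch onto the two non-template factories. The reference lists above mention GetNewStream< data::CatStream > and GetNewStream< data::MixStream >, which suggests explicit specializations; this sketch uses `if constexpr` (C++17) instead. `CatStream`, `MixStream`, and the factories here are hypothetical stand-ins, and std::shared_ptr replaces tlx::CountingPtr.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <type_traits>

// Hypothetical stand-ins for data::CatStream and data::MixStream.
struct CatStream { std::size_t dia_id; };
struct MixStream { std::size_t dia_id; };

// Hypothetical factories standing in for GetNewCatStream()/GetNewMixStream().
std::shared_ptr<CatStream> GetNewCatStream(std::size_t dia_id) {
    return std::make_shared<CatStream>(CatStream{dia_id});
}
std::shared_ptr<MixStream> GetNewMixStream(std::size_t dia_id) {
    return std::make_shared<MixStream>(MixStream{dia_id});
}

// Template front end: picks the matching factory at compile time.
template <typename Stream>
std::shared_ptr<Stream> GetNewStream(std::size_t dia_id) {
    static_assert(std::is_same_v<Stream, CatStream> ||
                      std::is_same_v<Stream, MixStream>,
                  "Stream must be CatStream or MixStream");
    if constexpr (std::is_same_v<Stream, CatStream>)
        return GetNewCatStream(dia_id);
    else
        return GetNewMixStream(dia_id);
}
```

Generic node code can then be written once against a `Stream` parameter, and requesting any other type fails at compile time rather than at run time.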
size_t host_rank () const [inline]
Returns the id of this host in the cluster. A host is a machine in the cluster that hosts multiple workers.
Definition at line 257 of file context.hpp.
Referenced by thrill::api::RunBackendLoopback().
void Launch (const std::function< void(Context &)> &job_startpoint)
Method used to launch a job's main procedure; it wraps the procedure in log output.
Definition at line 1275 of file context.cpp.
References tlx::abs_diff(), Context::block_pool(), tlx::format_iec_units(), singleton< stats >::get_instance(), singleton< block_manager >::get_instance(), stats_data::get_read_bytes(), stats_data::get_write_bytes(), Context::local_host_id_, Context::local_worker_id_, LOG0, LOG1, Context::logger_, BlockPool::max_total_bytes(), block_manager::maximum_allocation(), Context::mem_config(), Context::my_rank(), Context::net, Context::net_manager_, FlowControlChannel::Reduce(), Traffic::rx, Manager::Traffic(), and Traffic::tx.
Referenced by thrill::api::RunBackendLoopback(), and thrill::api::RunLoopbackThreads().
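A minimal sketch of the "wrap in log output" idea follows, using a hypothetical `ToyContext` and `ToyLaunch` (neither is Thrill API). It logs a start line, runs the job, and logs the elapsed time. Judging by its reference list, the real Launch() also collects statistics such as network traffic, I/O bytes, and block-pool usage, which this sketch leaves out.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical minimal context; it only carries the worker's global rank.
struct ToyContext {
    std::size_t my_rank;
};

// Runs job_startpoint between a start and an end log line; the end line
// reports the wall-clock time the job took in milliseconds.
void ToyLaunch(ToyContext& ctx, std::ostream& log,
               const std::function<void(ToyContext&)>& job_startpoint) {
    log << "worker " << ctx.my_rank << ": job start\n";
    auto start = std::chrono::steady_clock::now();
    job_startpoint(ctx);
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                  std::chrono::steady_clock::now() - start)
                  .count();
    log << "worker " << ctx.my_rank << ": job done in " << ms << " ms\n";
}
```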
size_t local_host_id () const [inline]
Id among all local hosts (in test program runs).
Definition at line 268 of file context.hpp.
size_t local_worker_id () const [inline]
Returns the local id of this worker on the host. A worker is locally identified by this id.
Definition at line 263 of file context.hpp.
Referenced by Context::Context(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::MainOp(), ReadBinaryNode< ValueType >::VfsFileBlockSource::NextBlock(), and WriteBinaryNode< ValueType >::OpenNextFile().
const MemoryConfig & mem_config () const [inline]
Host-global memory config.
Definition at line 329 of file context.hpp.
Referenced by Context::Launch().
size_t mem_limit () const [inline]
Memory limit of this worker Context for local data structures.
Definition at line 248 of file context.hpp.
mem::Manager & mem_manager () [inline]
Returns the host-global memory manager.
Definition at line 332 of file context.hpp.
size_t my_rank () const [inline]
Global rank of this worker among all other workers in the system.
Definition at line 243 of file context.hpp.
Referenced by CountTrianglesGenerated(), examples::suffix_sorting::DC3Recursive(), examples::suffix_sorting::DC7Recursive(), doubleSweepDiameter(), DistributeNode< ValueType >::Execute(), WriteLinesOneNode< ValueType >::Execute(), SampleNode< ValueType >::Execute(), ConcatNode< ValueType >::Execute(), thrill::api::FindStages(), GroupToIndexNode< ValueType, KeyExtractor, GroupFunction >::GroupToIndexNode(), JoinTPCH4(), Context::Launch(), main(), MergeNode< ValueType, Comparator, kNumInputs >::MainOp(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::MainOp(), CollapseNode< ValueType >::OnPreOpFile(), WriteBinaryNode< ValueType >::OpenNextFile(), Parallel(), Percentiles(), examples::select::PickPivots(), WriteLinesNode< ValueType >::PreOp(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::Print(), OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::PushData(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::PushData(), DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::PushData(), RunHashWordCount(), RunJoinPageRankEdgePerLine(), RunKMeansFile(), RunKMeansGenerated(), RunPageRankEdgePerLine(), RunPageRankGenerated(), RunPageRankJoinGenerated(), RunStochasticGradFile(), RunStochasticGradGenerated(), RunWordCount(), examples::select::Select(), and WriteLinesNode< ValueType >::WriteLinesNode().
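The rank accessors are easiest to reason about with an explicit numbering in mind. The sketch below assumes host-major numbering, my_rank = host_rank * workers_per_host + local_worker_id (and hence num_workers = num_hosts * workers_per_host); this is consistent with the member descriptions but is not quoted from context.hpp. `GlobalRank` and `LocateWorker` are hypothetical helpers, not Context members.

```cpp
#include <cassert>
#include <cstddef>

// Assumed host-major numbering: workers on host 0 come first, then host 1...
std::size_t GlobalRank(std::size_t host_rank, std::size_t workers_per_host,
                       std::size_t local_worker_id) {
    assert(local_worker_id < workers_per_host);
    return host_rank * workers_per_host + local_worker_id;
}

// Inverse mapping: recover (host_rank, local_worker_id) from a global rank.
struct WorkerLocation {
    std::size_t host_rank;
    std::size_t local_worker_id;
};

WorkerLocation LocateWorker(std::size_t rank, std::size_t workers_per_host) {
    return {rank / workers_per_host, rank % workers_per_host};
}
```

With 4 workers per host, for instance, local worker 1 on host 2 has global rank 9.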
net::Manager & net_manager () [inline]
Definition at line 334 of file context.hpp.
Referenced by CountTrianglesGenerated(), JoinTPCH4(), main(), Percentiles(), RunHashWordCount(), RunJoinPageRankEdgePerLine(), RunKMeansFile(), RunKMeansGenerated(), RunPageRankJoinGenerated(), and RunWordCount().
size_t next_dia_id () [inline]
Returns the next DIA id, used to generate the DIA::id_ serial.
Definition at line 391 of file context.hpp.
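Since last_dia_id_ starts at 0 and 0 is reserved for "invalid", a natural way to generate serial ids is a pre-increment. The hypothetical `DiaIdGenerator` below sketches that assumption; it is not the body of next_dia_id().

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical DIA id generator; id 0 is reserved to mean "invalid".
class DiaIdGenerator {
public:
    static constexpr std::size_t kInvalidId = 0;

    // Pre-increment, so the first id handed out is 1 and never 0.
    std::size_t next_dia_id() { return ++last_dia_id_; }

    // Number of valid ids issued so far (mirrors last_dia_id_).
    std::size_t count() const { return last_dia_id_; }

private:
    std::size_t last_dia_id_ = 0;
};
```

Because the counter lives in a per-worker Context, each worker must create its DIAs in the same order for the ids to agree across workers.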
size_t num_hosts () const [inline]
Returns the total number of hosts.
Definition at line 233 of file context.hpp.
Referenced by CountTrianglesGenerated(), JoinTPCH4(), main(), Percentiles(), RunHashWordCount(), RunJoinPageRankEdgePerLine(), RunKMeansFile(), RunKMeansGenerated(), RunPageRankGenerated(), RunPageRankJoinGenerated(), and RunWordCount().
size_t num_workers () const [inline]
Global number of workers in the system.
Definition at line 251 of file context.hpp.
Referenced by ZipNode< ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs >::DoScatter(), RebalanceNode< ValueType >::Execute(), WriteLinesOneNode< ValueType >::Execute(), SampleNode< ValueType >::Execute(), ConcatNode< ValueType >::Execute(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::FindAndSendSplitters(), DuplicateDetection::FindNonDuplicates(), GatherNode< ValueType >::GatherNode(), GroupToIndexNode< ValueType, KeyExtractor, GroupFunction >::GroupToIndexNode(), main(), MergeNode< ValueType, Comparator, kNumInputs >::MainOp(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::MainOp(), Parallel(), examples::select::PickPivots(), GroupToIndexNode< ValueType, KeyExtractor, GroupFunction >::PreOp(), JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::PreOp1(), JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::PreOp2(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::Print(), OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::PushData(), DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::PushData(), and ReadBinaryNode< ValueType >::ReadBinaryNode().
template<typename Type >
void PrintCollectiveMeanStdev (const char *text, const Type &local) [inline]
Performs collectives and prints the min, max, mean, stdev, and all local values.
Definition at line 352 of file context.hpp.
References LOG1, max(), and min().
Referenced by SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::Execute(), ZipNode< ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs >::MainOp(), ZipWithIndexNode< ValueType, ZipFunction >::PushData(), ZipNode< ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs >::PushData(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::PushData(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::ReceiveItems(), and SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::StopPreOp().
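The statistics printed by PrintCollectiveMeanStdev can be sketched locally, assuming the per-worker values have already been gathered by a collective. `Summary` and `Summarize` are hypothetical; the sketch uses the population standard deviation (dividing by n), and whether Thrill uses that or the sample estimator is not stated here.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <vector>

struct Summary {
    double min, max, mean, stdev;
};

// Summarizes one value per worker. Population standard deviation:
// sqrt(sum((v - mean)^2) / n).
Summary Summarize(const std::vector<double>& values) {
    assert(!values.empty());
    double sum = 0.0;
    for (double v : values) sum += v;
    const double mean = sum / values.size();
    double sq = 0.0;
    for (double v : values) sq += (v - mean) * (v - mean);
    auto [mn, mx] = std::minmax_element(values.begin(), values.end());
    return {*mn, *mx, mean, std::sqrt(sq / values.size())};
}
```

For the values {2, 4, 4, 4, 5, 5, 7, 9} the mean is 5 and the population standard deviation is 2, a quick way to spot load imbalance across workers.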
size_t workers_per_host () const [inline]
Returns the number of workers hosted on each host.
Definition at line 238 of file context.hpp.
Referenced by Context::Context().
std::ostream & operator<< (std::ostream &os, const Context &ctx) [friend]
Outputs the context as [host id]:[local worker id] to an std::ostream.
Definition at line 272 of file context.hpp.
Referenced by thrill::api::Context::GetNewStream< data::MixStream >().
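A stream operator of this shape can be sketched with a hypothetical `ToyIds` struct. The sketch assumes "host id" means host_rank() and prints the bare "host:worker" pair; the exact decoration Thrill emits around it may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical holder for the two ids that the Context's operator<< prints.
struct ToyIds {
    std::size_t host_rank;
    std::size_t local_worker_id;
};

// Prints "<host id>:<local worker id>", mirroring the documented format.
std::ostream& operator<<(std::ostream& os, const ToyIds& ids) {
    return os << ids.host_rank << ':' << ids.local_worker_id;
}
```

Returning the stream allows chaining, so a log line can be written as `os << ids << " ready"`.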
common::JsonLogger base_logger_
Base logger exclusive to this worker.
Definition at line 457 of file context.hpp.
Referenced by Context::Context().
data::BlockPool & block_pool_ [private]
Data block pool.
bool consume_ = false [private]
Flag that enables selective consumption of DIA contents.
Definition at line 425 of file context.hpp.
net::FlowControlChannelManager & flow_manager_ [private]
net::FlowControlChannelManager instance that is shared among workers.
Definition at line 416 of file context.hpp.
size_t last_dia_id_ = 0 [private]
The number of valid DIA ids. 0 is reserved for invalid.
Definition at line 428 of file context.hpp.
size_t local_host_id_ [private]
Id among all local hosts (in test program runs).
Definition at line 395 of file context.hpp.
Referenced by Context::Launch().
size_t local_worker_id_ [private]
Local id of this worker context, 0..p-1, within this host.
Definition at line 398 of file context.hpp.
Referenced by Context::Context(), Context::GetFilePtr(), Context::GetNewCatStream(), Context::GetNewMixStream(), and Context::Launch().
common::JsonLogger logger_
Public member that emits key:value pairs as JSON log lines. This logger is local to this Context, which is exclusive to one worker thread.
Definition at line 462 of file context.hpp.
Referenced by Context::Launch().
const MemoryConfig & mem_config_ [private]
Memory configuration in HostContext.
Definition at line 407 of file context.hpp.
size_t mem_limit_ [private]
Memory limit of this worker Context for local data structures.
Definition at line 404 of file context.hpp.
mem::Manager & mem_manager_ [private]
Host-global memory manager.
Definition at line 410 of file context.hpp.
data::Multiplexer & multiplexer_ [private]
data::Multiplexer instance that is shared among workers.
Definition at line 422 of file context.hpp.
Referenced by Context::GetNewCatStream(), and Context::GetNewMixStream().
net::FlowControlChannel & net
Public member that exposes all network primitives from FlowControlChannel for DOp implementations. Use it as context_.net.Method().
Definition at line 446 of file context.hpp.
Referenced by CountTrianglesGenerated(), HyperLogLogNode< p, ValueType >::Execute(), SizeNode< ValueType >::Execute(), AllReduceNode< ValueType, ReduceFunction >::Execute(), RebalanceNode< ValueType >::Execute(), WriteLinesOneNode< ValueType >::Execute(), SampleNode< ValueType >::Execute(), PrefixSumNode< ValueType, SumFunction, Inclusive >::Execute(), ZipWithIndexNode< ValueType, ZipFunction >::Execute(), OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute(), ConcatNode< ValueType >::Execute(), DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute(), DuplicateDetection::FindNonDuplicates(), MergeNode< ValueType, Comparator, kNumInputs >::GetGlobalRanks(), JoinTPCH4(), Context::Launch(), main(), ZipNode< ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs >::MainOp(), MergeNode< ValueType, Comparator, kNumInputs >::MainOp(), SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::MainOp(), Percentiles(), examples::select::PickPivots(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::Print(), RunHashWordCount(), RunKMeansFile(), RunKMeansGenerated(), RunWordCount(), examples::select::Select(), and MergeNode< ValueType, Comparator, kNumInputs >::SelectPivots().
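Collective methods on net, such as the FlowControlChannel::Reduce() call referenced by Launch(), must be entered by every worker. As a rough, shared-memory-only illustration of what such a collective does, the hypothetical `ToyAllReduce` below blocks each thread until all have contributed, then hands everyone the global sum. It does not model the network transport that FlowControlChannel provides across hosts.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Every one of num_workers threads contributes a value and waits until all
// have arrived; each then receives the global sum. The generation counter
// makes the object reusable for successive rounds.
class ToyAllReduce {
public:
    explicit ToyAllReduce(std::size_t num_workers) : num_workers_(num_workers) {}

    long AllReduce(long value) {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t my_generation = generation_;
        accumulator_ += value;
        if (++arrived_ == num_workers_) {
            // Last arrival publishes the result and opens the next round.
            result_ = accumulator_;
            accumulator_ = 0;
            arrived_ = 0;
            ++generation_;
            cv_.notify_all();
        } else {
            cv_.wait(lock, [&] { return generation_ != my_generation; });
        }
        return result_;  // read under the lock, before any later round ends
    }

private:
    std::size_t num_workers_;
    std::size_t arrived_ = 0;
    std::size_t generation_ = 0;
    long accumulator_ = 0;
    long result_ = 0;
    std::mutex mutex_;
    std::condition_variable cv_;
};

// Runs num_workers threads; worker r contributes r + 1. Returns what each saw.
std::vector<long> RunToyWorkers(std::size_t num_workers) {
    ToyAllReduce net(num_workers);
    std::vector<long> seen(num_workers);
    std::vector<std::thread> threads;
    for (std::size_t rank = 0; rank < num_workers; ++rank)
        threads.emplace_back([&net, &seen, rank] {
            seen[rank] = net.AllReduce(static_cast<long>(rank + 1));
        });
    for (auto& t : threads) t.join();
    return seen;
}
```

If one worker skipped the call, the others would wait forever, which is the practical reason collectives must be issued in the same order on all workers.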
net::Manager & net_manager_ [private]
net::Manager instance that is shared among workers.
Definition at line 413 of file context.hpp.
Referenced by Context::Launch().
std::default_random_engine rng_
A random generator.
Definition at line 435 of file context.hpp.
Referenced by SortNode< ValueType, CompareFunction, SortAlgorithm, Stable >::OnPreOpFile(), and MergeNode< ValueType, Comparator, kNumInputs >::SelectPivots().
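The references suggest rng_ is used for sampling, for example when choosing sort or merge pivots. The hypothetical `SampleIndices` helper below shows that kind of use with a worker-owned std::default_random_engine; how Thrill seeds rng_ is not stated here, so the fixed seeds are purely for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <vector>

// Draws k pseudo-random indices in [0, n) from a worker-owned engine.
// Because each worker owns its engine, no locking is needed when workers
// draw numbers concurrently.
std::vector<std::size_t> SampleIndices(std::default_random_engine& rng,
                                       std::size_t n, std::size_t k) {
    assert(n > 0);
    std::uniform_int_distribution<std::size_t> dist(0, n - 1);
    std::vector<std::size_t> out;
    out.reserve(k);
    for (std::size_t i = 0; i < k; ++i) out.push_back(dist(rng));
    return out;
}
```

Two engines seeded identically produce the same samples within one standard-library implementation, which helps when reproducing a run.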
size_t workers_per_host_ [private]
Number of workers hosted per host.
Definition at line 401 of file context.hpp.