Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
stream_data.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_data.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2017 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/data/stream.hpp>
13 
16 
17 namespace thrill {
18 namespace data {
19 
20 /******************************************************************************/
21 // StreamData
22 
23 StreamData::StreamData(Multiplexer& multiplexer, size_t send_size_limit,
24  const StreamId& id,
25  size_t local_worker_id, size_t dia_id)
26  : sem_queue_(send_size_limit),
27  id_(id),
28  local_worker_id_(local_worker_id),
29  dia_id_(dia_id),
30  multiplexer_(multiplexer)
31 { }
32 
33 StreamData::~StreamData() = default;
34 
35 void StreamData::OnAllClosed(const char* stream_type) {
37  << "class" << "StreamData"
38  << "event" << "close"
39  << "id" << id_
40  << "type" << stream_type
41  << "dia_id" << dia_id_
42  << "worker_rank"
45  << "rx_net_items" << rx_net_items_
46  << "rx_net_bytes" << rx_net_bytes_
47  << "rx_net_blocks" << rx_net_blocks_
48  << "tx_net_items" << tx_net_items_
49  << "tx_net_bytes" << tx_net_bytes_
50  << "tx_net_blocks" << tx_net_blocks_
51  << "rx_int_items" << rx_int_items_
52  << "rx_int_bytes" << rx_int_bytes_
53  << "rx_int_blocks" << rx_int_blocks_
54  << "tx_int_items" << tx_int_items_
55  << "tx_int_bytes" << tx_int_bytes_
56  << "tx_int_blocks" << tx_int_blocks_;
57 }
58 
59 /******************************************************************************/
60 // StreamData::Writers
61 
62 StreamData::Writers::Writers(size_t my_worker_rank)
63  : my_worker_rank_(my_worker_rank)
64 { }
65 
67  // close BlockWriters in a cyclic fashion
68  size_t s = size();
69  for (size_t i = 0; i < s; ++i) {
70  operator [] ((i + my_worker_rank_) % s).Close();
71  }
72 }
73 
75  Close();
76 }
77 
78 } // namespace data
79 } // namespace thrill
80 
81 /******************************************************************************/
std::atomic< size_t > rx_net_items_
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
std::atomic< size_t > tx_net_items_
std::atomic< size_t > rx_int_items_
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
StreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Definition: stream_data.cpp:23
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:90
std::atomic< size_t > tx_int_bytes_
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
void Close()
custom destructor to close writers is a cyclic fashion
Definition: stream_data.cpp:66
std::atomic< size_t > rx_int_bytes_
std::atomic< size_t > tx_net_bytes_
~Writers()
custom destructor to close writers is a cyclic fashion
Definition: stream_data.cpp:74
Writers(size_t my_worker_rank=0)
Definition: stream_data.cpp:62
StreamId id_
our own stream id.
virtual void Close()=0
shuts the stream down.
void OnAllClosed(const char *stream_type)
Definition: stream_data.cpp:35
size_t dia_id_
Associated DIANode id.
std::atomic< size_t > rx_int_blocks_
std::atomic< size_t > tx_net_blocks_
size_t workers_per_host() const
number of workers per host
size_t StreamId
Definition: stream_data.hpp:32
std::atomic< size_t > tx_int_blocks_
std::atomic< size_t > rx_net_blocks_
std::atomic< size_t > tx_int_items_