Thrill  0.1
group.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/mpi/group.hpp
3  *
4  * A Thrill network layer Implementation which uses MPI to transmit messages to
5  * peers. Since MPI implementations are very bad at multi-threading, this
6  * implementation is not recommended: it sequentializes all calls to the MPI
7  * library (such that it does not deadlock), which _requires_ a busy-waiting
8  * loop for new messages.
9  *
10  * Due to this restriction, the mpi::Group allows only **one Thrill host**
11  * within a system process. We cannot start independent test threads as MPI
12  * would not distinguish them.
13  *
14  * Part of Project Thrill - http://project-thrill.org
15  *
16  * Copyright (C) 2015 Timo Bingmann <[email protected]>
17  *
18  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
19  ******************************************************************************/
20 
21 #pragma once
22 #ifndef THRILL_NET_MPI_GROUP_HEADER
23 #define THRILL_NET_MPI_GROUP_HEADER
24 
26 #include <thrill/net/group.hpp>
27 
28 #include <algorithm>
29 #include <string>
30 #include <vector>
31 
32 namespace thrill {
33 namespace net {
34 namespace mpi {
35 
36 //! \addtogroup net_mpi MPI Network API
37 //! \ingroup net
38 //! \{
39 
40 class Group;
41 
42 /*!
43  * A derived exception class which looks up MPI error strings.
44  */
45 class Exception : public net::Exception
46 {
47 public:
48  explicit Exception(const std::string& what)
49  : net::Exception(what) { }
50 
51  Exception(const std::string& what, int error_code);
52 
53  //! return the MPI error string
54  static std::string GetErrorString(int error_code);
55 };
56 
57 /*!
58  * Virtual MPI connection class. As MPI has no real connections, this class is
59  * just the integer which selected an MPI peer. Additionally, it contains the
60  * group tag used to separate communication into groups.
61  */
62 class Connection final : public net::Connection
63 {
64  static constexpr bool debug = false;
65 
66 public:
67  //! construct from group tag and MPI peer
68  void Initialize(Group* group, int peer) {
69  group_ = group;
70  peer_ = peer;
71  }
72 
73  //! \name Base Status Functions
74  //! \{
75 
76  bool IsValid() const final { return true; }
77 
78  //! return the MPI peer number
79  int peer() const { return peer_; }
80 
81  std::string ToString() const final;
82 
83  std::ostream& OutputOstream(std::ostream& os) const final;
84 
85  //! \}
86 
87  //! \name Send Functions
88  //! \{
89 
90  void SyncSend(
91  const void* data, size_t size, Flags /* flags */ = NoFlags) final;
92 
93  ssize_t SendOne(
94  const void* data, size_t size, Flags flags = NoFlags) final {
95  SyncSend(data, size, flags);
96  return size;
97  }
98 
99  //! \}
100 
101  //! \name Receive Functions
102  //! \{
103 
104  void SyncRecv(void* out_data, size_t size) final;
105 
106  ssize_t RecvOne(void* out_data, size_t size) final {
107  SyncRecv(out_data, size);
108  return size;
109  }
110 
111  //! \}
112 
113  //! \name Paired SendReceive Methods
114  //! \{
115 
116  void SyncSendRecv(const void* send_data, size_t send_size,
117  void* recv_data, size_t recv_size) final;
118  void SyncRecvSend(const void* send_data, size_t send_size,
119  void* recv_data, size_t recv_size) final;
120 
121  //! \}
122 
123 private:
124  //! Group reference
126 
127  //! Outgoing peer id of this Connection.
128  int peer_;
129 };
130 
131 /*!
132  * A net group backed by virtual MPI connection. As MPI already sets up
133  * communication, not much is done. Each Group communicates using a unique MPI
134  * tag, the group id. Each host's rank within the group is plaining its MPI
135  * rank.
136  */
137 class Group final : public net::Group
138 {
139  static constexpr bool debug = false;
140 
141 public:
142  //! \name Base Functions
143  //! \{
144 
145  //! Initialize a Group for the given size and rank
146  Group(size_t my_rank, int group_tag, size_t group_size,
147  DispatcherThread& dispatcher)
148  : net::Group(my_rank),
149  group_tag_(group_tag),
150  conns_(group_size),
151  dispatcher_(dispatcher) {
152  // create virtual connections
153  for (size_t i = 0; i < group_size; ++i)
154  conns_[i].Initialize(this, static_cast<int>(i));
155  }
156 
157  //! return MPI tag used to communicate
158  int group_tag() const { return group_tag_; }
159 
160  //! number of hosts configured.
161  size_t num_hosts() const final { return conns_.size(); }
162 
163  //! reference to the main MPI dispatcher thread
164  DispatcherThread& dispatcher() { return dispatcher_; }
165 
166  net::Connection& connection(size_t peer) final {
167  assert(peer < conns_.size());
168  return conns_[peer];
169  }
170 
171  void Close() final { }
172 
173  //! Number of parallel sends or recvs requests supported by net backend
174  size_t num_parallel_async() const final;
175 
176  //! Construct a network dispatcher object for the network backend used by
177  //! this group, matching its internal implementation. A dispatcher may be
178  //! shared between groups of the same type.
179  std::unique_ptr<net::Dispatcher> ConstructDispatcher() const final;
180 
181  //! run a MPI_Barrier() for synchronization.
182  void Barrier();
183 
184  //! \}
185 
186 private:
187  //! this group's MPI tag
189 
190  //! vector of virtual connection objects to remote peers
191  std::vector<Connection> conns_;
192 
193  //! reference to the main MPI dispatcher thread
195 
196  template <typename MpiCall>
197  void WaitForRequest(MpiCall call);
198 
199  //! \name Virtual Synchronous Collectives to Override Implementations
200  //! \{
201 
202 /*[[[perl
203  for my $e (
204  ["int", "Int"], ["unsigned int", "UnsignedInt"],
205  ["long", "Long"], ["unsigned long", "UnsignedLong"],
206  ["long long", "LongLong"], ["unsigned long long", "UnsignedLongLong"])
207  {
208  print "void PrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) final;\n";
209  print "void ExPrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) final;\n";
210  print "void Broadcast$$e[1]($$e[0]& value, size_t origin) final;\n";
211  print "void AllReducePlus$$e[1]($$e[0]& value) final;\n";
212  print "void AllReduceMinimum$$e[1]($$e[0]& value) final;\n";
213  print "void AllReduceMaximum$$e[1]($$e[0]& value) final;\n";
214  }
215 ]]]*/
216  void PrefixSumPlusInt(int& value, const int& initial) final;
217  void ExPrefixSumPlusInt(int& value, const int& initial) final;
218  void BroadcastInt(int& value, size_t origin) final;
219  void AllReducePlusInt(int& value) final;
220  void AllReduceMinimumInt(int& value) final;
221  void AllReduceMaximumInt(int& value) final;
222  void PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) final;
223  void ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) final;
224  void BroadcastUnsignedInt(unsigned int& value, size_t origin) final;
225  void AllReducePlusUnsignedInt(unsigned int& value) final;
226  void AllReduceMinimumUnsignedInt(unsigned int& value) final;
227  void AllReduceMaximumUnsignedInt(unsigned int& value) final;
228  void PrefixSumPlusLong(long& value, const long& initial) final;
229  void ExPrefixSumPlusLong(long& value, const long& initial) final;
230  void BroadcastLong(long& value, size_t origin) final;
231  void AllReducePlusLong(long& value) final;
232  void AllReduceMinimumLong(long& value) final;
233  void AllReduceMaximumLong(long& value) final;
234  void PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) final;
235  void ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) final;
236  void BroadcastUnsignedLong(unsigned long& value, size_t origin) final;
237  void AllReducePlusUnsignedLong(unsigned long& value) final;
238  void AllReduceMinimumUnsignedLong(unsigned long& value) final;
239  void AllReduceMaximumUnsignedLong(unsigned long& value) final;
240  void PrefixSumPlusLongLong(long long& value, const long long& initial) final;
241  void ExPrefixSumPlusLongLong(long long& value, const long long& initial) final;
242  void BroadcastLongLong(long long& value, size_t origin) final;
243  void AllReducePlusLongLong(long long& value) final;
244  void AllReduceMinimumLongLong(long long& value) final;
245  void AllReduceMaximumLongLong(long long& value) final;
246  void PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) final;
247  void ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) final;
248  void BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) final;
249  void AllReducePlusUnsignedLongLong(unsigned long long& value) final;
250  void AllReduceMinimumUnsignedLongLong(unsigned long long& value) final;
251  void AllReduceMaximumUnsignedLongLong(unsigned long long& value) final;
252 // [[[end]]]
253 
254  //! \}
255 };
256 
257 /*!
258  * Construct Group which connects to peers using MPI. As the MPI environment
259  * already defines the connections, no hosts or parameters can be
260  * given. Constructs group_count mpi::Group objects at once. Within each Group
261  * this host has its MPI rank.
262  *
263  * To enable tests with smaller group sizes, the Construct method takes
264  * group_size and returns a Group with *less* hosts than actual MPI processes!
265  * Obviously, group_size must be less-or-equal to the number of processes
266  * started with mpirun -np.
267  */
268 bool Construct(size_t group_size, DispatcherThread& dispatcher,
269  std::unique_ptr<Group>* groups, size_t group_count);
270 
//! Return the number of MPI processes. This is the maximum group size.
size_t NumMpiProcesses();
275 
/*!
 * Return the rank of this process in the MPI COMM WORLD.
 */
size_t MpiRank();
278 
279 //! \}
280 
281 } // namespace mpi
282 } // namespace net
283 } // namespace thrill
284 
285 #endif // !THRILL_NET_MPI_GROUP_HEADER
286 
287 /******************************************************************************/
int peer_
Outgoing peer id of this Connection.
Definition: group.hpp:128
size_t num_hosts() const final
number of hosts configured.
Definition: group.hpp:161
DispatcherThread & dispatcher_
reference to the main MPI dispatcher thread
Definition: group.hpp:194
ssize_t RecvOne(void *out_data, size_t size) final
Definition: group.hpp:106
A derived exception class which looks up MPI error strings.
Definition: group.hpp:45
ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags) final
Definition: group.hpp:93
int group_tag_
this group's MPI tag
Definition: group.hpp:188
An Exception is thrown by Connection on all errors instead of returning error codes.
Definition: exception.hpp:30
std::vector< Connection > conns_
vector of virtual connection objects to remote peers
Definition: group.hpp:191
int value
Definition: gen_data.py:41
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
Group(size_t my_rank, int group_tag, size_t group_size, DispatcherThread &dispatcher)
Initialize a Group for the given size and rank.
Definition: group.hpp:146
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
size_t MpiRank()
Return the rank of this process in the MPI COMM WORLD.
Definition: group.cpp:714
void Close() final
Close.
Definition: group.hpp:171
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
A net group backed by virtual MPI connection.
Definition: group.hpp:137
int peer() const
return the MPI peer number
Definition: group.hpp:79
Exception(const std::string &what)
Definition: group.hpp:48
static void Initialize()
run MPI_Init() if not already done (can be called multiple times).
Definition: group.cpp:641
int group_tag() const
return MPI tag used to communicate
Definition: group.hpp:158
Flags
Additional flags for sending or receiving.
Definition: connection.hpp:61
size_t NumMpiProcesses()
Return the number of MPI processes.
Definition: group.cpp:701
static std::string GetErrorString(int error_code)
return the MPI error string
Definition: group.cpp:36
Virtual MPI connection class.
Definition: group.hpp:62
net::Connection & connection(size_t peer) final
Return Connection to client id.
Definition: group.hpp:166
bool IsValid() const final
check whether the connection is (still) valid.
Definition: group.hpp:76
void Initialize(Group *group, int peer)
construct from group tag and MPI peer
Definition: group.hpp:68
Group * group_
Group reference.
Definition: group.hpp:125
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
DispatcherThread & dispatcher()
reference to the main MPI dispatcher thread
Definition: group.hpp:164
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
Definition: group.cpp:675