Thrill  0.1
dispatcher_thread.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/dispatcher_thread.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
13 #include <thrill/net/group.hpp>
14 
15 #include <deque>
16 #include <string>
17 #include <vector>
18 
19 namespace thrill {
20 namespace net {
21 
23  std::unique_ptr<class Dispatcher> dispatcher, size_t host_rank)
24  : dispatcher_(std::move(dispatcher)),
25  host_rank_(host_rank) {
26  // start thread
27  thread_ = std::thread(&DispatcherThread::Work, this);
28 }
29 
31  Terminate();
32 }
33 
35  if (terminate_) return;
36 
37  // set termination flags.
38  terminate_ = true;
39  // interrupt select().
40  WakeUpThread();
41  // wait for last round to finish.
42  thread_.join();
43 }
44 
46  Enqueue([this, cb = std::move(cb)]() {
47  cb(*dispatcher_);
48  });
49  WakeUpThread();
50 }
51 
53  std::chrono::milliseconds timeout, const TimerCallback& cb) {
54  Enqueue([=]() {
55  dispatcher_->AddTimer(timeout, cb);
56  });
57  WakeUpThread();
58 }
59 
61  Enqueue([=, &c]() {
62  dispatcher_->AddRead(c, read_cb);
63  });
64  WakeUpThread();
65 }
66 
68  Enqueue([=, &c]() {
69  dispatcher_->AddWrite(c, write_cb);
70  });
71  WakeUpThread();
72 }
73 
75  Enqueue([=, &c]() {
76  dispatcher_->Cancel(c);
77  });
78  WakeUpThread();
79 }
80 
82  Connection& c, uint32_t seq, size_t size,
83  const AsyncReadCallback& done_cb) {
84  Enqueue([=, &c]() {
85  dispatcher_->AsyncRead(c, seq, size, done_cb);
86  });
87  WakeUpThread();
88 }
89 
91  Connection& c, uint32_t seq, size_t size, data::PinnedByteBlockPtr&& block,
92  const AsyncReadByteBlockCallback& done_cb) {
93  assert(block.valid());
94  Enqueue([=, &c, b = std::move(block)]() mutable {
95  dispatcher_->AsyncRead(c, seq, size, std::move(b), done_cb);
96  });
97  WakeUpThread();
98 }
99 
101  Connection& c, uint32_t seq, Buffer&& buffer, const AsyncWriteCallback& done_cb) {
102  // the following captures the move-only buffer in a lambda.
103  Enqueue([=, &c, b = std::move(buffer)]() mutable {
104  dispatcher_->AsyncWrite(c, seq, std::move(b), done_cb);
105  });
106  WakeUpThread();
107 }
108 
110  Connection& c, uint32_t seq, Buffer&& buffer, data::PinnedBlock&& block,
111  const AsyncWriteCallback& done_cb) {
112  assert(block.IsValid());
113  // the following captures the move-only buffer in a lambda.
114  Enqueue([=, &c,
115  b1 = std::move(buffer), b2 = std::move(block)]() mutable {
116  dispatcher_->AsyncWrite(c, seq, std::move(b1));
117  dispatcher_->AsyncWrite(c, seq + 1, std::move(b2), done_cb);
118  });
119  WakeUpThread();
120 }
121 
123  Connection& c, uint32_t seq, const void* buffer, size_t size,
124  const AsyncWriteCallback& done_cb) {
125  return AsyncWrite(c, seq, Buffer(buffer, size), done_cb);
126 }
127 
129  Connection& c, uint32_t seq,
130  const std::string& str, const AsyncWriteCallback& done_cb) {
131  return AsyncWriteCopy(c, seq, str.data(), str.size(), done_cb);
132 }
133 
135  return jobqueue_.push(std::move(job));
136 }
137 
140  "host " + std::to_string(host_rank_) + " dispatcher");
141  // pin DispatcherThread to last core
142  common::SetCpuAffinity(std::thread::hardware_concurrency() - 1);
143 
144  while (!terminate_ ||
145  dispatcher_->HasAsyncWrites() || !jobqueue_.empty())
146  {
147  // process jobs in jobqueue_
148  {
149  Job job;
150  while (jobqueue_.try_pop(job))
151  job();
152  }
153 
154  // set busy flag, but check once again for jobs.
155  busy_ = true;
156  {
157  Job job;
158  if (jobqueue_.try_pop(job)) {
159  busy_ = false;
160  job();
161  continue;
162  }
163  }
164 
165  // run one dispatch
166  dispatcher_->Dispatch();
167 
168  busy_ = false;
169  }
170 
171  LOG << "DispatcherThread finished.";
172 }
173 
175  if (busy_)
176  dispatcher_->Interrupt();
177 }
178 
179 } // namespace net
180 } // namespace thrill
181 
182 /******************************************************************************/
DispatcherThread(std::unique_ptr< class Dispatcher > dispatcher, size_t host_rank)
void Work()
What happens in the dispatcher thread.
void AddWrite(Connection &c, const AsyncCallback &write_cb)
Register a buffered write callback and a default exception callback.
void WakeUpThread()
wake up select() in dispatching thread.
std::unique_ptr< class Dispatcher > dispatcher_
enclosed dispatcher.
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
Definition: logger.cpp:40
static by_string to_string(int val)
convert to string
void AddTimer(std::chrono::milliseconds timeout, const TimerCallback &cb)
Register a relative timeout callback.
std::atomic< bool > terminate_
termination flag
void Terminate()
Terminate the dispatcher thread (if not already done).
void AsyncRead(Connection &c, uint32_t seq, size_t size, const AsyncReadCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
void AsyncWriteCopy(Connection &c, uint32_t seq, const void *buffer, size_t size, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
void Cancel(Connection &c)
Cancel all callbacks on a given connection.
void AsyncWrite(Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
void AddRead(Connection &c, const AsyncCallback &read_cb)
Register a buffered read callback and a default exception callback.
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
size_t host_rank_
for thread name for logging
std::atomic< bool > busy_
whether to call Interrupt() in WakeUpThread()
void Enqueue(Job &&job)
Enqueue job in queue for dispatching thread to run at its discretion.
common::ConcurrentQueue< Job, mem::GPoolAllocator< Job > > jobqueue_
Queue of jobs to be run by dispatching thread at its discretion.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:111
void RunInThread(const AsyncDispatcherThreadCallback &cb)
Run generic callback in dispatcher thread to enqueue stuff.
std::thread thread_
thread of dispatcher