Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dispatcher_thread.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/dispatcher_thread.hpp
3  *
4  * Asynchronous callback wrapper around select(), epoll(), or other kernel-level
5  * dispatchers.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_DISPATCHER_THREAD_HEADER
16 #define THRILL_NET_DISPATCHER_THREAD_HEADER
17 
19 #include <thrill/data/block.hpp>
20 #include <thrill/mem/allocator.hpp>
21 #include <thrill/net/buffer.hpp>
23 #include <tlx/delegate.hpp>
24 
25 #include <string>
26 
27 namespace thrill {
28 namespace net {
29 
30 //! \addtogroup net_layer
31 //! \{
32 
33 //! Signature of timer callbacks.
35 
36 //! Signature of async connection readability/writability callbacks.
38 
39 //! Signature of async read callbacks.
42 
43 //! Signature of async read ByteBlock callbacks.
45  void(Connection& c, data::PinnedByteBlockPtr && block),
47 
48 //! Signature of async write callbacks.
51 
52 //! Signature of generic dispatcher callback.
55 
56 /*!
57  * DispatcherThread contains a net::Dispatcher object and an associated thread
58  * that runs in the dispatching loop.
59  */
61 {
62  static constexpr bool debug = false;
63 
64 public:
65  //! Signature of async jobs to be run by the dispatcher thread.
67 
69  std::unique_ptr<class Dispatcher> dispatcher,
70  size_t host_rank);
71 
73 
74  //! non-copyable: delete copy-constructor
75  DispatcherThread(const DispatcherThread&) = delete;
76  //! non-copyable: delete assignment operator
78 
79  //! Terminate the dispatcher thread (if not already done).
80  void Terminate();
81 
82  //! Run generic callback in dispatcher thread to enqueue stuff.
84 
85  //! \name Timeout Callbacks
86  //! \{
87 
88  //! Register a relative timeout callback
89  void AddTimer(std::chrono::milliseconds timeout, const TimerCallback& cb);
90 
91  //! \}
92 
93  //! \name Connection Callbacks
94  //! \{
95 
96  //! Register a buffered read callback and a default exception callback.
97  void AddRead(Connection& c, const AsyncCallback& read_cb);
98 
99  //! Register a buffered write callback and a default exception callback.
100  void AddWrite(Connection& c, const AsyncCallback& write_cb);
101 
102  //! Cancel all callbacks on a given connection.
103  void Cancel(Connection& c);
104 
105  //! \}
106 
107  //! \name Asynchronous Data Reader/Writer Callbacks
108  //! \{
109 
110  //! asynchronously read n bytes and deliver them to the callback
111  void AsyncRead(Connection& c, uint32_t seq, size_t size,
112  const AsyncReadCallback& done_cb);
113 
114  //! asynchronously read the full ByteBlock and deliver it to the callback
115  void AsyncRead(Connection& c, uint32_t seq, size_t size,
116  data::PinnedByteBlockPtr&& block,
117  const AsyncReadByteBlockCallback& done_cb);
118 
119  //! asynchronously write the buffer and callback when delivered. The
120  //! buffer is passed as an rvalue and handed over to the async writer.
121  void AsyncWrite(Connection& c, uint32_t seq, Buffer&& buffer,
122  const AsyncWriteCallback& done_cb = AsyncWriteCallback());
123 
124  //! asynchronously write a buffer followed by a block and callback when
125  //! delivered. Both are passed as rvalues and handed over to the async
126  //! writer. This is most useful to write a header Buffer and a payload
127  //! Block that are hereby guaranteed to be written in order.
128  void AsyncWrite(Connection& c, uint32_t seq,
129  Buffer&& buffer, data::PinnedBlock&& block,
130  const AsyncWriteCallback& done_cb = AsyncWriteCallback());
131 
132  //! asynchronously write buffer and callback when delivered. COPIES the data
133  //! into a Buffer!
134  void AsyncWriteCopy(
135  Connection& c, uint32_t seq, const void* buffer, size_t size,
136  const AsyncWriteCallback& done_cb = AsyncWriteCallback());
137 
138  //! asynchronously write buffer and callback when delivered. COPIES the data
139  //! into a Buffer!
140  void AsyncWriteCopy(
141  Connection& c, uint32_t seq, const std::string& str,
142  const AsyncWriteCallback& done_cb = AsyncWriteCallback());
143 
144  //! \}
145 
146 private:
147  //! Enqueue job in queue for dispatching thread to run at its discretion.
148  void Enqueue(Job&& job);
149 
150  //! What happens in the dispatcher thread
151  void Work();
152 
153  //! wake up select() in dispatching thread.
154  void WakeUpThread();
155 
156 private:
157  //! Queue of jobs to be run by dispatching thread at its discretion.
159 
160  //! thread of dispatcher
161  std::thread thread_;
162 
163  //! enclosed dispatcher.
164  std::unique_ptr<class Dispatcher> dispatcher_;
165 
166  //! termination flag
167  std::atomic<bool> terminate_ { false };
168 
169  //! whether to call Interrupt() in WakeUpThread()
170  std::atomic<bool> busy_ { false };
171 
172  //! for thread name for logging
173  size_t host_rank_;
174 };
175 
176 //! \}
177 
178 } // namespace net
179 } // namespace thrill
180 
181 #endif // !THRILL_NET_DISPATCHER_THREAD_HEADER
182 
183 /******************************************************************************/
DispatcherThread(std::unique_ptr< class Dispatcher > dispatcher, size_t host_rank)
void Work()
What happens in the dispatcher thread.
void AddWrite(Connection &c, const AsyncCallback &write_cb)
Register a buffered write callback and a default exception callback.
tlx::delegate< void(Connection &), mem::GPoolAllocator< char > > AsyncWriteCallback
Signature of async write callbacks.
Definition: dispatcher.hpp:58
void WakeUpThread()
wake up select() in dispatching thread.
std::unique_ptr< class Dispatcher > dispatcher_
enclosed dispatcher.
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:182
tlx::delegate< bool(), mem::GPoolAllocator< char > > AsyncCallback
Signature of async connection readability/writability callbacks.
Definition: dispatcher.hpp:45
DispatcherThread & operator=(const DispatcherThread &)=delete
non-copyable: delete assignment operator
void AddTimer(std::chrono::milliseconds timeout, const TimerCallback &cb)
Register a relative timeout callback.
std::atomic< bool > terminate_
termination flag
void Terminate()
Terminate the dispatcher thread (if not already done).
void AsyncRead(Connection &c, uint32_t seq, size_t size, const AsyncReadCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
void AsyncWriteCopy(Connection &c, uint32_t seq, const void *buffer, size_t size, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
tlx::delegate< bool(), mem::GPoolAllocator< char > > TimerCallback
Signature of timer callbacks.
Definition: dispatcher.hpp:42
void Cancel(Connection &c)
Cancel all callbacks on a given connection.
void AsyncWrite(Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
void AddRead(Connection &c, const AsyncCallback &read_cb)
Register a buffered read callback and a default exception callback.
This is a queue, similar to std::queue and tbb::concurrent_queue, except that it uses mutexes for syn...
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
tlx::delegate< void(Connection &c, Buffer &&buffer), mem::GPoolAllocator< char > > AsyncReadCallback
Signature of async read callbacks.
size_t host_rank_
for thread name for logging
std::atomic< bool > busy_
whether to call Interrupt() in WakeUpThread()
void Enqueue(Job &&job)
Enqueue job in queue for dispatching thread to run at its discretion.
common::ConcurrentQueue< Job, mem::GPoolAllocator< Job > > jobqueue_
Queue of jobs to be run by dispatching thread at its discretion.
Dispatcher is a high level wrapper for asynchronous callback processing.
Definition: dispatcher.hpp:480
void RunInThread(const AsyncDispatcherThreadCallback &cb)
Run generic callback in dispatcher thread to enqueue stuff.
std::thread thread_
thread of dispatcher
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...