Thrill  0.1
multiplexer_header.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/multiplexer_header.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER_HEADER
14 #define THRILL_DATA_MULTIPLEXER_HEADER_HEADER
15 
16 #include <thrill/data/block.hpp>
17 #include <thrill/data/stream.hpp>
21 
22 #include <cstdio>
23 #include <cstdlib>
24 #include <string>
25 
26 namespace thrill {
27 namespace data {
28 
29 #if defined(_MSC_VER)
30 #pragma pack(push, 1)
31 #endif
32 
33 //! \addtogroup data_layer
34 //! \{
35 
37 {
38 public:
39  static constexpr bool self_verify = common::g_self_verify;
40 
42  uint32_t size = 0;
43  uint32_t num_items = 0;
44  // previous two bits are packed with first_item
45  uint32_t first_item : 30;
46  //! typecode self verify
47  uint32_t typecode_verify : 1;
48  //! is last block piggybacked indicator
49  uint32_t is_last_block : 1;
50 
51  MultiplexerHeader() = default;
52 
53  explicit MultiplexerHeader(MagicByte m, const PinnedBlock& b)
54  : magic(m),
55  size(static_cast<uint32_t>(b.size())),
56  num_items(static_cast<uint32_t>(b.num_items())),
57  first_item(static_cast<uint32_t>(b.first_item_relative())),
59  if (!self_verify)
60  assert(!typecode_verify);
61  }
62 
63  static constexpr size_t header_size =
64  sizeof(MagicByte) + 3 * sizeof(uint32_t);
65 
66  static constexpr size_t total_size =
67  header_size + sizeof(size_t) + 3 * sizeof(uint32_t);
69 
70 static_assert(sizeof(MultiplexerHeader) == MultiplexerHeader::header_size,
71  "MultiplexerHeader has invalid size");
72 
73 /*!
74  * Block header is sent before a sequence of blocks it indicates the number of
75  * elements and their boundaries
76  *
77  * Provides a serializer and two partial deserializers. A
78  * StreamMultiplexerHeader with size = 0 marks the end of a stream.
79  */
81 {
82 public:
83  size_t stream_id = 0;
84  uint32_t receiver_local_worker = 0;
85  //! global worker rank of sender
86  uint32_t sender_worker = 0;
87  //! sequence number in Stream
88  uint32_t seq = 0;
89 
90  //! virtual worker which receives all-close messages
91  static const uint32_t all_workers = uint32_t(-1);
92  //! final sequence number
93  static const uint32_t final_seq = uint32_t(-1);
94 
95  StreamMultiplexerHeader() = default;
96 
98  MagicByte m, const PinnedBlock& b)
99  : MultiplexerHeader(m, b) { }
100 
101  //! Serializes the whole block struct into a buffer
102  void Serialize(net::BufferBuilder& bb) const {
104  bb.Put<StreamMultiplexerHeader>(*this);
105  }
106 
107  //! Reads the stream id and the number of elements in this block
109  return br.Get<StreamMultiplexerHeader>();
110  }
111 
112  //! Indicates if this is the end-of-line block header
113  bool IsEnd() const {
114  return size == 0;
115  }
116 
117  //! Indicates if this message is for all local workers
118  bool IsAllWorkers() const {
119  return receiver_local_worker == all_workers;
120  }
121 
122  //! Calculate the sender host_rank from sender_worker and workers_per_host.
123  size_t CalcHostRank(size_t workers_per_host) const {
124  return sender_worker / workers_per_host;
125  }
127 
129  "StreamMultiplexerHeader has invalid size");
130 
132 {
133 public:
134  size_t partition_set_id = 0;
135  // probably needed
136  // size_t partition_index = 0;
137  uint32_t receiver_local_worker = 0;
138  uint32_t sender_worker = 0;
139  //! sequence number in Stream
140  uint32_t seq = 0;
141 
142  PartitionMultiplexerHeader() = default;
143 
146 
147  //! Serializes the whole block struct into a buffer
148  void Serialize(net::BufferBuilder& bb) const {
150  bb.Put<PartitionMultiplexerHeader>(*this);
151  }
152 
153  //! Reads the stream id and the number of elements in this block
155  return br.Get<PartitionMultiplexerHeader>();
156  }
157 
158  //! Indicates if this is the end-of-line block header
159  bool IsEnd() const {
160  return size == 0;
161  }
163 
164 static_assert(
166  "PartitionMultiplexerHeader has invalid size");
167 
168 #if defined(_MSC_VER)
169 #pragma pack(pop)
170 #endif
171 
172 //! \}
173 
174 } // namespace data
175 } // namespace thrill
176 
177 #endif // !THRILL_DATA_MULTIPLEXER_HEADER_HEADER
178 
179 /******************************************************************************/
bool IsEnd() const
Indicates if this is the end-of-line block header.
StreamMultiplexerHeader(MagicByte m, const PinnedBlock &b)
void Serialize(net::BufferBuilder &bb) const
Serializes the whole block struct into a buffer.
bool IsAllWorkers() const
Indicates if this message is for all local workers.
bool IsEnd() const
Indicates if this is the end-of-line block header.
static StreamMultiplexerHeader Parse(net::BufferReader &br)
Reads the stream id and the number of elements in this block.
class thrill::data::MultiplexerHeader TLX_ATTRIBUTE_PACKED
static constexpr size_t total_size
static constexpr bool g_self_verify
Definition: config.hpp:32
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
static PartitionMultiplexerHeader Parse(net::BufferReader &br)
Reads the stream id and the number of elements in this block.
void Serialize(net::BufferBuilder &bb) const
Serializes the whole block struct into a buffer.
uint32_t typecode_verify
typecode self verify
uint32_t is_last_block
is last block piggybacked indicator
Block header is sent before a sequence of blocks it indicates the number of elements and their bounda...
BufferBuilder & Reserve(size_t n)
Make sure that at least n bytes are allocated.
static constexpr size_t header_size
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
MultiplexerHeader(MagicByte m, const PinnedBlock &b)
BufferBuilder & Put(const Type &item)
size_t CalcHostRank(size_t workers_per_host) const
Calculate the sender host_rank from sender_worker and workers_per_host.