Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
multiplexer_header.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/multiplexer_header.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER_HEADER
14 #define THRILL_DATA_MULTIPLEXER_HEADER_HEADER
15 
16 #include <thrill/data/block.hpp>
17 #include <thrill/data/stream.hpp>
21 
22 #include <cstdio>
23 #include <cstdlib>
24 #include <string>
25 
26 namespace thrill {
27 namespace data {
28 
29 #if defined(_MSC_VER)
30 #pragma pack(push, 1)
31 #endif
32 
33 //! \addtogroup data_layer
34 //! \{
35 
//! Common header fields placed in front of a transmitted block: the block's
//! byte size, its item count, the offset of its first item and two flag bits.
//! NOTE(review): this extract omits listing line 36 (the class declaration),
//! line 41 (presumably the declaration of `magic`, which the constructor
//! initializes), line 58 (the end of the initializer list and the opening
//! brace of the constructor body) and line 68 (the closing line of the class).
37 {
38 public:
39  static constexpr bool self_verify = common::g_self_verify;
40 
42  uint32_t size = 0;
43  uint32_t num_items = 0;
44  // first_item (30 bits) and the two one-bit flags below share one 32-bit word
45  uint32_t first_item : 30;
46  //! typecode self-verify flag; the constructor asserts it is unset whenever self_verify is false
47  uint32_t typecode_verify : 1;
48  //! is last block piggybacked indicator
49  uint32_t is_last_block : 1;
50 
    //! Default constructor. The bit-fields above carry no default member
    //! initializer, unlike size and num_items.
51  MultiplexerHeader() = default;
52 
    //! Fills the header from a pinned block: size, num_items and first_item are
    //! taken from b.size(), b.num_items() and b.first_item_relative(), each
    //! narrowed to uint32_t with static_cast. first_item is additionally
    //! stored in only 30 bits.
    //! NOTE(review): is_last_block is not set in the visible part of the
    //! initializer list - confirm callers assign it.
53  explicit MultiplexerHeader(MagicByte m, const PinnedBlock& b)
54  : magic(m),
55  size(static_cast<uint32_t>(b.size())),
56  num_items(static_cast<uint32_t>(b.num_items())),
57  first_item(static_cast<uint32_t>(b.first_item_relative())),
    // with self verification compiled out, no block may claim to carry it
59  if (!self_verify)
60  assert(!typecode_verify);
61  }
62 
    //! Size in bytes of this base header: one MagicByte plus three 32-bit
    //! words (size, num_items, and the first_item/flags word).
63  static constexpr size_t header_size =
64  sizeof(MagicByte) + 3 * sizeof(uint32_t);
65 
    //! Size in bytes of a derived header: header_size plus one size_t and
    //! three uint32_t, matching the members the derived headers add.
66  static constexpr size_t total_size =
67  header_size + sizeof(size_t) + 3 * sizeof(uint32_t);
69 
// The layout must contain no padding: sizeof must equal the sum of the fields.
70 static_assert(sizeof(MultiplexerHeader) == MultiplexerHeader::header_size,
71  "MultiplexerHeader has invalid size");
72 
73 /*!
74  * Block header sent before a sequence of blocks; it indicates the number of
75  * elements and their boundaries.
76  *
77  * Provides a serializer (Serialize) and a deserializer (Parse). A
78  * StreamMultiplexerHeader with size = 0 marks the end of a stream.
79  */
//! Header for stream blocks: adds a stream id, the receiving local worker,
//! the sending worker and a sequence number to the MultiplexerHeader fields.
//! NOTE(review): this extract omits listing line 80 (the class declaration),
//! line 92 (start of the two-argument constructor), line 98 (one statement in
//! Serialize), line 103 (the signature of Parse) and line 116 (the closing
//! line of the class).
81 {
82 public:
83  size_t stream_id = 0;
84  uint32_t receiver_local_worker = 0;
85  //! global worker rank of sender
86  uint32_t sender_worker = 0;
87  //! sequence number in Stream
88  uint32_t seq = 0;
89 
90  StreamMultiplexerHeader() = default;
91 
    //! Forwards magic byte and block to the MultiplexerHeader constructor;
    //! the members declared above keep their default value 0.
93  MagicByte m, const PinnedBlock& b)
94  : MultiplexerHeader(m, b) { }
95 
96  //! Serializes the whole header object into the buffer by putting *this into bb
97  void Serialize(net::BufferBuilder& bb) const {
99  bb.Put<StreamMultiplexerHeader>(*this);
100  }
101 
102  //! Reads one complete StreamMultiplexerHeader from the reader and returns it
104  return br.Get<StreamMultiplexerHeader>();
105  }
106 
107  //! Indicates if this is the end-of-stream header, i.e. size == 0
108  bool IsEnd() const {
109  return size == 0;
110  }
111 
112  //! Calculate the sender host_rank from sender_worker and workers_per_host
    //! using integer division; workers_per_host must not be zero.
113  size_t CalcHostRank(size_t workers_per_host) const {
114  return sender_worker / workers_per_host;
115  }
117 
// The layout must contain no padding: sizeof must equal MultiplexerHeader::total_size.
118 static_assert(sizeof(StreamMultiplexerHeader) == MultiplexerHeader::total_size,
119  "StreamMultiplexerHeader has invalid size");
120 
//! Header for partition blocks: adds a partition set id, the receiving local
//! worker, the sending worker and a sequence number to the header fields.
//! NOTE(review): this extract omits listing line 121 (the class declaration),
//! lines 134-135 (presumably a two-argument constructor like the one in
//! StreamMultiplexerHeader - confirm), line 139 (one statement in Serialize),
//! line 144 (the signature of Parse) and line 152 (the closing line of the
//! class). `size`, used by IsEnd(), is not declared here, so it presumably
//! comes from a MultiplexerHeader base.
122 {
123 public:
124  size_t partition_set_id = 0;
125  // probably needed; note that enabling this member would enlarge the
    // struct and break the static_assert against total_size below
126  // size_t partition_index = 0;
127  uint32_t receiver_local_worker = 0;
128  uint32_t sender_worker = 0;
129  //! sequence number (original comment said "in Stream" - TODO confirm the scope for partitions)
130  uint32_t seq = 0;
131 
132  PartitionMultiplexerHeader() = default;
133 
136 
137  //! Serializes the whole header object into the buffer by putting *this into bb
138  void Serialize(net::BufferBuilder& bb) const {
140  bb.Put<PartitionMultiplexerHeader>(*this);
141  }
142 
143  //! Reads one complete PartitionMultiplexerHeader from the reader and returns it
145  return br.Get<PartitionMultiplexerHeader>();
146  }
147 
148  //! Indicates if this is the terminating header, i.e. size == 0
149  bool IsEnd() const {
150  return size == 0;
151  }
153 
// The layout must contain no padding: sizeof must equal MultiplexerHeader::total_size.
154 static_assert(
155  sizeof(PartitionMultiplexerHeader) == MultiplexerHeader::total_size,
156  "PartitionMultiplexerHeader has invalid size");
157 
158 #if defined(_MSC_VER)
159 #pragma pack(pop)
160 #endif
161 
162 //! \}
163 
164 } // namespace data
165 } // namespace thrill
166 
167 #endif // !THRILL_DATA_MULTIPLEXER_HEADER_HEADER
168 
169 /******************************************************************************/
StreamMultiplexerHeader(MagicByte m, const PinnedBlock &b)
uint32_t sender_worker
global worker rank of sender
static StreamMultiplexerHeader Parse(net::BufferReader &br)
Reads the stream id and the number of elements in this block.
void Serialize(net::BufferBuilder &bb) const
Serializes the whole block struct into a buffer.
void Serialize(net::BufferBuilder &bb) const
Serializes the whole block struct into a buffer.
uint32_t seq
sequence number in Stream
class thrill::data::MultiplexerHeader TLX_ATTRIBUTE_PACKED
bool IsEnd() const
Indicates if this is the end-of-line block header.
static constexpr size_t total_size
static constexpr bool g_self_verify
Definition: config.hpp:32
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
static PartitionMultiplexerHeader Parse(net::BufferReader &br)
Reads the stream id and the number of elements in this block.
uint32_t typecode_verify
typecode self verify
uint32_t seq
sequence number in Stream
uint32_t is_last_block
is last block piggybacked indicator
Block header is sent before a sequence of blocks it indicates the number of elements and their bounda...
bool IsEnd() const
Indicates if this is the end-of-line block header.
BufferBuilder & Reserve(size_t n)
Make sure that at least n bytes are allocated.
size_t CalcHostRank(size_t workers_per_host) const
Calculate the sender host_rank from sender_worker and workers_per_host.
static constexpr size_t header_size
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
MultiplexerHeader(MagicByte m, const PinnedBlock &b)
BufferBuilder & Put(const Type &item)