Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
block_queue.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_queue.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
13 namespace thrill {
14 namespace data {
15 
16 /******************************************************************************/
17 // BlockQueue
18 
19 BlockQueue::BlockQueue(BlockPool& block_pool, size_t local_worker_id,
20  size_t dia_id,
21  const CloseCallback& close_callback)
22  : BlockSink(block_pool, local_worker_id),
23  file_(block_pool, local_worker_id, dia_id),
24  close_callback_(close_callback) {
25  assert(local_worker_id < block_pool.workers_per_host());
26 }
27 
29  assert(!write_closed_);
30  write_closed_ = true;
31 
33 
34  // enqueue a closing Block.
35  queue_.emplace();
36 
37  if (close_callback_) {
38  close_callback_(*this);
40  }
41 }
42 
44  return Writer(BlockQueueSink(this), block_size);
45 }
46 
48  assert(!read_closed_);
49  return ConsumeReader(ConsumeBlockQueueSource(*this, local_worker_id));
50 }
51 
52 DynBlockSource BlockQueue::GetBlockSource(bool consume, size_t local_worker_id) {
53  if (consume && !read_closed_) {
54  // set to consume, and BlockQueue has not been read.
55  sLOG << "BlockQueue::GetBlockSource() consume, from queue.";
56  return ConstructDynBlockSource<ConsumeBlockQueueSource>(
57  *this, local_worker_id);
58  }
59  else if (consume && read_closed_) {
60  // consume the File, BlockQueue was already read.
61  sLOG << "BlockQueue::GetBlockSource() consume, from cache:"
62  << file_.num_items();
63  return ConstructDynBlockSource<ConsumeFileBlockSource>(
65  }
66  else if (!consume && !read_closed_) {
67  // non-consumer but the BlockQueue has not been read.
68  sLOG << "BlockQueue::GetBlockSource() non-consume, from queue.";
69  return ConstructDynBlockSource<CacheBlockQueueSource>(
70  this, local_worker_id);
71  }
72  else if (!consume && read_closed_) {
73  // non-consumer: reread the file that was cached.
74  sLOG << "BlockQueue::GetBlockSource() non-consume, from cache:"
75  << file_.num_items();
76  return ConstructDynBlockSource<KeepFileBlockSource>(
78  }
79  else {
80  // impossible
81  abort();
82  }
83 }
84 
85 BlockQueue::Reader BlockQueue::GetReader(bool consume, size_t local_worker_id) {
86  return DynBlockReader(GetBlockSource(consume, local_worker_id));
87 }
88 
89 /******************************************************************************/
90 // ConsumeBlockQueueSource
91 
93  BlockQueue& queue, size_t local_worker_id)
94  : queue_(queue), local_worker_id_(local_worker_id) { }
95 
96 void ConsumeBlockQueueSource::Prefetch(size_t /* prefetch */) {
97  // not supported yet. TODO(tb)
98 }
99 
101  Block b = queue_.Pop();
102  LOG << "ConsumeBlockQueueSource::NextBlock() " << b;
103 
104  if (!b.IsValid()) return PinnedBlock();
105  return b.PinWait(local_worker_id_);
106 }
107 
108 /******************************************************************************/
109 // CacheBlockQueueSource
110 
112  : queue_(queue), local_worker_id_(local_worker_id) { }
113 
114 //! move-constructor: default
116  : queue_(s.queue_), local_worker_id_(s.local_worker_id_) {
117  s.queue_ = nullptr;
118 }
119 
120 void CacheBlockQueueSource::Prefetch(size_t /* prefetch */) {
121  // not supported yet. TODO(tb)
122 }
123 
125  LOG << "CacheBlockQueueSource[" << this << "]::NextBlock() closed " << queue_->read_closed();
126  Block b = queue_->Pop();
127  LOG << "CacheBlockQueueSource[" << this << "]::NextBlock() " << b;
128 
129  // cache block in file_ (but not the termination block from the queue)
130  if (b.IsValid())
132 
133  if (!b.IsValid())
134  return PinnedBlock();
135 
136  return b.PinWait(local_worker_id_);
137 }
138 
140  if (queue_ && !queue_->read_closed()) {
141  while (NextBlock().IsValid()) { }
142  }
143 }
144 
145 } // namespace data
146 } // namespace thrill
147 
148 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
PinnedBlock PinWait() const =delete
not available in PinnedBlock
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
BlockWriter< BlockQueueSink > Writer
Definition: block_queue.hpp:52
BlockQueue * queue_
Reference to BlockQueue.
ConsumeBlockQueueSource(BlockQueue &queue, size_t local_worker_id)
Start reading from a BlockQueue.
Definition: block_queue.cpp:92
size_t block_counter_
number of blocks transfered by the Queue
ConsumeReader GetConsumeReader(size_t local_worker_id)
return BlockReader specifically for a BlockQueue
Definition: block_queue.cpp:47
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
common::ConcurrentBoundedQueue< Block > queue_
File file_
File to cache blocks for implementing CacheBlockQueueSource.
tlx::delegate< void(BlockQueue &)> CloseCallback
Definition: block_queue.hpp:56
bool read_closed() const
check if reader side has returned a closing sentinel block
BlockSink which interfaces to a File.
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
void AppendBlock(const Block &b)
Definition: file.hpp:88
common::AtomicMovable< bool > write_closed_
Reader GetReader(bool consume, size_t local_worker_id)
return polymorphic BlockReader variant
Definition: block_queue.cpp:85
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
BlockQueue & queue_
BlockQueue that blocks are retrieved from.
Writer GetWriter(size_t block_size=default_block_size)
Return a BlockWriter delivering to this BlockQueue.
Definition: block_queue.cpp:43
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
CacheBlockQueueSource(BlockQueue *queue, size_t local_worker_id)
Start reading from a BlockQueue.
CloseCallback close_callback_
~CacheBlockQueueSource()
Consume remaining blocks and cache them in the File.
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
BlockQueue(BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback())
Constructor from BlockPool.
Definition: block_queue.cpp:19
size_t workers_per_host() const
return number of workers per host
Definition: block_pool.hpp:82
PinnedBlock NextBlock()
Return next block for BlockQueue, store into caching File and return it.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:223
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
BlockReader< ConsumeBlockQueueSource > ConsumeReader
Definition: block_queue.hpp:54
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
DynBlockSource GetBlockSource(bool consume, size_t local_worker_id)
return polymorphic BlockSource variant
Definition: block_queue.cpp:52
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at the same time CACHE all it...
A BlockSource to read Block from a BlockQueue using a BlockReader.
void Close() final
Close called by BlockWriter.
Definition: block_queue.cpp:28
This is the actual BlockSource used to instantiate BlockReader.