Thrill  0.1
async_schedule.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/async_schedule.cpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002, 2009 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009, 2010 Andreas Beckmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 // Implements the "prudent prefetching" as described in
15 // D. Hutchinson, P. Sanders, J. S. Vitter: Duality between prefetching
16 // and queued writing on parallel disks, 2005
17 // DOI: 10.1137/S0097539703431573
18 
19 #include <algorithm>
20 #include <cassert>
21 #include <functional>
22 #include <queue>
23 #include <utility>
24 #include <vector>
25 
26 #include <tlx/simple_vector.hpp>
27 #include <tlx/unused.hpp>
28 
29 #include <foxxll/common/types.hpp>
30 #include <foxxll/io/file.hpp>
32 
33 namespace foxxll {
34 namespace async_schedule_local {
35 
36 constexpr static bool debug = false;
37 
38 // only one type of event: WRITE COMPLETED
//! A single simulated event. The simulation knows only one kind of event:
//! "write completed" for block #iblock at time step #timestamp.
struct sim_event
{
    //! simulated time step at which the event fires
    size_t timestamp;
    //! index of the block the event refers to
    size_t iblock;

    //! Creates an event for block \p b at time step \p t.
    sim_event(size_t t, size_t b) : timestamp(t), iblock(b) { }
};
45 
46 struct sim_event_cmp : public std::binary_function<sim_event, sim_event, bool>
47 {
48  inline bool operator () (const sim_event& a, const sim_event& b) const
49  {
50  return a.timestamp > b.timestamp;
51  }
52 };
53 
//! Pair whose second component is the value write_time_cmp orders by.
using write_time_pair = std::pair<size_t, size_t>;

//! Comparator on write_time_pair: returns true if \p a has a larger second
//! component than \p b, i.e. sorting with it yields descending order of
//! the second component.
struct write_time_cmp
{
    // These aliases used to be inherited from std::binary_function, which is
    // deprecated since C++11 and removed in C++17; spell them out explicitly
    // so code relying on them keeps compiling.
    using first_argument_type = write_time_pair;
    using second_argument_type = write_time_pair;
    using result_type = bool;

    bool operator () (const write_time_pair& a, const write_time_pair& b) const
    {
        return a.second > b.second;
    }
};
62 
63 static inline size_t get_disk(size_t i, const size_t* disks, size_t D)
64 {
65  size_t disk = disks[i];
66  if (disk == file::DEFAULT_DEVICE_ID)
67  disk = D; // remap to sentinel
68  assert(disk <= D);
69  return disk;
70 }
71 
73  const size_t* disks,
74  const size_t L,
75  const size_t m_init,
76  const size_t D,
77  std::pair<size_t, size_t>* o_time)
78 {
79  using event_queue_type = std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp>;
80  using disk_queue_type = std::queue<size_t>;
81  assert(L >= D);
82  // + sentinel for remapping NO_ALLOCATOR
84  event_queue_type event_queue;
85 
86  size_t m = m_init;
87  size_t i = L;
88  size_t oldtime = 0;
89  tlx::simple_vector<bool> disk_busy(D + 1);
90 
91  while (m && (i > 0))
92  {
93  i--;
94  m--;
95  size_t disk = get_disk(i, disks, D);
96  disk_queues[disk].push(i);
97  }
98 
99  for (size_t ii = 0; ii <= D; ii++)
100  if (!disk_queues[ii].empty())
101  {
102  size_t j = disk_queues[ii].front();
103  disk_queues[ii].pop();
104  event_queue.push(sim_event(1, j));
105  TLX_LOG << "Block " << j << " scheduled";
106  }
107 
108  while (!event_queue.empty())
109  {
110  sim_event cur = event_queue.top();
111  event_queue.pop();
112  if (oldtime != cur.timestamp)
113  {
114  // clear disk_busy
115  for (size_t j = 0; j <= D; j++)
116  disk_busy[j] = false;
117 
118  oldtime = cur.timestamp;
119  }
120 
121  TLX_LOG << "Block " << cur.iblock << " put out, time "
122  << cur.timestamp << " disk: " << disks[cur.iblock];
123  o_time[cur.iblock] = std::pair<size_t, size_t>(cur.iblock, cur.timestamp);
124 
125  if (i > 0)
126  {
127  size_t disk = get_disk(i - 1, disks, D);
128  if (disk_busy[disk])
129  {
130  disk_queues[disk].push(--i);
131  }
132  else
133  {
134  if (!disk_queues[disk].empty())
135  {
136  TLX_LOG << "c Block " << disk_queues[disk].front()
137  << " scheduled for time " << cur.timestamp + 1;
138  event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
139  disk_queues[disk].pop();
140  }
141  else
142  {
143  TLX_LOG << "a Block " << (i - 1)
144  << " scheduled for time " << cur.timestamp + 1;
145  event_queue.push(sim_event(cur.timestamp + 1, --i));
146  }
147  disk_busy[disk] = true;
148  }
149  }
150 
151  // add next block to write
152  size_t disk = get_disk(cur.iblock, disks, D);
153  if (!disk_busy[disk] && !disk_queues[disk].empty())
154  {
155  TLX_LOG << "b Block " << disk_queues[disk].front()
156  << " scheduled for time " << cur.timestamp + 1;
157  event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
158  disk_queues[disk].pop();
159  disk_busy[disk] = true;
160  }
161  }
162 
163  assert(i == 0);
164  for (size_t j = 0; j <= D; j++)
165  assert(disk_queues[j].empty());
166 
167  return (oldtime - 1);
168 }
169 
170 } // namespace async_schedule_local
171 
173  const size_t* first,
174  const size_t* last,
175  size_t* out_first,
176  size_t m,
177  size_t D)
178 {
179  constexpr bool debug = false;
180 
181  using pair_type = std::pair<size_t, size_t>;
182  size_t L = last - first;
183  if (L <= D)
184  {
185  for (size_t i = 0; i < L; ++i)
186  out_first[i] = i;
187 
188  return;
189  }
190  pair_type* write_order = new pair_type[L];
191 
192  size_t w_steps = async_schedule_local::simulate_async_write(first, L, m, D, write_order);
193 
194  TLX_LOG << "Write steps: " << w_steps;
195 
196  for (size_t i = 0; i < L; i++)
197  TLX_LOG << first[i] << " " << write_order[i].first << " " << write_order[i].second;
198 
199  std::stable_sort(write_order, write_order + L, async_schedule_local::write_time_cmp());
200 
201  for (size_t i = 0; i < L; i++)
202  {
203  out_first[i] = write_order[i].first;
204  //if(out_first[i] != i)
205  TLX_LOG << i << " " << out_first[i];
206  }
207 
208  delete[] write_order;
209  tlx::unused(w_steps);
210 }
211 
212 } // namespace foxxll
213 
214 /**************************************************************************/
static size_t get_disk(size_t i, const size_t *disks, size_t D)
static const unsigned int DEFAULT_DEVICE_ID
Definition: file.hpp:92
Simpler non-growing vector without initialization.
void compute_prefetch_schedule(const size_t *first, const size_t *last, size_t *out_first, size_t m, size_t D)
static double timestamp()
Returns number of seconds since the epoch, high resolution.
Definition: timer.hpp:36
void unused(Types &&...)
Definition: unused.hpp:20
static constexpr bool debug
FOXXLL library namespace
int D
Definition: gen_data.py:14
std::pair< size_t, size_t > write_time_pair
size_t simulate_async_write(const size_t *disks, const size_t L, const size_t m_init, const size_t D, std::pair< size_t, size_t > *o_time)
#define TLX_LOG
Default logging method: output if the local debug variable is true.
Definition: core.hpp:141