Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
async_schedule.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/async_schedule.cpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002, 2009 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009, 2010 Andreas Beckmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 // Implements the "prudent prefetching" as described in
15 // D. Hutchinson, P. Sanders, J. S. Vitter: Duality between prefetching
16 // and queued writing on parallel disks, 2005
17 // DOI: 10.1137/S0097539703431573
18 
19 #include <algorithm>
20 #include <cassert>
21 #include <functional>
22 #include <queue>
23 #include <utility>
24 #include <vector>
25 
26 #include <tlx/simple_vector.hpp>
27 #include <tlx/unused.hpp>
28 
29 #include <foxxll/common/types.hpp>
30 #include <foxxll/io/file.hpp>
32 
33 namespace foxxll {
34 namespace async_schedule_local {
35 
36 constexpr static bool debug = false;
37 
38 // only one type of event: WRITE COMPLETED
//! A single simulation event: block \c iblock finishes at step \c timestamp.
struct sim_event
{
    size_t timestamp; //!< simulation step at which the event fires
    size_t iblock;    //!< index of the block the event refers to

    sim_event(size_t t, size_t b) : timestamp(t), iblock(b) { }
};

//! Orders sim_event objects so that a std::priority_queue (a max-heap) yields
//! the event with the SMALLEST timestamp first.
//!
//! Deliberately not derived from std::binary_function: that helper was
//! deprecated in C++11 and removed in C++17, and neither std::priority_queue
//! nor the standard algorithms need its nested typedefs.
struct sim_event_cmp
{
    bool operator () (const sim_event& a, const sim_event& b) const
    {
        return a.timestamp > b.timestamp;
    }
};
53 
//! (first, second) pair; the comparator below orders by \c second only.
using write_time_pair = std::pair<size_t, size_t>;

//! Strict weak ordering that places pairs with a LARGER \c second component
//! first; \c first is ignored, so pairs with equal \c second are equivalent.
//!
//! Deliberately not derived from std::binary_function: that helper was
//! deprecated in C++11 and removed in C++17, and the standard algorithms do
//! not need its nested typedefs.
struct write_time_cmp
{
    bool operator () (const write_time_pair& a, const write_time_pair& b) const
    {
        return a.second > b.second;
    }
};
62 
63 static inline size_t get_disk(size_t i, const size_t* disks, size_t D)
64 {
65  size_t disk = disks[i];
66  if (disk == file::DEFAULT_DEVICE_ID)
67  disk = D; // remap to sentinel
68  assert(disk <= D);
69  return disk;
70 }
71 
73  const size_t* disks,
74  const size_t L,
75  const size_t m_init,
76  const size_t D,
77  std::pair<size_t, size_t>* o_time)
78 {
79  using event_queue_type = std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp>;
80  using disk_queue_type = std::queue<size_t>;
81  assert(L >= D);
82  // + sentinel for remapping NO_ALLOCATOR
84  event_queue_type event_queue;
85 
86  size_t m = m_init;
87  size_t i = L;
88  size_t oldtime = 0;
89  tlx::simple_vector<bool> disk_busy(D + 1);
90 
91  while (m && (i > 0))
92  {
93  i--;
94  m--;
95  size_t disk = get_disk(i, disks, D);
96  disk_queues[disk].push(i);
97  }
98 
99  for (size_t ii = 0; ii <= D; ii++)
100  if (!disk_queues[ii].empty())
101  {
102  size_t j = disk_queues[ii].front();
103  disk_queues[ii].pop();
104  event_queue.push(sim_event(1, j));
105  LOG << "Block " << j << " scheduled";
106  }
107 
108  while (!event_queue.empty())
109  {
110  sim_event cur = event_queue.top();
111  event_queue.pop();
112  if (oldtime != cur.timestamp)
113  {
114  // clear disk_busy
115  for (size_t j = 0; j <= D; j++)
116  disk_busy[j] = false;
117 
118  oldtime = cur.timestamp;
119  }
120 
121  LOG << "Block " << cur.iblock << " put out, time " << cur.timestamp << " disk: " << disks[cur.iblock];
122  o_time[cur.iblock] = std::pair<size_t, size_t>(cur.iblock, cur.timestamp);
123 
124  if (i > 0)
125  {
126  size_t disk = get_disk(i - 1, disks, D);
127  if (disk_busy[disk])
128  {
129  disk_queues[disk].push(--i);
130  }
131  else
132  {
133  if (!disk_queues[disk].empty())
134  {
135  LOG << "c Block " << disk_queues[disk].front() << " scheduled for time " << cur.timestamp + 1;
136  event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
137  disk_queues[disk].pop();
138  }
139  else
140  {
141  LOG << "a Block " << (i - 1) << " scheduled for time " << cur.timestamp + 1;
142  event_queue.push(sim_event(cur.timestamp + 1, --i));
143  }
144  disk_busy[disk] = true;
145  }
146  }
147 
148  // add next block to write
149  size_t disk = get_disk(cur.iblock, disks, D);
150  if (!disk_busy[disk] && !disk_queues[disk].empty())
151  {
152  LOG << "b Block " << disk_queues[disk].front() << " scheduled for time " << cur.timestamp + 1;
153  event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
154  disk_queues[disk].pop();
155  disk_busy[disk] = true;
156  }
157  }
158 
159  assert(i == 0);
160  for (size_t j = 0; j <= D; j++)
161  assert(disk_queues[j].empty());
162 
163  return (oldtime - 1);
164 }
165 
166 } // namespace async_schedule_local
167 
169  const size_t* first,
170  const size_t* last,
171  size_t* out_first,
172  size_t m,
173  size_t D)
174 {
175  constexpr bool debug = false;
176 
177  using pair_type = std::pair<size_t, size_t>;
178  size_t L = last - first;
179  if (L <= D)
180  {
181  for (size_t i = 0; i < L; ++i)
182  out_first[i] = i;
183 
184  return;
185  }
186  pair_type* write_order = new pair_type[L];
187 
188  size_t w_steps = async_schedule_local::simulate_async_write(first, L, m, D, write_order);
189 
190  LOG << "Write steps: " << w_steps;
191 
192  for (size_t i = 0; i < L; i++)
193  LOG << first[i] << " " << write_order[i].first << " " << write_order[i].second;
194 
195  std::stable_sort(write_order, write_order + L, async_schedule_local::write_time_cmp());
196 
197  for (size_t i = 0; i < L; i++)
198  {
199  out_first[i] = write_order[i].first;
200  //if(out_first[i] != i)
201  LOG << i << " " << out_first[i];
202  }
203 
204  delete[] write_order;
205  tlx::unused(w_steps);
206 }
207 
208 } // namespace foxxll
209 
210 /**************************************************************************/
static size_t get_disk(size_t i, const size_t *disks, size_t D)
static const unsigned int DEFAULT_DEVICE_ID
Definition: file.hpp:92
Simpler non-growing vector without initialization.
void compute_prefetch_schedule(const size_t *first, const size_t *last, size_t *out_first, size_t m, size_t D)
static double timestamp()
Returns number of seconds since the epoch, high resolution.
Definition: timer.hpp:36
void unused(Types &&...)
Definition: unused.hpp:20
static constexpr bool debug
static constexpr bool debug
int D
Definition: gen_data.py:14
std::pair< size_t, size_t > write_time_pair
size_t simulate_async_write(const size_t *disks, const size_t L, const size_t m_init, const size_t D, std::pair< size_t, size_t > *o_time)
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:141