Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
hdfs3_file.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/hdfs3_file.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
13 #include <thrill/common/logger.hpp>
14 #include <thrill/common/string.hpp>
15 
16 #include <tlx/die.hpp>
17 
18 #if THRILL_HAVE_LIBHDFS3
19 #include <hdfs/hdfs.h>
20 #endif
21 
22 #include <algorithm>
23 #include <limits>
24 #include <mutex>
25 #include <string>
26 #include <unordered_map>
27 #include <utility>
28 #include <vector>
29 
30 namespace thrill {
31 namespace vfs {
32 
33 #if THRILL_HAVE_LIBHDFS3
34 
// flag to output debug info from HDFS operations
static constexpr bool debug = false;

//! connection map to HDFS namenode: "host:port" -> open connection handle
static std::unordered_map<std::string, hdfsFS> s_hdfs_map;

//! mutex to protect s_hdfs_map
static std::mutex s_hdfs_mutex;
43 
44 /******************************************************************************/
45 
//! Prepare the HDFS layer. Connections to namenodes are established lazily
//! by Hdfs3FindConnection(), hence there is nothing to set up here.
void Hdfs3Initialize() {
    // intentionally empty
}
49 
50 void Hdfs3Deinitialize() {
51  std::unique_lock<std::mutex> lock(s_hdfs_mutex);
52  for (auto& hdfs : s_hdfs_map) {
53  hdfsDisconnect(hdfs.second);
54  }
55  s_hdfs_map.clear();
56 }
57 
58 /******************************************************************************/
59 // Helper Methods
60 
61 hdfsFS Hdfs3FindConnection(const std::string& hostport) {
62  std::unique_lock<std::mutex> lock(s_hdfs_mutex);
63 
64  auto it = s_hdfs_map.find(hostport);
65  if (it != s_hdfs_map.end())
66  return it->second;
67 
68  // split host:port
69  std::vector<std::string> splitted = common::Split(hostport, ':', 2);
70  uint16_t port;
71 
72  if (splitted.size() == 1) {
73  port = 8020;
74  }
75  else {
76  if (!common::from_str<uint16_t>(splitted[1], port))
77  die("Could not parse port in host:port \"" << hostport << "\"");
78  }
79 
80  // split [email protected]
81  std::vector<std::string> user_split = common::Split(splitted[0], '@', 2);
82  const char* host, * user;
83 
84  if (user_split.size() == 1) {
85  host = user_split[0].c_str();
86  user = nullptr;
87  }
88  else {
89  user = user_split[0].c_str();
90  host = user_split[1].c_str();
91  }
92 
93  hdfsBuilder* builder = hdfsNewBuilder();
94  hdfsBuilderSetNameNode(builder, host);
95  hdfsBuilderSetNameNodePort(builder, port);
96  if (user)
97  hdfsBuilderSetUserName(builder, user);
98 
99  hdfsFS hdfs = hdfsBuilderConnect(builder);
100  if (!hdfs)
101  die("Could not connect to HDFS server \"" << hostport << "\""
102  ": " << hdfsGetLastError());
103 
104  s_hdfs_map[hostport] = hdfs;
105  return hdfs;
106 }
107 
108 /******************************************************************************/
109 // List Directory Contents on HDFS
110 
111 void Hdfs3Glob(const std::string& _path, const GlobType& gtype,
112  FileList& filelist) {
113 
114  std::string path = _path;
115  // crop off hdfs://
116  die_unless(common::StartsWith(path, "hdfs://"));
117  path = path.substr(7);
118 
119  // split uri into host/path
120  std::vector<std::string> splitted = common::Split(path, '/', 2);
121 
122  hdfsFS fs = Hdfs3FindConnection(splitted[0]);
123  std::string hosturi = "hdfs://" + splitted[0];
124 
125  // prepend root /
126  splitted[1] = "/" + splitted[1];
127 
128  // list directory
129  int num_entries = 0;
130  hdfsFileInfo* list = hdfsListDirectory(
131  fs, splitted[1].c_str(), &num_entries);
132 
133  if (!list) return;
134 
135  for (int i = 0; i < num_entries; ++i) {
136  FileInfo fi;
137 
138  fi.path = list[i].mName;
139  // remove leading slashes
140  while (fi.path.size() >= 2 && fi.path[0] == '/' && fi.path[1] == '/')
141  fi.path.erase(fi.path.begin(), fi.path.begin() + 1);
142  // prepend host uri
143  fi.path = hosturi + fi.path;
144 
145  if (list[i].mKind == kObjectKindFile) {
146  if (gtype == GlobType::All || gtype == GlobType::File) {
147  // strangely full file name globs return the file with a / at
148  // the end.
149  while (fi.path.back() == '/')
150  fi.path.resize(fi.path.size() - 1);
151  fi.type = Type::File;
152  fi.size = list[i].mSize;
153  filelist.emplace_back(fi);
154  }
155  }
156  else if (list[i].mKind == kObjectKindDirectory) {
157  if (gtype == GlobType::All || gtype == GlobType::Directory) {
158  fi.type = Type::Directory;
159  fi.size = list[i].mSize;
160  filelist.emplace_back(fi);
161  }
162  }
163  }
164 
165  hdfsFreeFileInfo(list, num_entries);
166 }
167 
168 /******************************************************************************/
169 // Stream Reading from HDFS
170 
171 class Hdfs3ReadStream : public ReadStream
172 {
173 public:
174  Hdfs3ReadStream(hdfsFS fs, hdfsFile file,
175  uint64_t start_byte, uint64_t /* byte_count */)
176  : fs_(fs), file_(file) {
177 
178  int err = hdfsSeek(fs_, file_, start_byte);
179  die_unless(err == 0);
180  }
181 
182  ~Hdfs3ReadStream() override {
183  close();
184  }
185 
186  ssize_t read(void* data, size_t size) final {
187  return hdfsRead(fs_, file_, data, size);
188  }
189 
190  void close() final {
191  if (!file_) return;
192 
193  hdfsCloseFile(fs_, file_);
194  file_ = nullptr;
195  }
196 
197 private:
198  //! HDFS connection
199  hdfsFS fs_;
200 
201  //! HDFS file handler
202  hdfsFile file_;
203 };
204 
206  const std::string& _path, const common::Range& range) {
207 
208  std::string path = _path;
209  // crop off hdfs://
210  die_unless(common::StartsWith(path, "hdfs://"));
211  path = path.substr(7);
212 
213  // split uri into host/path
214  std::vector<std::string> splitted = common::Split(path, '/', 2);
215  die_unless(splitted.size() == 2);
216 
217  // prepend root /
218  splitted[1] = "/" + splitted[1];
219 
220  hdfsFS fs = Hdfs3FindConnection(splitted[0]);
221 
222  // construct file handler
223  hdfsFile file = hdfsOpenFile(
224  fs, splitted[1].c_str(), O_RDONLY, /* bufferSize */ 0,
225  /* replication */ 0, /* blocksize */ 0);
226  if (!file)
227  die("Could not open HDFS file \"" << _path << "\": " << hdfsGetLastError());
228 
229  return tlx::make_counting<Hdfs3ReadStream>(
230  fs, file, /* start_byte */ range.begin, /* byte_count */ range.size());
231 }
232 
233 /******************************************************************************/
234 // Stream Writing to HDFS
235 
236 class Hdfs3WriteStream : public WriteStream
237 {
238 public:
239  Hdfs3WriteStream(hdfsFS fs, hdfsFile file)
240  : fs_(fs), file_(file) { }
241 
242  ~Hdfs3WriteStream() override {
243  close();
244  }
245 
246  ssize_t write(const void* data, size_t size) final {
247  return hdfsWrite(fs_, file_, data, size);
248  }
249 
250  void close() final {
251  if (!file_) return;
252 
253  hdfsCloseFile(fs_, file_);
254  file_ = nullptr;
255  }
256 
257 private:
258  //! HDFS connection
259  hdfsFS fs_;
260 
261  //! HDFS file handler
262  hdfsFile file_;
263 };
264 
266 
267  std::string path = _path;
268  // crop off hdfs://
269  die_unless(common::StartsWith(path, "hdfs://"));
270  path = path.substr(7);
271 
272  // split uri into host/path
273  std::vector<std::string> splitted = common::Split(path, '/', 2);
274 
275  // prepend root /
276  splitted[1] = "/" + splitted[1];
277 
278  hdfsFS fs = Hdfs3FindConnection(splitted[0]);
279 
280  // construct file handler
281  hdfsFile file = hdfsOpenFile(
282  fs, splitted[1].c_str(), O_WRONLY, /* bufferSize */ 0,
283  /* replication */ 0, /* blocksize */ 0);
284  if (!file)
285  die("Could not open HDFS file \"" << _path << "\": " << hdfsGetLastError());
286 
287  return tlx::make_counting<Hdfs3WriteStream>(fs, file);
288 }
289 
290 #else // !THRILL_HAVE_LIBHDFS3
291 
//! Stub without libhdfs3: nothing to initialize.
//! (Extraction artifact repaired: the signature line was dropped by the
//! Doxygen dump; restored per the documented prototype.)
void Hdfs3Initialize()
{ }
294 
//! Stub without libhdfs3: nothing to tear down.
//! (Extraction artifact repaired: the signature line was dropped by the
//! Doxygen dump; restored per the documented prototype.)
void Hdfs3Deinitialize()
{ }
297 
//! Stub: always fails at runtime because Thrill was built without libhdfs3.
void Hdfs3Glob(const std::string& /* path */, const GlobType& /* gtype */,
               FileList& /* filelist */) {
    die("hdfs:// is not available, because Thrill was built without libhdfs3.");
}
302 
304  const std::string& /* path */, const common::Range& /* range */) {
305  die("hdfs:// is not available, because Thrill was built without libhdfs3.");
306 }
307 
309  die("hdfs:// is not available, because Thrill was built without libhdfs3.");
310 }
311 
312 #endif // !THRILL_HAVE_LIBHDFS3
313 
314 } // namespace vfs
315 } // namespace thrill
316 
317 /******************************************************************************/
#define die_unless(X)
Definition: die.hpp:27
WriteStreamPtr Hdfs3OpenWriteStream(const std::string &)
Definition: hdfs3_file.cpp:308
tlx::CountingPtr< WriteStream > WriteStreamPtr
Definition: file_io.hpp:146
GlobType
Type of objects to include in glob result.
Definition: file_io.hpp:99
void Hdfs3Initialize()
Definition: hdfs3_file.cpp:292
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
void Hdfs3Deinitialize()
Definition: hdfs3_file.cpp:295
ReadStreamPtr Hdfs3OpenReadStream(const std::string &, const common::Range &)
Definition: hdfs3_file.cpp:303
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
High-performance smart pointer used as a wrapping reference counting pointer.
tlx::CountingPtr< ReadStream > ReadStreamPtr
Definition: file_io.hpp:145
List of file info and additional overall info.
Definition: file_io.hpp:79
void Hdfs3Glob(const std::string &, const GlobType &, FileList &)
Definition: hdfs3_file.cpp:298