18 #if THRILL_HAVE_LIBHDFS3 19 #include <hdfs/hdfs.h> 26 #include <unordered_map> 33 #if THRILL_HAVE_LIBHDFS3 36 static constexpr
bool debug =
false;
//! Cache of HDFS handles: Hdfs3FindConnection() looks up and stores the hdfsFS
//! handle under the string it is called with.
39 static std::unordered_map<std::string, hdfsFS> s_hdfs_map;
//! Mutex that the visible code locks before iterating over, searching, or
//! inserting into s_hdfs_map.
42 static std::mutex s_hdfs_mutex;
// NOTE(review): the header of the enclosing function is not visible in this
// listing. The visible part takes s_hdfs_mutex and calls hdfsDisconnect() on
// every handle stored in s_hdfs_map.
51 std::unique_lock<std::mutex> lock(s_hdfs_mutex);
52 for (
auto& hdfs : s_hdfs_map) {
53 hdfsDisconnect(hdfs.second);
//! Find or create the HDFS connection for a "[user@]host[:port]" string.
//!
//! Holds s_hdfs_mutex for the whole call. The string is first looked up in
//! s_hdfs_map; otherwise it is parsed, a connection is built with the
//! hdfsBuilder API, and the resulting handle is stored in s_hdfs_map under
//! the unmodified \p hostport key.
//!
//! \param hostport connection string; also used verbatim as the cache key
//!
//! Calls die() if the port cannot be parsed as uint16_t or, per the error
//! message, if connecting fails.
//! NOTE(review): several original lines (return statements, the branch taken
//! when no port is given, the connection-failure check) are missing from this
//! listing.
61 hdfsFS Hdfs3FindConnection(
const std::string& hostport) {
62 std::unique_lock<std::mutex> lock(s_hdfs_mutex);
// cache lookup by the full connection string
64 auto it = s_hdfs_map.find(hostport);
65 if (it != s_hdfs_map.end())
// separate the part after ':' (the port) from the rest
69 std::vector<std::string> splitted = common::Split(hostport,
':', 2);
// size() == 1 means no ':' was found, i.e. no explicit port was given
72 if (splitted.size() == 1) {
76 if (!common::from_str<uint16_t>(splitted[1], port))
77 die(
"Could not parse port in host:port \"" << hostport <<
"\"");
// separate an optional "user@" prefix from the host name
81 std::vector<std::string> user_split = common::Split(splitted[0],
'@', 2);
// host and user point into user_split's strings, so user_split must stay
// alive while the builder calls below use them
82 const char* host, * user;
84 if (user_split.size() == 1) {
85 host = user_split[0].c_str();
89 user = user_split[0].c_str();
90 host = user_split[1].c_str();
// configure the builder with name node, port and user name, then connect
93 hdfsBuilder* builder = hdfsNewBuilder();
94 hdfsBuilderSetNameNode(builder, host);
95 hdfsBuilderSetNameNodePort(builder, port);
97 hdfsBuilderSetUserName(builder, user);
99 hdfsFS hdfs = hdfsBuilderConnect(builder);
101 die(
"Could not connect to HDFS server \"" << hostport <<
"\"" 102 ": " << hdfsGetLastError());
// remember the handle so later calls with the same string find it in the map
104 s_hdfs_map[hostport] = hdfs;
// NOTE(review): the beginning of this signature is missing from the listing;
// presumably this is Hdfs3Glob(), the only function in the reference list at
// the end of the file that takes a FileList& -- confirm.
//
// Lists the HDFS directory named by an "hdfs://<connection>/<path>" string and
// appends one entry per listed file or directory to filelist. The declarations
// of path, hosturi, num_entries and fi are not visible here.
112 FileList& filelist) {
// only hdfs:// URIs are accepted; drop the 7-character "hdfs://" prefix
116 die_unless(common::StartsWith(path,
"hdfs://"));
117 path = path.substr(7);
// splitted[0] is the connection part, splitted[1] the path after the first '/'
120 std::vector<std::string> splitted = common::Split(path,
'/', 2);
122 hdfsFS fs = Hdfs3FindConnection(splitted[0]);
// put back the leading '/' that was consumed as the split separator
126 splitted[1] =
"/" + splitted[1];
130 hdfsFileInfo* list = hdfsListDirectory(
131 fs, splitted[1].c_str(), &num_entries);
135 for (
int i = 0; i < num_entries; ++i) {
138 fi.path = list[i].mName;
// collapse a run of leading slashes down to a single '/'
140 while (fi.path.size() >= 2 && fi.path[0] ==
'/' && fi.path[1] ==
'/')
141 fi.path.erase(fi.path.begin(), fi.path.begin() + 1);
// prepend hosturi to the cleaned path
143 fi.path = hosturi + fi.path;
145 if (list[i].mKind == kObjectKindFile) {
// strip trailing slashes from file paths
149 while (fi.path.back() ==
'/')
150 fi.path.resize(fi.path.size() - 1);
152 fi.size = list[i].mSize;
153 filelist.emplace_back(fi);
156 else if (list[i].mKind == kObjectKindDirectory) {
159 fi.size = list[i].mSize;
160 filelist.emplace_back(fi);
// hand the listing array back to libhdfs3
165 hdfsFreeFileInfo(list, num_entries);
//! ReadStream implementation that reads from an already opened HDFS file
//! handle through hdfsSeek(), hdfsRead() and hdfsCloseFile().
//! NOTE(review): braces, access specifiers, member declarations and some
//! method headers are missing from this listing.
171 class Hdfs3ReadStream :
public ReadStream
//! Stores the handles and seeks the file to start_byte. The fourth parameter
//! is unnamed and therefore unused in the constructor body.
174 Hdfs3ReadStream(hdfsFS fs, hdfsFile file,
175 uint64_t start_byte, uint64_t )
176 : fs_(fs), file_(file) {
178 int err = hdfsSeek(fs_, file_, start_byte);
182 ~Hdfs3ReadStream()
override {
//! Forwards to hdfsRead() and returns its result unchanged.
186 ssize_t read(
void* data,
size_t size)
final {
187 return hdfsRead(fs_, file_, data, size);
// NOTE(review): the header of the method containing this call is not visible.
193 hdfsCloseFile(fs_, file_);
// NOTE(review): the return type and function name are missing from this
// listing; presumably Hdfs3OpenReadStream() from the reference list at the end
// of the file -- confirm.
//
// Opens the file named by an "hdfs://<connection>/<path>" string read-only and
// returns a counting pointer to an Hdfs3ReadStream constructed with
// range.begin and range.size(). The declaration of the local `path` is not
// visible here.
206 const std::string& _path,
const common::Range& range) {
// only hdfs:// URIs are accepted; drop the 7-character "hdfs://" prefix
210 die_unless(common::StartsWith(path,
"hdfs://"));
211 path = path.substr(7);
// splitted[0] is the connection part, splitted[1] the path after the first '/'
214 std::vector<std::string> splitted = common::Split(path,
'/', 2);
// put back the leading '/' that was consumed as the split separator
218 splitted[1] =
"/" + splitted[1];
220 hdfsFS fs = Hdfs3FindConnection(splitted[0]);
// open read-only; the die() below reports the caller's original _path together
// with hdfsGetLastError() (the condition guarding it is not visible)
223 hdfsFile file = hdfsOpenFile(
224 fs, splitted[1].c_str(), O_RDONLY, 0,
227 die(
"Could not open HDFS file \"" << _path <<
"\": " << hdfsGetLastError());
229 return tlx::make_counting<Hdfs3ReadStream>(
230 fs, file, range.begin, range.size());
//! WriteStream implementation that writes to an already opened HDFS file
//! handle through hdfsWrite() and hdfsCloseFile().
//! NOTE(review): braces, access specifiers, member declarations and some
//! method headers are missing from this listing.
236 class Hdfs3WriteStream :
public WriteStream
//! Stores the connection and file handles; performs no I/O itself.
239 Hdfs3WriteStream(hdfsFS fs, hdfsFile file)
240 : fs_(fs), file_(file) { }
242 ~Hdfs3WriteStream()
override {
//! Forwards to hdfsWrite() and returns its result unchanged.
246 ssize_t write(
const void* data,
size_t size)
final {
247 return hdfsWrite(fs_, file_, data, size);
// NOTE(review): the header of the method containing this call is not visible.
253 hdfsCloseFile(fs_, file_);
// NOTE(review): the signature of this function is missing from the listing;
// presumably Hdfs3OpenWriteStream() from the reference list at the end of the
// file -- confirm.
//
// Opens the file named by an "hdfs://<connection>/<path>" string write-only and
// returns a counting pointer to an Hdfs3WriteStream wrapping the handle. The
// declarations of path and _path are not visible here.
// only hdfs:// URIs are accepted; drop the 7-character "hdfs://" prefix
269 die_unless(common::StartsWith(path,
"hdfs://"));
270 path = path.substr(7);
// splitted[0] is the connection part, splitted[1] the path after the first '/'
273 std::vector<std::string> splitted = common::Split(path,
'/', 2);
// put back the leading '/' that was consumed as the split separator
276 splitted[1] =
"/" + splitted[1];
278 hdfsFS fs = Hdfs3FindConnection(splitted[0]);
// open write-only; the die() below reports _path together with
// hdfsGetLastError() (the condition guarding it is not visible)
281 hdfsFile file = hdfsOpenFile(
282 fs, splitted[1].c_str(), O_WRONLY, 0,
285 die(
"Could not open HDFS file \"" << _path <<
"\": " << hdfsGetLastError());
287 return tlx::make_counting<Hdfs3WriteStream>(fs, file);
// Fallback branch for builds without libhdfs3: each die() below reports that
// hdfs:// is unavailable. NOTE(review): the headers of the functions these
// calls belong to are missing from this listing.
290 #else // !THRILL_HAVE_LIBHDFS3 300 die(
"hdfs:// is not available, because Thrill was built without libhdfs3.");
305 die(
"hdfs:// is not available, because Thrill was built without libhdfs3.");
309 die(
"hdfs:// is not available, because Thrill was built without libhdfs3.");
312 #endif // !THRILL_HAVE_LIBHDFS3
WriteStreamPtr Hdfs3OpenWriteStream(const std::string &)
tlx::CountingPtr< WriteStream > WriteStreamPtr
GlobType
Type of objects to include in glob result.
represents a 1 dimensional range (interval) [begin,end)
#define die(msg)
Instead of calling std::terminate(), throw the output message as an exception.
ReadStreamPtr Hdfs3OpenReadStream(const std::string &, const common::Range &)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
static constexpr bool debug
High-performance smart pointer used as a wrapping reference counting pointer.
tlx::CountingPtr< ReadStream > ReadStreamPtr
List of file info and additional overall info.
void Hdfs3Glob(const std::string &, const GlobType &, FileList &)