Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sys_file.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/sys_file.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/vfs/sys_file.hpp>
13 
15 #include <thrill/common/string.hpp>
18 
19 #include <tlx/die.hpp>
20 #include <tlx/string/ends_with.hpp>
21 
22 #include <fcntl.h>
23 #include <sys/stat.h>
24 
25 #if !defined(_MSC_VER)
26 
27 #include <dirent.h>
28 #include <glob.h>
29 #include <sys/wait.h>
30 #include <unistd.h>
31 
32 #if !defined(O_BINARY)
33 #define O_BINARY 0
34 #endif
35 
36 #else
37 
38 #include <io.h>
39 #include <windows.h>
40 
41 #define S_ISREG(m) (((m) & _S_IFMT) == _S_IFREG)
42 
43 #endif
44 
45 #include <algorithm>
46 #include <string>
47 #include <vector>
48 
49 namespace thrill {
50 namespace vfs {
51 
52 /******************************************************************************/
53 
54 static void SysGlobWalkRecursive(const std::string& path, FileList& filelist) {
55 #if defined(_MSC_VER)
56 
57  WIN32_FIND_DATA ff;
58  HANDLE h = FindFirstFile((path + "\\*").c_str(), &ff);
59 
60  if (h == INVALID_HANDLE_VALUE) {
62  "FindFirstFile failed:" + std::to_string(GetLastError()));
63  }
64 
65  std::vector<FileInfo> tmp_list;
66 
67  do {
68  if (ff.cFileName[0] != '.')
69  {
70  FileInfo fi;
71  if (ff.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
72  fi.type = Type::Directory;
73  }
74  else {
75  fi.type = Type::File;
76  }
77  fi.path = path + "\\" + ff.cFileName;
78  fi.size = (static_cast<uint64_t>(ff.nFileSizeHigh) * (MAXDWORD + 1))
79  + static_cast<uint64_t>(ff.nFileSizeLow);
80  tmp_list.emplace_back(fi);
81  }
82  } while (FindNextFile(h, &ff) != 0);
83 
84  DWORD e = GetLastError();
85  if (e != ERROR_NO_MORE_FILES) {
87  "FindFirstFile failed:" + std::to_string(GetLastError()));
88  }
89 
90  std::sort(tmp_list.begin(), tmp_list.end());
91 
92  for (const FileInfo& fi : tmp_list) {
93  if (fi.type == Type::Directory) {
94  SysGlobWalkRecursive(fi.path, filelist);
95  }
96  else {
97  filelist.emplace_back(fi);
98  }
99  }
100 
101 #else
102  // read entries
103  DIR* dir = opendir(path.c_str());
104  if (dir == nullptr)
105  throw common::ErrnoException("Could not read directory " + path);
106 
107  struct dirent* de, de_entry;
108  struct stat st;
109 
110  std::vector<std::string> list;
111 
112  while (readdir_r(dir, &de_entry, &de) == 0 && de != nullptr) {
113  // skip ".", "..", and also hidden files (don't create them).
114  if (de->d_name[0] == '.') continue;
115 
116  list.emplace_back(path + "/" + de->d_name);
117  }
118 
119  closedir(dir);
120 
121  // sort file names
122  std::sort(list.begin(), list.end());
123 
124  for (const std::string& entry : list) {
125  if (stat(entry.c_str(), &st) != 0)
126  throw common::ErrnoException("Could not lstat() " + entry);
127 
128  if (S_ISDIR(st.st_mode)) {
129  // descend into directories
130  SysGlobWalkRecursive(entry, filelist);
131  }
132  else if (S_ISREG(st.st_mode)) {
133  FileInfo fi;
134  fi.type = Type::File;
135  fi.path = entry;
136  fi.size = static_cast<uint64_t>(st.st_size);
137  filelist.emplace_back(fi);
138  }
139  }
140 #endif
141 }
142 
143 void SysGlob(const std::string& path, const GlobType& gtype,
144  FileList& filelist) {
145 
146  std::vector<std::string> list;
147 
148  // collect file names
149 #if defined(_MSC_VER)
151  sglob.Add(path.c_str());
152  for (int n = 0; n < sglob.FileCount(); ++n) {
153  list.emplace_back(sglob.File(n));
154  }
155 #else
156  glob_t glob_result;
157  glob(path.c_str(), GLOB_TILDE, nullptr, &glob_result);
158 
159  for (unsigned int i = 0; i < glob_result.gl_pathc; ++i) {
160  list.push_back(glob_result.gl_pathv[i]);
161  }
162  globfree(&glob_result);
163 #endif
164 
165  // sort file names
166  std::sort(list.begin(), list.end());
167 
168  // stat files to collect size information
169  struct stat filestat;
170  for (const std::string& file : list)
171  {
172  if (::stat(file.c_str(), &filestat) != 0) {
173  die("ERROR: could not stat() path " + file);
174  }
175 
176  if (S_ISREG(filestat.st_mode)) {
177  if (gtype == GlobType::All || gtype == GlobType::File) {
178  FileInfo fi;
179  fi.type = Type::File;
180  fi.path = file;
181  fi.size = static_cast<uint64_t>(filestat.st_size);
182  filelist.emplace_back(fi);
183  }
184  }
185  else {
186  // directory entries or others
187  if (gtype == GlobType::All || gtype == GlobType::Directory) {
188  FileInfo fi;
189  fi.type = Type::Directory;
190  fi.path = file;
191  fi.size = 0;
192  filelist.emplace_back(fi);
193  }
194  else if (gtype == GlobType::File) {
195  SysGlobWalkRecursive(file, filelist);
196  }
197  }
198  }
199 }
200 
201 /******************************************************************************/
202 
203 /*!
204  * Represents a POSIX system file via its file descriptor.
205  */
206 class SysFile final : public virtual ReadStream, public virtual WriteStream
207 {
208  static constexpr bool debug = false;
209 
210 public:
211  //! default constructor
212  SysFile() : fd_(-1) { }
213 
214  //! constructor: use OpenForRead or OpenForWrite.
215  explicit SysFile(int fd, int pid = 0) noexcept
216  : fd_(fd), pid_(pid) { }
217 
218  //! non-copyable: delete copy-constructor
219  SysFile(const SysFile&) = delete;
220  //! non-copyable: delete assignment operator
221  SysFile& operator = (const SysFile&) = delete;
222  //! move-constructor
223  SysFile(SysFile&& f) noexcept
224  : fd_(f.fd_), pid_(f.pid_) {
225  f.fd_ = -1, f.pid_ = 0;
226  }
227  //! move-assignment
228  SysFile& operator = (SysFile&& f) {
229  close();
230  fd_ = f.fd_, pid_ = f.pid_;
231  f.fd_ = -1, f.pid_ = 0;
232  return *this;
233  }
234 
235  ~SysFile() {
236  close();
237  }
238 
239  //! POSIX write function.
240  ssize_t write(const void* data, size_t count) final {
241  assert(fd_ >= 0);
242 #if defined(_MSC_VER)
243  return ::_write(fd_, data, static_cast<unsigned>(count));
244 #else
245  return ::write(fd_, data, count);
246 #endif
247  }
248 
249  //! POSIX read function.
250  ssize_t read(void* data, size_t count) final {
251  assert(fd_ >= 0);
252 #if defined(_MSC_VER)
253  return ::_read(fd_, data, static_cast<unsigned>(count));
254 #else
255  return ::read(fd_, data, count);
256 #endif
257  }
258 
259  //! close the file descriptor
260  void close() final;
261 
262 private:
263  //! file descriptor
264  int fd_ = -1;
265 
266 #if defined(_MSC_VER)
267  using pid_t = int;
268 #endif
269 
270  //! pid of child process to wait for
271  pid_t pid_ = 0;
272 };
273 
274 void SysFile::close() {
275  if (fd_ >= 0) {
276  sLOG << "SysFile::close(): fd" << fd_;
277  if (::close(fd_) != 0)
278  {
279  LOG1 << "SysFile::close()"
280  << " fd_=" << fd_
281  << " errno=" << errno
282  << " error=" << strerror(errno);
283  }
284  fd_ = -1;
285  }
286 #if !defined(_MSC_VER)
287  if (pid_ != 0) {
288  sLOG << "SysFile::close(): waitpid for" << pid_;
289  int status;
290  pid_t p = waitpid(pid_, &status, 0);
291  if (p != pid_) {
292  throw common::SystemException(
293  "SysFile: waitpid() failed to return child");
294  }
295  if (WIFEXITED(status)) {
296  // child program exited normally
297  if (WEXITSTATUS(status) != 0) {
298  throw common::ErrnoException(
299  "SysFile: child failed with return code "
300  + std::to_string(WEXITSTATUS(status)));
301  }
302  else {
303  // zero return code. good.
304  }
305  }
306  else if (WIFSIGNALED(status)) {
307  throw common::ErrnoException(
308  "SysFile: child killed by signal "
309  + std::to_string(WTERMSIG(status)));
310  }
311  else {
312  throw common::ErrnoException(
313  "SysFile: child failed with an unknown error");
314  }
315  pid_ = 0;
316  }
317 #endif
318 }
319 
320 /******************************************************************************/
321 
323  const std::string& path, const common::Range& range) {
324 
325  static constexpr bool debug = false;
326 
327  // first open the file and see if it exists at all.
328 
329  int fd = ::open(path.c_str(), O_RDONLY | O_BINARY, 0);
330  if (fd < 0) {
331  throw common::ErrnoException("Cannot open file " + path);
332  }
333 
334  // then figure out whether we need to pipe it through a decompressor.
335 
336  const char* decompressor;
337 
338  if (tlx::ends_with(path, ".xz")) {
339  decompressor = "xz";
340  }
341  else if (tlx::ends_with(path, ".lzo")) {
342  decompressor = "lzop";
343  }
344  else if (tlx::ends_with(path, ".lz4")) {
345  decompressor = "lz4";
346  }
347  else {
348  // not a compressed file
350 
351  sLOG << "SysFile::OpenForRead(): filefd" << fd;
352 
353  if (range.begin) {
354  //! POSIX lseek function from current position.
355  ::lseek(fd, range.begin, SEEK_CUR);
356  }
357 
358  return tlx::make_counting<SysFile>(fd);
359  }
360 
361 #if defined(_MSC_VER)
363  "Reading compressed files is not supported on windows, yet. "
364  "Please submit a patch.");
365 #else
366  // if decompressor: fork a child program which calls the decompressor and
367  // connect file descriptors via a pipe.
368 
369  // pipe[0] = read, pipe[1] = write
370  int pipefd[2];
371  common::MakePipe(pipefd);
372 
373  pid_t pid = fork();
374  if (pid == 0) {
375  // close read end
376  ::close(pipefd[0]);
377 
378  // replace stdin with file descriptor to file opened above.
379  dup2(fd, STDIN_FILENO);
380  ::close(fd);
381  // replace stdout with pipe going back to Thrill process
382  dup2(pipefd[1], STDOUT_FILENO);
383  ::close(pipefd[1]);
384 
385  execlp(decompressor, decompressor, "-d", nullptr);
386 
387  LOG1 << "Pipe execution failed: " << strerror(errno);
388  // close write end
389  ::close(pipefd[1]);
390  exit(-1);
391  }
392  else if (pid < 0) {
393  throw common::ErrnoException("Error creating child process");
394  }
395 
396  sLOG << "SysFile::OpenForRead(): pipefd" << pipefd[0] << "to pid" << pid;
397 
398  // close pipe write end
399  ::close(pipefd[1]);
400 
401  // close the file descriptor
402  ::close(fd);
403 
404  if (range.begin) {
405  //! POSIX lseek function from current position.
406  ::lseek(pipefd[0], range.begin, SEEK_CUR);
407  }
408 
409  return tlx::make_counting<SysFile>(pipefd[0], pid);
410 #endif
411 }
412 
414 
415  static constexpr bool debug = false;
416 
417  // first create the file and see if we can write it at all.
418 
419  int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_BINARY, 0666);
420  if (fd < 0) {
421  throw common::ErrnoException("Cannot create file " + path);
422  }
423 
424  // then figure out whether we need to pipe it through a compressor.
425 
426  const char* compressor;
427 
428  if (tlx::ends_with(path, ".xz")) {
429  compressor = "xz";
430  }
431  else if (tlx::ends_with(path, ".lzo")) {
432  compressor = "lzop";
433  }
434  else if (tlx::ends_with(path, ".lz4")) {
435  compressor = "lz4";
436  }
437  else {
438  // not a compressed file
440 
441  sLOG << "SysFile::OpenForWrite(): filefd" << fd;
442 
443  return tlx::make_counting<SysFile>(fd);
444  }
445 
446 #if defined(_MSC_VER)
448  "Reading compressed files is not supported on windows, yet. "
449  "Please submit a patch.");
450 #else
451  // if compressor: fork a child program which calls the compressor and
452  // connect file descriptors via a pipe.
453 
454  // pipe[0] = read, pipe[1] = write
455  int pipefd[2];
456  common::MakePipe(pipefd);
457 
458  pid_t pid = fork();
459  if (pid == 0) {
460  // close write end
461  ::close(pipefd[1]);
462 
463  // replace stdin with pipe
464  dup2(pipefd[0], STDIN_FILENO);
465  ::close(pipefd[0]);
466  // replace stdout with file descriptor to file created above.
467  dup2(fd, STDOUT_FILENO);
468  ::close(fd);
469 
470  execlp(compressor, compressor, nullptr);
471 
472  LOG1 << "Pipe execution failed: " << strerror(errno);
473  // close read end
474  ::close(pipefd[0]);
475  exit(-1);
476  }
477  else if (pid < 0) {
478  throw common::ErrnoException("Error creating child process");
479  }
480 
481  sLOG << "SysFile::OpenForWrite(): pipefd" << pipefd[0] << "to pid" << pid;
482 
483  // close read end
484  ::close(pipefd[0]);
485 
486  // close file descriptor (it is used by the fork)
487  ::close(fd);
488 
489  return tlx::make_counting<SysFile>(pipefd[1], pid);
490 #endif
491 }
492 
493 } // namespace vfs
494 } // namespace thrill
495 
496 /******************************************************************************/
Type type
type of entry
Definition: file_io.hpp:59
A cross-platform file globbing library providing the ability to expand wildcards in command-line argu...
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
An Exception which is thrown on system errors.
GlobType
Type of objects to include in glob result.
Definition: file_io.hpp:99
#define LOG1
Definition: logger.hpp:28
An Exception which is thrown on system errors and contains errno information.
void PortSetCloseOnExec(int fd)
set FD_CLOEXEC on file descriptor (if possible)
Definition: porting.cpp:41
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
#define CSimpleGlob
TCHAR version dependent on if _UNICODE is defined.
void MakePipe(int out_pipefds[2])
create a pair of pipe file descriptors
Definition: porting.cpp:51
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
General information of vfs file.
Definition: file_io.hpp:57
static by_string to_string(int val)
convert to string
uint64_t size
size of file.
Definition: file_io.hpp:63
WriteStreamPtr SysOpenWriteStream(const std::string &path)
Open file for writing and return file descriptor.
Definition: sys_file.cpp:413
std::string path
path to file
Definition: file_io.hpp:61
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
void SysGlob(const std::string &path, const GlobType &gtype, FileList &filelist)
Glob a path and augment the FileList with matching file names.
Definition: sys_file.cpp:143
High-performance smart pointer used as a wrapping reference counting pointer.
ReadStreamPtr SysOpenReadStream(const std::string &path, const common::Range &range)
Open file for reading and return file descriptor.
Definition: sys_file.cpp:322
List of file info and additional overall info.
Definition: file_io.hpp:79
size_t begin
begin index
Definition: math.hpp:56
#define O_BINARY
Definition: sys_file.cpp:33
bool ends_with(const std::string &str, const std::string &match)
Checks if the given match string is located at the end of this string.
Definition: ends_with.cpp:20
static void SysGlobWalkRecursive(const std::string &path, FileList &filelist)
Definition: sys_file.cpp:54