Thrill  0.1
sys_file.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/sys_file.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/vfs/sys_file.hpp>
13 
16 #include <thrill/common/string.hpp>
19 
20 #include <tlx/die.hpp>
21 #include <tlx/string/ends_with.hpp>
22 
23 #include <fcntl.h>
24 #include <sys/stat.h>
25 
26 #if !defined(_MSC_VER)
27 
28 #include <dirent.h>
29 #include <glob.h>
30 #include <sys/wait.h>
31 #include <unistd.h>
32 
33 #if !defined(O_BINARY)
34 #define O_BINARY 0
35 #endif
36 
37 #else
38 
39 #include <io.h>
40 #include <windows.h>
41 
42 #define S_ISREG(m) (((m) & _S_IFMT) == _S_IFREG)
43 
44 #endif
45 
46 #include <algorithm>
47 #include <string>
48 #include <vector>
49 
50 namespace thrill {
51 namespace vfs {
52 
53 /******************************************************************************/
54 
55 static void SysGlobWalkRecursive(const std::string& path, FileList& filelist) {
56 #if defined(_MSC_VER)
57 
58  WIN32_FIND_DATA ff;
59  HANDLE h = FindFirstFile((path + "\\*").c_str(), &ff);
60 
61  if (h == INVALID_HANDLE_VALUE) {
63  "FindFirstFile failed:" + std::to_string(GetLastError()));
64  }
65 
66  std::vector<FileInfo> tmp_list;
67 
68  do {
69  if (ff.cFileName[0] != '.')
70  {
71  FileInfo fi;
72  if (ff.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
73  fi.type = Type::Directory;
74  }
75  else {
76  fi.type = Type::File;
77  }
78  fi.path = path + "\\" + ff.cFileName;
79  fi.size = (static_cast<uint64_t>(ff.nFileSizeHigh) * (MAXDWORD + 1))
80  + static_cast<uint64_t>(ff.nFileSizeLow);
81  tmp_list.emplace_back(fi);
82  }
83  } while (FindNextFile(h, &ff) != 0);
84 
85  DWORD e = GetLastError();
86  if (e != ERROR_NO_MORE_FILES) {
88  "FindFirstFile failed:" + std::to_string(GetLastError()));
89  }
90 
91  std::sort(tmp_list.begin(), tmp_list.end());
92 
93  for (const FileInfo& fi : tmp_list) {
94  if (fi.type == Type::Directory) {
95  SysGlobWalkRecursive(fi.path, filelist);
96  }
97  else {
98  filelist.emplace_back(fi);
99  }
100  }
101 
102 #else
103  // read entries
104  DIR* dir = opendir(path.c_str());
105  if (dir == nullptr)
106  throw common::ErrnoException("Could not read directory " + path);
107 
108  struct dirent* de;
109  struct stat st;
110 
111  std::vector<std::string> list;
112 
113  while ((de = common::ts_readdir(dir)) != nullptr) {
114  // skip ".", "..", and also hidden files (don't create them).
115  if (de->d_name[0] == '.') continue;
116 
117  list.emplace_back(path + "/" + de->d_name);
118  }
119 
120  closedir(dir);
121 
122  // sort file names
123  std::sort(list.begin(), list.end());
124 
125  for (const std::string& entry : list) {
126  if (stat(entry.c_str(), &st) != 0)
127  throw common::ErrnoException("Could not lstat() " + entry);
128 
129  if (S_ISDIR(st.st_mode)) {
130  // descend into directories
131  SysGlobWalkRecursive(entry, filelist);
132  }
133  else if (S_ISREG(st.st_mode)) {
134  FileInfo fi;
135  fi.type = Type::File;
136  fi.path = entry;
137  fi.size = static_cast<uint64_t>(st.st_size);
138  filelist.emplace_back(fi);
139  }
140  }
141 #endif
142 }
143 
/*!
 * Expand the glob pattern `path` and append the matches to `filelist`.
 *
 * The matching names are collected, sorted, and then stat()ed one by one:
 *  - a regular file is appended as Type::File with its size if gtype is
 *    GlobType::All or GlobType::File;
 *  - any other entry ("directory entries or others") is appended as
 *    Type::Directory with size 0 if gtype is GlobType::All or
 *    GlobType::Directory; if gtype is GlobType::File it is instead walked
 *    with SysGlobWalkRecursive(), which appends the files below it.
 *
 * A failing stat() is reported via die().
 *
 * \param path      glob pattern to expand
 * \param gtype     selects which kinds of entries are added
 * \param filelist  list the matching entries are appended to
 */
144 void SysGlob(const std::string& path, const GlobType& gtype,
145  FileList& filelist) {
146 
147  std::vector<std::string> list;
148 
149  // collect file names
150 #if defined(_MSC_VER)
// NOTE(review): the declaration of `sglob` (listing line 151) is not visible
// in this listing; presumably a CSimpleGlob object - confirm against the
// original file.
152  sglob.Add(path.c_str());
153  for (int n = 0; n < sglob.FileCount(); ++n) {
154  list.emplace_back(sglob.File(n));
155  }
156 #else
// NOTE(review): the return value of glob() is not checked, so a failed
// expansion is indistinguishable from "no match" here - confirm this is
// intended.
157  glob_t glob_result;
158  glob(path.c_str(), GLOB_TILDE, nullptr, &glob_result);
159 
160  for (unsigned int i = 0; i < glob_result.gl_pathc; ++i) {
161  list.push_back(glob_result.gl_pathv[i]);
162  }
163  globfree(&glob_result);
164 #endif
165 
166  // sort file names
167  std::sort(list.begin(), list.end());
168 
169  // stat files to collect size information
170  struct stat filestat;
171  for (const std::string& file : list)
172  {
173  if (::stat(file.c_str(), &filestat) != 0) {
174  die("ERROR: could not stat() path " + file);
175  }
176 
177  if (S_ISREG(filestat.st_mode)) {
// regular file: only reported when files were requested.
178  if (gtype == GlobType::All || gtype == GlobType::File) {
179  FileInfo fi;
180  fi.type = Type::File;
181  fi.path = file;
182  fi.size = static_cast<uint64_t>(filestat.st_size);
183  filelist.emplace_back(fi);
184  }
185  }
186  else {
187  // directory entries or others
188  if (gtype == GlobType::All || gtype == GlobType::Directory) {
189  FileInfo fi;
190  fi.type = Type::Directory;
191  fi.path = file;
192  fi.size = 0;
193  filelist.emplace_back(fi);
194  }
195  else if (gtype == GlobType::File) {
// only files were requested: replace the entry by the files below it.
196  SysGlobWalkRecursive(file, filelist);
197  }
198  }
199  }
200 }
201 
202 /******************************************************************************/
203 
204 /*!
205  * Represents a POSIX system file via its file descriptor.
206  */
207 class SysFile final : public virtual ReadStream, public virtual WriteStream
208 {
209  static constexpr bool debug = false;
210 
211 public:
212  //! default constructor
213  SysFile() : fd_(-1) { }
214 
215  //! constructor: use OpenForRead or OpenForWrite.
216  explicit SysFile(int fd, int pid = 0) noexcept
217  : fd_(fd), pid_(pid) { }
218 
219  //! non-copyable: delete copy-constructor
220  SysFile(const SysFile&) = delete;
221  //! non-copyable: delete assignment operator
222  SysFile& operator = (const SysFile&) = delete;
223  //! move-constructor
224  SysFile(SysFile&& f) noexcept
225  : fd_(f.fd_), pid_(f.pid_) {
226  f.fd_ = -1, f.pid_ = 0;
227  }
228  //! move-assignment
229  SysFile& operator = (SysFile&& f) {
230  close();
231  fd_ = f.fd_, pid_ = f.pid_;
232  f.fd_ = -1, f.pid_ = 0;
233  return *this;
234  }
235 
236  ~SysFile() {
237  close();
238  }
239 
240  //! POSIX write function.
241  ssize_t write(const void* data, size_t count) final {
242  assert(fd_ >= 0);
243 #if defined(_MSC_VER)
244  return ::_write(fd_, data, static_cast<unsigned>(count));
245 #else
246  return ::write(fd_, data, count);
247 #endif
248  }
249 
250  //! POSIX read function.
251  ssize_t read(void* data, size_t count) final {
252  assert(fd_ >= 0);
253 #if defined(_MSC_VER)
254  return ::_read(fd_, data, static_cast<unsigned>(count));
255 #else
256  return ::read(fd_, data, count);
257 #endif
258  }
259 
260  //! close the file descriptor
261  void close() final;
262 
263 private:
264  //! file descriptor
265  int fd_ = -1;
266 
267 #if defined(_MSC_VER)
268  using pid_t = int;
269 #endif
270 
271  //! pid of child process to wait for
272  pid_t pid_ = 0;
273 };
274 
/*!
 * Close the file descriptor and, on non-MSVC builds, wait for the child
 * process, if there is one.
 *
 * If fd_ is valid it is closed with ::close(); a failure is only logged (with
 * errno and its strerror() text). fd_ is set to -1 afterwards in either case.
 *
 * If pid_ is non-zero, waitpid() is called for it and the returned status is
 * inspected: a normal exit with code zero is accepted, every other outcome is
 * turned into an error message.
 *
 * NOTE(review): the lines which open the four error statements below (listing
 * lines 293, 299, 308 and 313) are not visible in this listing; presumably
 * each one throws an exception built from the string that follows - confirm
 * against the original file.
 *
 * NOTE(review): pid_ is reset to 0 only after all checks have passed. If the
 * error statements leave the function early, pid_ stays set and a later
 * close() (for example from the destructor) would call waitpid() again for
 * the same pid - confirm whether that is intended.
 */
275 void SysFile::close() {
276  if (fd_ >= 0) {
277  sLOG << "SysFile::close(): fd" << fd_;
278  if (::close(fd_) != 0)
279  {
// closing failed: report it, but carry on.
280  LOG1 << "SysFile::close()"
281  << " fd_=" << fd_
282  << " errno=" << errno
283  << " error=" << strerror(errno);
284  }
285  fd_ = -1;
286  }
287 #if !defined(_MSC_VER)
288  if (pid_ != 0) {
289  sLOG << "SysFile::close(): waitpid for" << pid_;
290  int status;
// blocks until the child has changed state (options == 0).
291  pid_t p = waitpid(pid_, &status, 0);
292  if (p != pid_) {
294  "SysFile: waitpid() failed to return child");
295  }
296  if (WIFEXITED(status)) {
297  // child program exited normally
298  if (WEXITSTATUS(status) != 0) {
300  "SysFile: child failed with return code "
301  + std::to_string(WEXITSTATUS(status)));
302  }
303  else {
304  // zero return code. good.
305  }
306  }
307  else if (WIFSIGNALED(status)) {
309  "SysFile: child killed by signal "
310  + std::to_string(WTERMSIG(status)));
311  }
312  else {
314  "SysFile: child failed with an unknown error");
315  }
316  pid_ = 0;
317  }
318 #endif
319 }
320 
321 /******************************************************************************/
322 
324  const std::string& path, const common::Range& range) {
325 
326  static constexpr bool debug = false;
327 
328  // first open the file and see if it exists at all.
329 
330  int fd = ::open(path.c_str(), O_RDONLY | O_BINARY, 0);
331  if (fd < 0) {
332  throw common::ErrnoException("Cannot open file " + path);
333  }
334 
335  // then figure out whether we need to pipe it through a decompressor.
336 
337  const char* decompressor;
338 
339  if (tlx::ends_with(path, ".xz")) {
340  decompressor = "xz";
341  }
342  else if (tlx::ends_with(path, ".lzo")) {
343  decompressor = "lzop";
344  }
345  else if (tlx::ends_with(path, ".lz4")) {
346  decompressor = "lz4";
347  }
348  else {
349  // not a compressed file
351 
352  sLOG << "SysFile::OpenForRead(): filefd" << fd;
353 
354  if (range.begin) {
355  //! POSIX lseek function from current position.
356  ::lseek(fd, range.begin, SEEK_CUR);
357  }
358 
359  return tlx::make_counting<SysFile>(fd);
360  }
361 
362 #if defined(_MSC_VER)
364  "Reading compressed files is not supported on windows, yet. "
365  "Please submit a patch.");
366 #else
367  // if decompressor: fork a child program which calls the decompressor and
368  // connect file descriptors via a pipe.
369 
370  // pipe[0] = read, pipe[1] = write
371  int pipefd[2];
372  common::MakePipe(pipefd);
373 
374  pid_t pid = fork();
375  if (pid == 0) {
376  // close read end
377  ::close(pipefd[0]);
378 
379  // replace stdin with file descriptor to file opened above.
380  dup2(fd, STDIN_FILENO);
381  ::close(fd);
382  // replace stdout with pipe going back to Thrill process
383  dup2(pipefd[1], STDOUT_FILENO);
384  ::close(pipefd[1]);
385 
386  execlp(decompressor, decompressor, "-d", nullptr);
387 
388  LOG1 << "Pipe execution failed: " << strerror(errno);
389  // close write end
390  ::close(pipefd[1]);
391  exit(-1);
392  }
393  else if (pid < 0) {
394  throw common::ErrnoException("Error creating child process");
395  }
396 
397  sLOG << "SysFile::OpenForRead(): pipefd" << pipefd[0] << "to pid" << pid;
398 
399  // close pipe write end
400  ::close(pipefd[1]);
401 
402  // close the file descriptor
403  ::close(fd);
404 
405  if (range.begin) {
406  //! POSIX lseek function from current position.
407  ::lseek(pipefd[0], range.begin, SEEK_CUR);
408  }
409 
410  return tlx::make_counting<SysFile>(pipefd[0], pid);
411 #endif
412 }
413 
415 
416  static constexpr bool debug = false;
417 
418  // first create the file and see if we can write it at all.
419 
420  int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_BINARY, 0666);
421  if (fd < 0) {
422  throw common::ErrnoException("Cannot create file " + path);
423  }
424 
425  // then figure out whether we need to pipe it through a compressor.
426 
427  const char* compressor;
428 
429  if (tlx::ends_with(path, ".xz")) {
430  compressor = "xz";
431  }
432  else if (tlx::ends_with(path, ".lzo")) {
433  compressor = "lzop";
434  }
435  else if (tlx::ends_with(path, ".lz4")) {
436  compressor = "lz4";
437  }
438  else {
439  // not a compressed file
441 
442  sLOG << "SysFile::OpenForWrite(): filefd" << fd;
443 
444  return tlx::make_counting<SysFile>(fd);
445  }
446 
447 #if defined(_MSC_VER)
449  "Reading compressed files is not supported on windows, yet. "
450  "Please submit a patch.");
451 #else
452  // if compressor: fork a child program which calls the compressor and
453  // connect file descriptors via a pipe.
454 
455  // pipe[0] = read, pipe[1] = write
456  int pipefd[2];
457  common::MakePipe(pipefd);
458 
459  pid_t pid = fork();
460  if (pid == 0) {
461  // close write end
462  ::close(pipefd[1]);
463 
464  // replace stdin with pipe
465  dup2(pipefd[0], STDIN_FILENO);
466  ::close(pipefd[0]);
467  // replace stdout with file descriptor to file created above.
468  dup2(fd, STDOUT_FILENO);
469  ::close(fd);
470 
471  execlp(compressor, compressor, nullptr);
472 
473  LOG1 << "Pipe execution failed: " << strerror(errno);
474  // close read end
475  ::close(pipefd[0]);
476  exit(-1);
477  }
478  else if (pid < 0) {
479  throw common::ErrnoException("Error creating child process");
480  }
481 
482  sLOG << "SysFile::OpenForWrite(): pipefd" << pipefd[0] << "to pid" << pid;
483 
484  // close read end
485  ::close(pipefd[0]);
486 
487  // close file descriptor (it is used by the fork)
488  ::close(fd);
489 
490  return tlx::make_counting<SysFile>(pipefd[1], pid);
491 #endif
492 }
493 
494 } // namespace vfs
495 } // namespace thrill
496 
497 /******************************************************************************/
Type type
type of entry
Definition: file_io.hpp:59
A cross-platform file globbing library providing the ability to expand wildcards in command-line argu...
Writer object to output data to any supported URI.
Definition: file_io.hpp:135
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Reader object from any source.
Definition: file_io.hpp:120
An Exception which is thrown on system errors.
GlobType
Type of objects to include in glob result.
Definition: file_io.hpp:99
#define LOG1
Definition: logger.hpp:28
An Exception which is thrown on system errors and contains errno information.
void PortSetCloseOnExec(int fd)
set FD_CLOEXEC on file descriptor (if possible)
Definition: porting.cpp:42
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
#define CSimpleGlob
TCHAR version dependent on if _UNICODE is defined.
void MakePipe(int out_pipefds[2])
create a pair of pipe file descriptors
Definition: porting.cpp:52
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
General information of vfs file.
Definition: file_io.hpp:57
static by_string to_string(int val)
convert to string
uint64_t size
size of file.
Definition: file_io.hpp:63
bool ends_with(const char *str, const char *match)
Checks if the given match string is located at the end of this string.
Definition: ends_with.cpp:22
struct dirent * ts_readdir(DIR *dirp)
mutex-locked readdir() call
Definition: porting.cpp:153
WriteStreamPtr SysOpenWriteStream(const std::string &path)
Open file for writing and return file descriptor.
Definition: sys_file.cpp:414
std::string path
path to file
Definition: file_io.hpp:61
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
void SysGlob(const std::string &path, const GlobType &gtype, FileList &filelist)
Glob a path and augment the FileList with matching file names.
Definition: sys_file.cpp:144
High-performance smart pointer used as a wrapping reference counting pointer.
ReadStreamPtr SysOpenReadStream(const std::string &path, const common::Range &range)
Open file for reading and return file descriptor.
Definition: sys_file.cpp:323
List of file info and additional overall info.
Definition: file_io.hpp:79
size_t begin
begin index
Definition: math.hpp:56
#define O_BINARY
Definition: sys_file.cpp:34
static void SysGlobWalkRecursive(const std::string &path, FileList &filelist)
Definition: sys_file.cpp:55