Thrill  0.1
file_io.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/file_io.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/vfs/file_io.hpp>
13 
14 #include <thrill/common/string.hpp>
18 #include <thrill/vfs/s3_file.hpp>
19 #include <thrill/vfs/sys_file.hpp>
20 
21 #include <tlx/die.hpp>
22 #include <tlx/string/ends_with.hpp>
23 #include <tlx/string/ssprintf.hpp>
25 
26 #include <algorithm>
27 #include <string>
28 #include <vector>
29 
30 namespace thrill {
31 namespace vfs {
32 
33 /******************************************************************************/
34 
//! Initialize VFS layer. The visible body calls S3Initialize().
// NOTE(review): this listing skips source line 37, so the body shown here may
// be missing a statement -- confirm against the repository before editing.
35 void Initialize() {
36  S3Initialize();
38 }
39 
//! Deinitialize VFS layer.
// NOTE(review): this listing skips source lines 41-42, so the body appears
// empty here but presumably is not -- confirm against the repository before
// editing.
40 void Deinitialize() {
43 }
44 
45 /******************************************************************************/
46 
47 bool IsCompressed(const std::string& path) {
48  return tlx::ends_with(path, ".gz") ||
49  tlx::ends_with(path, ".bz2") ||
50  tlx::ends_with(path, ".xz") ||
51  tlx::ends_with(path, ".lzo") ||
52  tlx::ends_with(path, ".lz4");
53 }
54 
55 bool IsRemoteUri(const std::string& path) {
56  return tlx::starts_with(path, "s3://") ||
57  tlx::starts_with(path, "hdfs://");
58 }
59 
60 std::ostream& operator << (std::ostream& os, const Type& t) {
61  switch (t) {
62  case Type::File:
63  return os << "File";
64  case Type::Directory:
65  return os << "Directory";
66  default:
67  return os << "Invalid";
68  }
69 }
70 
72  size_t worker, size_t file_part) {
73 
74  static constexpr bool debug = false;
75 
76  using size_type = std::string::size_type;
77 
78  std::string out_path = pathbase;
79 
80  // detect and save extension
81  std::string extension;
82  {
83  size_type slash_end = out_path.rfind('/');
84  size_type dot_end = out_path.rfind('.');
85  if (dot_end != std::string::npos &&
86  // dot is after slash
87  (slash_end == std::string::npos || slash_end < dot_end)) {
88  extension = out_path.substr(dot_end);
89  out_path.erase(dot_end);
90  }
91  }
92  {
93  // replace @
94  size_type at_end = out_path.rfind('@');
95  size_type at_begin = out_path.find_last_not_of('@', at_end);
96 
97  size_type at_length =
98  at_end != std::string::npos && at_end > at_begin
99  ? at_end - at_begin : 4;
100 
101  sLOG << "at_length" << at_length;
102  out_path.replace(at_begin + 1, at_length,
103  tlx::ssnprintf(at_length + 2, "%0*zu",
104  static_cast<int>(at_length),
105  worker));
106  }
107  {
108  // replace hash signs
109  size_type hash_end = out_path.rfind('#');
110  size_type hash_begin = out_path.find_last_not_of('#', hash_end);
111 
112  size_type hash_length =
113  hash_end != std::string::npos && hash_end > hash_begin
114  ? hash_end - hash_begin : 10;
115 
116  sLOG << "hash_length" << hash_length;
117  out_path.replace(hash_begin + 1, hash_length,
118  tlx::ssnprintf(hash_length + 2, "%0*zu",
119  static_cast<int>(hash_length),
120  file_part));
121  }
122  out_path += extension;
123  return out_path;
124 }
125 
126 /******************************************************************************/
127 
128 FileList Glob(const std::vector<std::string>& globlist, const GlobType& gtype) {
129  FileList filelist;
130 
131  // run through globs and collect files. The sub-Glob() methods must only
132  // fill in the fields "path" and "size" of FileInfo, overall stats are
133  // calculated afterwards.
134  for (const std::string& path : globlist)
135  {
136  if (tlx::starts_with(path, "file://")) {
137  // remove the file:// prefix
138  SysGlob(path.substr(7), gtype, filelist);
139  }
140  else if (tlx::starts_with(path, "s3://")) {
141  S3Glob(path, gtype, filelist);
142  }
143  else if (tlx::starts_with(path, "hdfs://")) {
144  Hdfs3Glob(path, gtype, filelist);
145  }
146  else {
147  SysGlob(path, gtype, filelist);
148  }
149  }
150 
151  // calculate exclusive prefix sum and overall stats
152 
153  filelist.contains_compressed = false;
154  filelist.contains_remote_uri = false;
155  filelist.total_size = 0;
156  uint64_t size_ex_psum = 0;
157 
158  for (FileInfo& fi : filelist)
159  {
160  uint64_t size_next = size_ex_psum + fi.size;
161  fi.size_ex_psum = size_ex_psum;
162  size_ex_psum = size_next;
163 
164  filelist.contains_compressed |= fi.IsCompressed();
165  filelist.contains_remote_uri |= fi.IsRemoteUri();
166  filelist.total_size += fi.size;
167  }
168 
169  return filelist;
170 }
171 
172 FileList Glob(const std::string& glob, const GlobType& gtype) {
173  return Glob(std::vector<std::string>{ glob }, gtype);
174 }
175 
176 /******************************************************************************/
177 
179 
181  const std::string& path, const common::Range& range) {
182 
183  ReadStreamPtr p;
184  if (tlx::starts_with(path, "file://")) {
185  p = SysOpenReadStream(path.substr(7), range);
186  }
187  else if (tlx::starts_with(path, "s3://")) {
188  p = S3OpenReadStream(path, range);
189  }
190  else if (tlx::starts_with(path, "hdfs://")) {
191  p = Hdfs3OpenReadStream(path, range);
192  }
193  else {
194  p = SysOpenReadStream(path, range);
195  }
196 
197  if (tlx::ends_with(path, ".gz")) {
198  p = MakeGZipReadFilter(p);
199  die_unless(range.begin == 0 || "Cannot seek in compressed streams.");
200  }
201  else if (tlx::ends_with(path, ".bz2")) {
202  p = MakeBZip2ReadFilter(p);
203  die_unless(range.begin == 0 || "Cannot seek in compressed streams.");
204  }
205 
206  return p;
207 }
208 
210 
212 
213  WriteStreamPtr p;
214  if (tlx::starts_with(path, "file://")) {
215  p = SysOpenWriteStream(path.substr(7));
216  }
217  else if (tlx::starts_with(path, "s3://")) {
218  p = S3OpenWriteStream(path);
219  }
220  else if (tlx::starts_with(path, "hdfs://")) {
221  p = Hdfs3OpenWriteStream(path);
222  }
223  else {
224  p = SysOpenWriteStream(path);
225  }
226 
227  if (tlx::ends_with(path, ".gz")) {
228  p = MakeGZipWriteFilter(p);
229  }
230  else if (tlx::ends_with(path, ".bz2")) {
231  p = MakeBZip2WriteFilter(p);
232  }
233 
234  return p;
235 }
236 
237 } // namespace vfs
238 } // namespace thrill
239 
240 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and delivers a file list, sizes, and prefix sums (in bytes) for all matching files.
Definition: file_io.cpp:128
uint64_t total_size
total size of files
Definition: file_io.hpp:81
#define die_unless(X)
Definition: die.hpp:27
WriteStreamPtr Hdfs3OpenWriteStream(const std::string &)
Definition: hdfs3_file.cpp:308
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:35
GlobType
Type of objects to include in glob result.
Definition: file_io.hpp:99
void S3Deinitialize()
Definition: s3_file.cpp:734
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Definition: file_io.cpp:71
void Hdfs3Initialize()
Definition: hdfs3_file.cpp:292
Type
VFS object type.
Definition: file_io.hpp:52
bool starts_with(const char *str, const char *match)
Checks if the given match string is located at the start of this string.
Definition: starts_with.cpp:21
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
Definition: file_io.cpp:180
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
bool contains_remote_uri
whether the list contains a remote-uri file.
Definition: file_io.hpp:87
void Hdfs3Deinitialize()
Definition: hdfs3_file.cpp:295
General information of vfs file.
Definition: file_io.hpp:57
ReadStreamPtr MakeBZip2ReadFilter(const ReadStreamPtr &)
void S3Glob(const std::string &, const GlobType &, FileList &)
Definition: s3_file.cpp:737
ReadStreamPtr S3OpenReadStream(const std::string &, const common::Range &)
Definition: s3_file.cpp:742
bool ends_with(const char *str, const char *match)
Checks if the given match string is located at the end of this string.
Definition: ends_with.cpp:22
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:40
WriteStreamPtr SysOpenWriteStream(const std::string &path)
Open file for writing and return file descriptor.
Definition: sys_file.cpp:414
ReadStreamPtr MakeGZipReadFilter(const ReadStreamPtr &)
ReadStreamPtr Hdfs3OpenReadStream(const std::string &, const common::Range &)
Definition: hdfs3_file.cpp:303
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
void SysGlob(const std::string &path, const GlobType &gtype, FileList &filelist)
Glob a path and augment the FileList with matching file names.
Definition: sys_file.cpp:144
WriteStreamPtr MakeGZipWriteFilter(const WriteStreamPtr &)
bool IsCompressed(const std::string &path)
Definition: file_io.cpp:47
High-performance smart pointer used as a wrapping reference counting pointer.
WriteStreamPtr S3OpenWriteStream(const std::string &)
Definition: s3_file.cpp:747
void S3Initialize()
Definition: s3_file.cpp:731
ReadStreamPtr SysOpenReadStream(const std::string &path, const common::Range &range)
Open file for reading and return file descriptor.
Definition: sys_file.cpp:323
List of file info and additional overall info.
Definition: file_io.hpp:79
bool IsRemoteUri(const std::string &path)
Returns true, if file at filepath is a remote uri like s3:// or hdfs://.
Definition: file_io.cpp:55
std::ostream & operator<<(std::ostream &os, const Type &t)
Definition: file_io.cpp:60
size_t begin
begin index
Definition: math.hpp:56
std::string ssnprintf(size_t max_size, const char *fmt,...)
Helper that returns the result of a snprintf() call as a std::string.
Definition: ssprintf.cpp:42
WriteStreamPtr MakeBZip2WriteFilter(const WriteStreamPtr &)
bool contains_compressed
whether the list contains a compressed file.
Definition: file_io.hpp:84
WriteStreamPtr OpenWriteStream(const std::string &path)
Definition: file_io.cpp:211
void Hdfs3Glob(const std::string &, const GlobType &, FileList &)
Definition: hdfs3_file.cpp:298