Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
file_io.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/file_io.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/vfs/file_io.hpp>
13 
14 #include <thrill/common/string.hpp>
18 #include <thrill/vfs/s3_file.hpp>
19 #include <thrill/vfs/sys_file.hpp>
20 
21 #include <tlx/die.hpp>
22 #include <tlx/string/ends_with.hpp>
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
29 namespace thrill {
30 namespace vfs {
31 
32 /******************************************************************************/
33 
34 void Initialize() {
35  S3Initialize();
37 }
38 
39 void Deinitialize() {
42 }
43 
44 /******************************************************************************/
45 
46 bool IsCompressed(const std::string& path) {
47  return tlx::ends_with(path, ".gz") ||
48  tlx::ends_with(path, ".bz2") ||
49  tlx::ends_with(path, ".xz") ||
50  tlx::ends_with(path, ".lzo") ||
51  tlx::ends_with(path, ".lz4");
52 }
53 
54 bool IsRemoteUri(const std::string& path) {
55  return tlx::starts_with(path, "s3://") ||
56  tlx::starts_with(path, "hdfs://");
57 }
58 
59 std::ostream& operator << (std::ostream& os, const Type& t) {
60  switch (t) {
61  case Type::File:
62  return os << "File";
63  case Type::Directory:
64  return os << "Directory";
65  default:
66  return os << "Invalid";
67  }
68 }
69 
71  size_t worker, size_t file_part) {
72 
73  static constexpr bool debug = false;
74 
75  using size_type = std::string::size_type;
76 
77  std::string out_path = pathbase;
78 
79  // detect and save extension
80  std::string extension;
81  {
82  size_type slash_end = out_path.rfind('/');
83  size_type dot_end = out_path.rfind('.');
84  if (dot_end != std::string::npos &&
85  // dot is after slash
86  (slash_end == std::string::npos || slash_end < dot_end)) {
87  extension = out_path.substr(dot_end);
88  out_path.erase(dot_end);
89  }
90  }
91  {
92  // replace @
93  size_type at_end = out_path.rfind('@');
94  size_type at_begin = out_path.find_last_not_of('@', at_end);
95 
96  size_type at_length =
97  at_end != std::string::npos && at_end > at_begin
98  ? at_end - at_begin : 4;
99 
100  sLOG << "at_length" << at_length;
101  out_path.replace(at_begin + 1, at_length,
102  common::str_snprintf<>(at_length + 2, "%0*zu",
103  static_cast<int>(at_length),
104  worker));
105  }
106  {
107  // replace hash signs
108  size_type hash_end = out_path.rfind('#');
109  size_type hash_begin = out_path.find_last_not_of('#', hash_end);
110 
111  size_type hash_length =
112  hash_end != std::string::npos && hash_end > hash_begin
113  ? hash_end - hash_begin : 10;
114 
115  sLOG << "hash_length" << hash_length;
116  out_path.replace(hash_begin + 1, hash_length,
117  common::str_snprintf<>(hash_length + 2, "%0*zu",
118  static_cast<int>(hash_length),
119  file_part));
120  }
121  out_path += extension;
122  return out_path;
123 }
124 
125 /******************************************************************************/
126 
127 FileList Glob(const std::vector<std::string>& globlist, const GlobType& gtype) {
128  FileList filelist;
129 
130  // run through globs and collect files. The sub-Glob() methods must only
131  // fill in the fields "path" and "size" of FileInfo, overall stats are
132  // calculated afterwards.
133  for (const std::string& path : globlist)
134  {
135  if (tlx::starts_with(path, "file://")) {
136  // remove the file:// prefix
137  SysGlob(path.substr(7), gtype, filelist);
138  }
139  else if (tlx::starts_with(path, "s3://")) {
140  S3Glob(path, gtype, filelist);
141  }
142  else if (tlx::starts_with(path, "hdfs://")) {
143  Hdfs3Glob(path, gtype, filelist);
144  }
145  else {
146  SysGlob(path, gtype, filelist);
147  }
148  }
149 
150  // calculate exclusive prefix sum and overall stats
151 
152  filelist.contains_compressed = false;
153  filelist.contains_remote_uri = false;
154  filelist.total_size = 0;
155  uint64_t size_ex_psum = 0;
156 
157  for (FileInfo& fi : filelist)
158  {
159  uint64_t size_next = size_ex_psum + fi.size;
160  fi.size_ex_psum = size_ex_psum;
161  size_ex_psum = size_next;
162 
163  filelist.contains_compressed |= fi.IsCompressed();
164  filelist.contains_remote_uri |= fi.IsRemoteUri();
165  filelist.total_size += fi.size;
166  }
167 
168  return filelist;
169 }
170 
171 FileList Glob(const std::string& glob, const GlobType& gtype) {
172  return Glob(std::vector<std::string>{ glob }, gtype);
173 }
174 
175 /******************************************************************************/
176 
178 
180  const std::string& path, const common::Range& range) {
181 
182  ReadStreamPtr p;
183  if (tlx::starts_with(path, "file://")) {
184  p = SysOpenReadStream(path.substr(7), range);
185  }
186  else if (tlx::starts_with(path, "s3://")) {
187  p = S3OpenReadStream(path, range);
188  }
189  else if (tlx::starts_with(path, "hdfs://")) {
190  p = Hdfs3OpenReadStream(path, range);
191  }
192  else {
193  p = SysOpenReadStream(path, range);
194  }
195 
196  if (tlx::ends_with(path, ".gz")) {
197  p = MakeGZipReadFilter(p);
198  die_unless(range.begin == 0 || "Cannot seek in compressed streams.");
199  }
200  else if (tlx::ends_with(path, ".bz2")) {
201  p = MakeBZip2ReadFilter(p);
202  die_unless(range.begin == 0 || "Cannot seek in compressed streams.");
203  }
204 
205  return p;
206 }
207 
209 
211 
212  WriteStreamPtr p;
213  if (tlx::starts_with(path, "file://")) {
214  p = SysOpenWriteStream(path.substr(7));
215  }
216  else if (tlx::starts_with(path, "s3://")) {
217  p = S3OpenWriteStream(path);
218  }
219  else if (tlx::starts_with(path, "hdfs://")) {
220  p = Hdfs3OpenWriteStream(path);
221  }
222  else {
223  p = SysOpenWriteStream(path);
224  }
225 
226  if (tlx::ends_with(path, ".gz")) {
227  p = MakeGZipWriteFilter(p);
228  }
229  else if (tlx::ends_with(path, ".bz2")) {
230  p = MakeBZip2WriteFilter(p);
231  }
232 
233  return p;
234 }
235 
236 } // namespace vfs
237 } // namespace thrill
238 
239 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and delivers a file list, sizes, and prefix sums (in bytes) for all matching files.
Definition: file_io.cpp:127
uint64_t total_size
total size of files
Definition: file_io.hpp:81
#define die_unless(X)
Definition: die.hpp:27
WriteStreamPtr Hdfs3OpenWriteStream(const std::string &)
Definition: hdfs3_file.cpp:308
void Initialize()
Initialize VFS layer.
Definition: file_io.cpp:34
GlobType
Type of objects to include in glob result.
Definition: file_io.hpp:99
void S3Deinitialize()
Definition: s3_file.cpp:734
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Definition: file_io.cpp:70
void Hdfs3Initialize()
Definition: hdfs3_file.cpp:292
Type
VFS object type.
Definition: file_io.hpp:52
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
Definition: file_io.cpp:179
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
bool contains_remote_uri
whether the list contains a remote-uri file.
Definition: file_io.hpp:87
void Hdfs3Deinitialize()
Definition: hdfs3_file.cpp:295
General information of vfs file.
Definition: file_io.hpp:57
ReadStreamPtr MakeBZip2ReadFilter(const ReadStreamPtr &)
void S3Glob(const std::string &, const GlobType &, FileList &)
Definition: s3_file.cpp:737
ReadStreamPtr S3OpenReadStream(const std::string &, const common::Range &)
Definition: s3_file.cpp:742
void Deinitialize()
Deinitialize VFS layer.
Definition: file_io.cpp:39
WriteStreamPtr SysOpenWriteStream(const std::string &path)
Open file for writing and return file descriptor.
Definition: sys_file.cpp:413
ReadStreamPtr MakeGZipReadFilter(const ReadStreamPtr &)
ReadStreamPtr Hdfs3OpenReadStream(const std::string &, const common::Range &)
Definition: hdfs3_file.cpp:303
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
void SysGlob(const std::string &path, const GlobType &gtype, FileList &filelist)
Glob a path and augment the FileList with matching file names.
Definition: sys_file.cpp:143
WriteStreamPtr MakeGZipWriteFilter(const WriteStreamPtr &)
bool IsCompressed(const std::string &path)
Definition: file_io.cpp:46
High-performance smart pointer used as a wrapping reference counting pointer.
WriteStreamPtr S3OpenWriteStream(const std::string &)
Definition: s3_file.cpp:747
void S3Initialize()
Definition: s3_file.cpp:731
ReadStreamPtr SysOpenReadStream(const std::string &path, const common::Range &range)
Open file for reading and return file descriptor.
Definition: sys_file.cpp:322
List of file info and additional overall info.
Definition: file_io.hpp:79
bool IsRemoteUri(const std::string &path)
Returns true if the file at filepath is a remote URI such as s3:// or hdfs://.
Definition: file_io.cpp:54
std::ostream & operator<<(std::ostream &os, const Type &t)
Definition: file_io.cpp:59
size_t begin
begin index
Definition: math.hpp:56
bool starts_with(const std::string &str, const std::string &match)
Checks if the given match string is located at the start of this string.
Definition: starts_with.cpp:19
WriteStreamPtr MakeBZip2WriteFilter(const WriteStreamPtr &)
bool contains_compressed
whether the list contains a compressed file.
Definition: file_io.hpp:84
WriteStreamPtr OpenWriteStream(const std::string &path)
Definition: file_io.cpp:210
void Hdfs3Glob(const std::string &, const GlobType &, FileList &)
Definition: hdfs3_file.cpp:298
bool ends_with(const std::string &str, const std::string &match)
Checks if the given match string is located at the end of this string.
Definition: ends_with.cpp:20