Thrill  0.1
s3_file.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/s3_file.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  * Copyright (C) 2017 Tim Zeitz <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/vfs/s3_file.hpp>
13 
14 #include <thrill/common/logger.hpp>
15 #include <thrill/common/string.hpp>
16 
17 #include <tlx/die.hpp>
18 #include <tlx/string/split.hpp>
20 
21 #if THRILL_HAVE_LIBS3
22 #include <libs3.h>
23 #endif
24 
25 #include <algorithm>
26 #include <limits>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 namespace thrill {
32 namespace vfs {
33 
34 #if THRILL_HAVE_LIBS3
35 
36 // flag to output debug info from S3
37 static constexpr bool debug = false;
38 
39 /******************************************************************************/
40 
41 void S3Initialize() {
42  S3_initialize(
43  /* userAgentInfo */ nullptr, S3_INIT_ALL,
44  /* defaultS3Hostname */ nullptr);
45 }
46 
47 void S3Deinitialize() {
48  S3_deinitialize();
49 }
50 
51 /******************************************************************************/
52 
53 //! Generic S3 error logger
54 void LibS3LogError(S3Status status, const S3ErrorDetails* error) {
55 
56  if (status != S3StatusOK) {
57  LOG1 << "S3-ERROR - Status: " << S3_get_status_name(status);
58  }
59 
60  if (error != nullptr && error->message != nullptr) {
61  LOG1 << "S3-ERROR - Message: " << error->message;
62  }
63  if (error != nullptr && error->resource != nullptr) {
64  LOG1 << "S3-ERROR - Resource: " << error->resource;
65  }
66  if (error != nullptr && error->furtherDetails != nullptr) {
67  LOG1 << "S3-ERROR - Further Details: " << error->furtherDetails;
68  }
69  if (error != nullptr && error->extraDetailsCount != 0) {
70  LOG1 << "S3-ERROR - Extra Details:";
71  for (int i = 0; i < error->extraDetailsCount; i++) {
72  LOG1 << "S3-ERROR - - " << error->extraDetails[i].name
73  << ": " << error->extraDetails[i].value;
74  }
75  }
76 }
77 
78 //! Generic logger which outputs S3 properties
79 S3Status ResponsePropertiesCallback(
80  const S3ResponseProperties* properties, void* /* cookie */) {
81 
82  if (!debug) return S3StatusOK;
83 
84  if (properties->contentType != nullptr)
85  LOG1 << "S3-DEBUG - Content-Type: " << properties->contentType;
86  if (properties->requestId != nullptr)
87  LOG1 << "S3-DEBUG - Request-Id: " << properties->requestId;
88  if (properties->requestId2 != nullptr)
89  LOG1 << "S3-DEBUG - Request-Id-2: " << properties->requestId2;
90  if (properties->contentLength > 0)
91  LOG1 << "S3-DEBUG - Content-Length: " << properties->contentLength;
92  if (properties->server != nullptr)
93  LOG1 << "S3-DEBUG - Server: " << properties->server;
94  if (properties->eTag != nullptr)
95  LOG1 << "S3-DEBUG - ETag: " << properties->eTag;
96  if (properties->lastModified > 0) {
97  char timebuf[256];
98  time_t t = (time_t)properties->lastModified;
99  struct tm tm;
100  memset(&tm, 0, sizeof(tm));
101  // gmtime is not thread-safe but we don't care here.
102  strftime(timebuf, sizeof(timebuf),
103  "%Y-%m-%dT%H:%M:%SZ", localtime_r(&t, &tm));
104  LOG1 << "S3-DEBUG - Last-Modified: " << timebuf;
105  }
106  for (int i = 0; i < properties->metaDataCount; i++) {
107  LOG1 << "S3-DEBUG - x-amz-meta-" << properties->metaData[i].name
108  << ": " << properties->metaData[i].value;
109  }
110 
111  return S3StatusOK;
112 }
113 
114 /******************************************************************************/
115 // Helper Methods
116 
117 //! fill in a S3BucketContext
118 static void FillS3BucketContext(S3BucketContext& bkt, const std::string& key) {
119  memset(&bkt, 0, sizeof(bkt));
120 
121  bkt.hostName = getenv("THRILL_S3_HOST");
122  bkt.bucketName = key.c_str();
123  bkt.protocol = S3ProtocolHTTPS;
124  bkt.uriStyle = S3UriStyleVirtualHost;
125  bkt.accessKeyId = getenv("THRILL_S3_KEY");
126  bkt.secretAccessKey = getenv("THRILL_S3_SECRET");
127  bkt.authRegion = getenv("THRILL_S3_REGION");
128 
129  if (bkt.accessKeyId == nullptr) {
130  LOG1 << "S3-WARNING - no key given - set environment variable THRILL_S3_KEY";
131  }
132  if (bkt.secretAccessKey == nullptr) {
133  LOG1 << "S3-WARNING - no secret given - set environment variable THRILL_S3_SECRET";
134  }
135 }
136 
137 /******************************************************************************/
138 // List Bucket Contents on S3
139 
140 class S3ListBucket
141 {
142 public:
143  bool list_bucket(const std::string& path_prefix,
144  const S3BucketContext* bucket_context,
145  const char* prefix, const char* marker = nullptr,
146  const char* delimiter = nullptr,
147  int maxkeys = std::numeric_limits<int>::max()) {
148 
149  path_prefix_ = path_prefix;
150 
151  // construct handlers
152  S3ListBucketHandler handlers;
153  memset(&handlers, 0, sizeof(handlers));
154 
155  handlers.responseHandler.propertiesCallback =
156  &ResponsePropertiesCallback;
157  handlers.responseHandler.completeCallback =
158  &S3ListBucket::ResponseCompleteCallback;
159  handlers.listBucketCallback = &S3ListBucket::ListBucketCallback;
160 
161  // loop until all keys were received
162  status_ = S3StatusOK;
163  last_marker_ = marker;
164  is_truncated_ = false;
165 
166  do {
167  S3_list_bucket(
168  bucket_context, prefix, last_marker_, delimiter, maxkeys,
169  /* request_context */ nullptr, /* timeoutMs */ 0,
170  &handlers, this);
171  } while (status_ == S3StatusOK && is_truncated_); // NOLINT
172 
173  // S3 keys are usually returned sorted, but we sort anyway
174  std::sort(filelist_.begin(), filelist_.end(),
175  [](const FileInfo& a, const FileInfo& b) {
176  return a.path < b.path;
177  });
178 
179  return (status_ == S3StatusOK);
180  }
181 
182  //! Returns filelist_
183  const std::vector<FileInfo>& filelist() const { return filelist_; }
184 
185 private:
186  //! s3://path prefix for FileInfo
187  std::string path_prefix_;
188 
189  //! vector of FileInfo results
190  std::vector<FileInfo> filelist_;
191 
192  //! status of request
193  S3Status status_ = S3StatusOK;
194 
195  //! last key seen
196  const char* last_marker_;
197 
198  //! if result is truncated, issue next request
199  bool is_truncated_ = false;
200 
201  //! completion callback, check for errors
202  void ResponseCompleteCallback(
203  S3Status status, const S3ErrorDetails* error) {
204 
205  status_ = status;
206  if (status != S3StatusOK)
207  LibS3LogError(status, error);
208  }
209 
210  //! static wrapper to call
211  static void ResponseCompleteCallback(
212  S3Status status, const S3ErrorDetails* error, void* cookie) {
213  S3ListBucket* t = reinterpret_cast<S3ListBucket*>(cookie);
214  return t->ResponseCompleteCallback(status, error);
215  }
216 
217  //! callback delivered list
218  S3Status ListBucketCallback(
219  int is_truncated, const char* /* next_marker */,
220  int contents_count, const S3ListBucketContent* contents,
221  int common_prefixes_count, const char** common_prefixes) {
222  for (int i = 0; i < contents_count; ++i) {
223  FileInfo fi;
224  fi.type = Type::File;
225  fi.path = path_prefix_ + contents[i].key;
226  fi.size = contents[i].size;
227  filelist_.emplace_back(fi);
228  }
229  for (int i = 0; i < common_prefixes_count; ++i) {
230  FileInfo fi;
231  fi.type = Type::Directory;
232  fi.path = path_prefix_ + common_prefixes[i];
233  fi.size = 0;
234  filelist_.emplace_back(fi);
235  }
236  last_marker_ = contents[contents_count - 1].key;
237  is_truncated_ = (is_truncated != 0);
238  return S3StatusOK;
239  }
240 
241  //! static wrapper to call ListBucketCallback
242  static S3Status ListBucketCallback(
243  int is_truncated, const char* next_marker, int contents_count,
244  const S3ListBucketContent* contents, int common_prefixes_count,
245  const char** common_prefixes, void* cookie) {
246  S3ListBucket* t = reinterpret_cast<S3ListBucket*>(cookie);
247  return t->ListBucketCallback(
248  is_truncated, next_marker, contents_count, contents,
249  common_prefixes_count, common_prefixes);
250  }
251 };
252 
253 void S3Glob(const std::string& _path, const GlobType& gtype,
254  FileList& filelist) {
255 
256  std::string path = _path;
257  // crop off s3://
258  die_unless(tlx::starts_with(path, "s3://"));
259  path = path.substr(5);
260 
261  // split uri into host/path
262  std::vector<std::string> splitted = tlx::split('/', path, 2);
263 
264  // construct bucket
265  S3BucketContext bkt;
266  FillS3BucketContext(bkt, splitted[0]);
267 
268  S3ListBucket list;
269  list.list_bucket(/* path_prefix */ "s3://" + splitted[0] + "/",
270  &bkt, /* prefix */ splitted[1].c_str(),
271  nullptr, /* delimiter */ "/");
272 
273  // append sorted result list
274  for (const FileInfo& fi : list.filelist())
275  {
276  if (fi.type == Type::File) {
277  if (gtype == GlobType::All || gtype == GlobType::File) {
278  filelist.emplace_back(fi);
279  }
280  }
281  else if (fi.type == Type::Directory) {
282  if (gtype == GlobType::All || gtype == GlobType::Directory) {
283  filelist.emplace_back(fi);
284  }
285  }
286  }
287 }
288 
289 /******************************************************************************/
290 // Stream Reading from S3
291 
292 class S3ReadStream : public ReadStream
293 {
294 public:
295  S3ReadStream(const std::string& bucket, const std::string& key,
296  const S3GetConditions* get_conditions,
297  uint64_t start_byte, uint64_t byte_count)
298  : bucket_(bucket), key_(key) {
299 
300  // construct bucket
301  S3BucketContext bucket_context;
302  FillS3BucketContext(bucket_context, bucket_);
303 
304  // construct handlers
305  S3GetObjectHandler handler;
306  memset(&handler, 0, sizeof(handler));
307 
308  handler.responseHandler.propertiesCallback =
309  &ResponsePropertiesCallback;
310  handler.responseHandler.completeCallback =
311  &S3ReadStream::ResponseCompleteCallback;
312  handler.getObjectDataCallback = &S3ReadStream::GetObjectDataCallback;
313 
314  // create request context
315  S3Status status = S3_create_request_context(&req_ctx_);
316  if (status != S3StatusOK || req_ctx_ == nullptr)
317  die("S3_create_request_context() failed.");
318 
319  // issue request but do not wait for data
320  S3_get_object(
321  &bucket_context, key_.c_str(), get_conditions,
322  start_byte, byte_count, /* request_context */ req_ctx_,
323  /* timeoutMs */ 0, &handler, this);
324  }
325 
326  //! simpler constructor
327  S3ReadStream(const std::string& bucket, const std::string& key,
328  uint64_t start_byte = 0, uint64_t byte_count = 0)
329  : S3ReadStream(bucket, key, /* get_conditions */ nullptr,
330  start_byte, byte_count) { }
331 
332  //! non-copyable: delete copy-constructor
333  S3ReadStream(const S3ReadStream&) = delete;
334  //! non-copyable: delete assignment operator
335  S3ReadStream& operator = (const S3ReadStream&) = delete;
336 
337  ~S3ReadStream() override {
338  close();
339  }
340 
341  ssize_t read(void* data, size_t size) final {
342  assert(req_ctx_);
343 
344  if (status_ != S3StatusOK)
345  die("S3-ERROR during read: " << S3_get_status_name(status_));
346 
347  status_ = S3StatusOK;
348  uint8_t* output_begin = reinterpret_cast<uint8_t*>(data);
349  output_ = output_begin;
350  output_end_ = output_ + size;
351 
352  // copy data from reception buffer
353  size_t wb = std::min(size, buffer_.size());
354  std::copy(buffer_.begin(), buffer_.begin() + wb, output_);
355  output_ += wb;
356  buffer_.erase(buffer_.begin(), buffer_.begin() + wb);
357 
358  // wait for more callbacks to deliver data, use select() to wait
359  int remaining_requests = 1;
360  while (status_ == S3StatusOK &&
361  output_ < output_end_ && remaining_requests)
362  {
363  // perform a select() waiting on new data
364  fd_set read_fds, write_fds, except_fds;
365  FD_ZERO(&read_fds);
366  FD_ZERO(&write_fds);
367  FD_ZERO(&except_fds);
368  int max_fd;
369 
370  S3Status status = S3_get_request_context_fdsets(
371  req_ctx_, &read_fds, &write_fds, &except_fds, &max_fd);
372  die_unless(status == S3StatusOK);
373 
374  if (max_fd != -1) {
375  int64_t timeout = S3_get_request_context_timeout(req_ctx_);
376  struct timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };
377  int r = select(max_fd + 1, &read_fds, &write_fds, &except_fds,
378  /* timeout */ (timeout == -1) ? 0 : &tv);
379  die_unless(r >= 0);
380  }
381 
382  // run callbacks
383  S3_runonce_request_context(req_ctx_, &remaining_requests);
384  }
385 
386  return output_ - output_begin;
387  }
388 
389  void close() final {
390  if (req_ctx_ == nullptr) return;
391 
392  S3_destroy_request_context(req_ctx_);
393  req_ctx_ = nullptr;
394  }
395 
396 private:
397  //! request context for waiting on more data
398  S3RequestContext* req_ctx_ = nullptr;
399 
400  //! status of request
401  S3Status status_ = S3StatusOK;
402 
403  //! bucket for upload
404  std::string bucket_;
405 
406  //! bucket key for upload
407  std::string key_;
408 
409  //! reception buffer containing unneeded bytes from callback
410  std::vector<uint8_t> buffer_;
411 
412  //! output buffer for read()
413  uint8_t* output_;
414 
415  //! end of output buffer for read()
416  uint8_t* output_end_;
417 
418  /**************************************************************************/
419 
420  //! completion callback, check for errors
421  void ResponseCompleteCallback(
422  S3Status status, const S3ErrorDetails* error) {
423  status_ = status;
424 
425  if (status != S3StatusOK && status != S3StatusInterrupted)
426  LibS3LogError(status, error);
427  }
428 
429  //! static wrapper to call
430  static void ResponseCompleteCallback(
431  S3Status status, const S3ErrorDetails* error, void* cookie) {
432  S3ReadStream* t = reinterpret_cast<S3ReadStream*>(cookie);
433  return t->ResponseCompleteCallback(status, error);
434  }
435 
436  //! callback receiving data
437  S3Status GetObjectDataCallback(int bufferSize, const char* buffer) {
438 
439  // copy as much data into output_ as fits
440  size_t wb = std::min(
441  output_end_ - output_, static_cast<intptr_t>(bufferSize));
442  std::copy(buffer, buffer + wb, output_);
443  output_ += wb;
444 
445  // store remaining unneeded bytes in buffer_
446  if (wb != static_cast<uintptr_t>(bufferSize))
447  {
448  die_unless(output_ == output_end_);
449  die_unless(buffer_.empty());
450  buffer_.resize(bufferSize - wb);
451  std::copy(buffer + wb, buffer + bufferSize, buffer_.data());
452  }
453 
454  return S3StatusOK;
455  }
456 
457  //! static wrapper to call GetObjectDataCallback
458  static S3Status GetObjectDataCallback(
459  int bufferSize, const char* buffer, void* cookie) {
460  S3ReadStream* t = reinterpret_cast<S3ReadStream*>(cookie);
461  return t->GetObjectDataCallback(bufferSize, buffer);
462  }
463 };
464 
466  const std::string& path, const common::Range& range) {
467 
468  std::string path_ = path;
469  // crop off s3://
470  die_unless(tlx::starts_with(path_, "s3://"));
471  path_ = path_.substr(5);
472 
473  // split uri into host/path
474  std::vector<std::string> splitted = tlx::split('/', path_, 2);
475 
476  return tlx::make_counting<S3ReadStream>(
477  splitted[0], splitted[1],
478  /* start_byte */ range.begin,
479  /* byte_count */ range.end == 0 ? 0 : range.size());
480 }
481 
482 /******************************************************************************/
483 
484 class S3WriteStream : public WriteStream
485 {
486 public:
487  S3WriteStream(const std::string& bucket, const std::string& key,
488  S3PutProperties* put_properties = nullptr)
489  : bucket_(bucket), key_(key),
490  put_properties_(put_properties) {
491 
492  S3BucketContext bucket_context;
493  FillS3BucketContext(bucket_context, bucket);
494 
495  // construct handlers
496  S3MultipartInitialHandler handler;
497  memset(&handler, 0, sizeof(handler));
498 
499  handler.responseHandler.propertiesCallback =
500  &ResponsePropertiesCallback;
501  handler.responseHandler.completeCallback =
502  &S3WriteStream::ResponseCompleteCallback;
503  handler.responseXmlCallback =
504  &S3WriteStream::MultipartInitialResponseCallback;
505 
506  // create new multi part upload
507  S3_initiate_multipart(
508  &bucket_context, key_.c_str(), put_properties, &handler,
509  /* request_context */ nullptr, /* timeoutMs */ 0, this);
510  }
511 
512  ~S3WriteStream() override {
513  close();
514  }
515 
516  ssize_t write(const void* _data, size_t size) final {
517  const uint8_t* data = reinterpret_cast<const uint8_t*>(_data);
518 
519  while (size > 0)
520  {
521  // copy data to buffer
522  size_t buffer_pos = buffer_.size();
523  size_t wb = std::min(size, buffer_max_ - buffer_pos);
524  buffer_.resize(buffer_pos + wb);
525  std::copy(data, data + wb, buffer_.data() + buffer_pos);
526  data += wb;
527  size -= wb;
528 
529  if (buffer_.size() >= buffer_max_)
530  UploadMultipart();
531  }
532 
533  return size;
534  }
535 
536  void close() final {
537  if (upload_id_.empty()) return;
538 
539  // upload last multipart piece
540  if (!buffer_.empty())
541  UploadMultipart();
542 
543  LOG1 << "commit multipart";
544 
545  // construct commit XML
546 
547  std::ostringstream xml;
548  xml << "<CompleteMultipartUpload>";
549  for (size_t i = 0; i < part_etag_.size(); ++i) {
550  xml << "<Part>"
551  << "<PartNumber>" << (i + 1) << "</PartNumber>"
552  << "<ETag>" << part_etag_[i] << "</ETag>"
553  << "</Part>";
554  }
555  xml << "</CompleteMultipartUpload>";
556 
557  // put commit message into buffer_
558  std::string xml_str = xml.str();
559  upload_ = reinterpret_cast<const uint8_t*>(xml_str.data());
560  upload_end_ = upload_ + xml_str.size();
561 
562  S3BucketContext bucket_context;
563  FillS3BucketContext(bucket_context, bucket_);
564 
565  // construct handlers
566  S3MultipartCommitHandler handler;
567  memset(&handler, 0, sizeof(handler));
568 
569  handler.responseHandler.propertiesCallback =
570  &ResponsePropertiesCallback;
571  handler.responseHandler.completeCallback =
572  &S3WriteStream::ResponseCompleteCallback;
573  handler.putObjectDataCallback =
574  &S3WriteStream::PutObjectDataCallback;
575  handler.responseXmlCallback =
576  &S3WriteStream::MultipartCommitResponseCallback;
577 
578  // synchronous upload of multi part data
579  S3_complete_multipart_upload(
580  &bucket_context, key_.c_str(), &handler, upload_id_.c_str(),
581  /* content_length */ xml_str.size(),
582  /* request_context */ nullptr, /* timeoutMs */ 0, this);
583 
584  upload_id_.clear();
585  }
586 
587 private:
588  //! status of request
589  S3Status status_ = S3StatusOK;
590 
591  //! bucket for upload
592  std::string bucket_;
593 
594  //! bucket key for upload
595  std::string key_;
596 
597  //! put properties
598  S3PutProperties* put_properties_;
599 
600  //! unique identifier for multi part upload
601  std::string upload_id_;
602 
603  //! sequence number of uploads
604  int upload_seq_ = 1;
605 
606  //! block size to upload as multi part
607  size_t buffer_max_ = 16 * 1024 * 1024;
608 
609  //! output buffer, if this grows to 16 MiB a part upload is initiated.
610  std::vector<uint8_t> buffer_;
611 
612  //! current upload position in buffer_ or other memory areas
613  const uint8_t* upload_;
614 
615  //! end position of upload area
616  const uint8_t* upload_end_;
617 
618  //! list of ETags of uploaded multiparts
619  std::vector<std::string> part_etag_;
620 
621  /**************************************************************************/
622 
623  //! completion callback, check for errors
624  void ResponseCompleteCallback(
625  S3Status status, const S3ErrorDetails* error) {
626  status_ = status;
627 
628  if (status != S3StatusOK)
629  LibS3LogError(status, error);
630  }
631 
632  //! static wrapper to call
633  static void ResponseCompleteCallback(
634  S3Status status, const S3ErrorDetails* error, void* cookie) {
635  S3WriteStream* t = reinterpret_cast<S3WriteStream*>(cookie);
636  return t->ResponseCompleteCallback(status, error);
637  }
638 
639  //! initiation of multipart uploads
640  static S3Status MultipartInitialResponseCallback(
641  const char* upload_id, void* cookie) {
642  S3WriteStream* t = reinterpret_cast<S3WriteStream*>(cookie);
643  t->upload_id_ = upload_id;
644  return S3StatusOK;
645  }
646 
647  static S3Status MultipartCommitResponseCallback(
648  const char* /* location */, const char* /* etag */,
649  void* /* cookie */) {
650  // could save the parameters if we need them.
651  return S3StatusOK;
652  }
653 
654  /**************************************************************************/
655 
656  void UploadMultipart() {
657  LOG1 << "S3-INFO - Upload multipart[" << upload_seq_ << "]"
658  << " size " << buffer_.size();
659 
660  S3BucketContext bucket_context;
661  FillS3BucketContext(bucket_context, bucket_);
662 
663  // construct handlers
664  S3PutObjectHandler handler;
665  memset(&handler, 0, sizeof(handler));
666 
667  handler.responseHandler.propertiesCallback =
668  &S3WriteStream::MultipartPropertiesCallback;
669  handler.responseHandler.completeCallback =
670  &S3WriteStream::ResponseCompleteCallback;
671  handler.putObjectDataCallback =
672  &S3WriteStream::PutObjectDataCallback;
673 
674  // synchronous upload of multi part data
675  upload_ = buffer_.data();
676  upload_end_ = buffer_.data() + buffer_.size();
677  S3_upload_part(&bucket_context, key_.c_str(), put_properties_,
678  &handler, upload_seq_++, upload_id_.c_str(),
679  /* partContentLength */ buffer_.size(),
680  /* request_context */ nullptr,
681  /* timeoutMs */ 0, this);
682 
683  buffer_.clear();
684  }
685 
686  S3Status MultipartPropertiesCallback(
687  const S3ResponseProperties* properties) {
688 
689  part_etag_.emplace_back(properties->eTag);
690  // output properties
691  ResponsePropertiesCallback(properties, nullptr);
692  return S3StatusOK;
693  }
694 
695  static S3Status MultipartPropertiesCallback(
696  const S3ResponseProperties* properties, void* cookie) {
697  S3WriteStream* t = reinterpret_cast<S3WriteStream*>(cookie);
698  return t->MultipartPropertiesCallback(properties);
699  }
700 
701  int PutObjectDataCallback(int bufferSize, char* buffer) {
702  size_t wb = std::min(
703  static_cast<intptr_t>(bufferSize), upload_end_ - upload_);
704  std::copy(upload_, upload_ + wb, buffer);
705  upload_ += wb;
706  return wb;
707  }
708 
709  static int PutObjectDataCallback(
710  int bufferSize, char* buffer, void* cookie) {
711  S3WriteStream* t = reinterpret_cast<S3WriteStream*>(cookie);
712  return t->PutObjectDataCallback(bufferSize, buffer);
713  }
714 };
715 
717 
718  std::string path_ = path;
719  // crop off s3://
720  die_unless(tlx::starts_with(path_, "s3://"));
721  path_ = path_.substr(5);
722 
723  // split uri into host/path
724  std::vector<std::string> splitted = tlx::split('/', path_, 2);
725 
726  return tlx::make_counting<S3WriteStream>(splitted[0], splitted[1]);
727 }
728 
729 #else // !THRILL_HAVE_LIBS3
730 
//! no-op when built without libS3
void S3Initialize()
{ }
733 
//! no-op when built without libS3
void S3Deinitialize()
{ }
736 
737 void S3Glob(const std::string& /* path */, const GlobType& /* gtype */,
738  FileList& /* filelist */) {
739  die("s3:// is not available, because Thrill was built without libS3.");
740 }
741 
743  const std::string& /* path */, const common::Range& /* range */) {
744  die("s3:// is not available, because Thrill was built without libS3.");
745 }
746 
748  die("s3:// is not available, because Thrill was built without libS3.");
749 }
750 
751 #endif // !THRILL_HAVE_LIBS3
752 
753 } // namespace vfs
754 } // namespace thrill
755 
756 /******************************************************************************/
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
#define die_unless(X)
Definition: die.hpp:27
tlx::CountingPtr< WriteStream > WriteStreamPtr
Definition: file_io.hpp:146
GlobType
Type of objects to include in glob result.
Definition: file_io.hpp:99
#define LOG1
Definition: logger.hpp:28
void S3Deinitialize()
Definition: s3_file.cpp:734
bool starts_with(const char *str, const char *match)
Checks if the given match string is located at the start of this string.
Definition: starts_with.cpp:21
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
void S3Glob(const std::string &, const GlobType &, FileList &)
Definition: s3_file.cpp:737
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
ReadStreamPtr S3OpenReadStream(const std::string &, const common::Range &)
Definition: s3_file.cpp:742
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
High-performance smart pointer used as a wrapping reference counting pointer.
WriteStreamPtr S3OpenWriteStream(const std::string &)
Definition: s3_file.cpp:747
void S3Initialize()
Definition: s3_file.cpp:731
tlx::CountingPtr< ReadStream > ReadStreamPtr
Definition: file_io.hpp:145
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
List of file info and additional overall info.
Definition: file_io.hpp:79