//! Compile-time switch for the verbose "S3-DEBUG" logging of response
//! properties; when false, ResponsePropertiesCallback() returns immediately.
static constexpr bool debug = false;
54 void LibS3LogError(S3Status status,
const S3ErrorDetails* error) {
56 if (status != S3StatusOK) {
57 LOG1 <<
"S3-ERROR - Status: " << S3_get_status_name(status);
60 if (error !=
nullptr && error->message !=
nullptr) {
61 LOG1 <<
"S3-ERROR - Message: " << error->message;
63 if (error !=
nullptr && error->resource !=
nullptr) {
64 LOG1 <<
"S3-ERROR - Resource: " << error->resource;
66 if (error !=
nullptr && error->furtherDetails !=
nullptr) {
67 LOG1 <<
"S3-ERROR - Further Details: " << error->furtherDetails;
69 if (error !=
nullptr && error->extraDetailsCount != 0) {
70 LOG1 <<
"S3-ERROR - Extra Details:";
71 for (
int i = 0; i < error->extraDetailsCount; i++) {
72 LOG1 <<
"S3-ERROR - - " << error->extraDetails[i].name
73 <<
": " << error->extraDetails[i].value;
79 S3Status ResponsePropertiesCallback(
80 const S3ResponseProperties* properties,
void* ) {
82 if (!debug)
return S3StatusOK;
84 if (properties->contentType !=
nullptr)
85 LOG1 <<
"S3-DEBUG - Content-Type: " << properties->contentType;
86 if (properties->requestId !=
nullptr)
87 LOG1 <<
"S3-DEBUG - Request-Id: " << properties->requestId;
88 if (properties->requestId2 !=
nullptr)
89 LOG1 <<
"S3-DEBUG - Request-Id-2: " << properties->requestId2;
90 if (properties->contentLength > 0)
91 LOG1 <<
"S3-DEBUG - Content-Length: " << properties->contentLength;
92 if (properties->server !=
nullptr)
93 LOG1 <<
"S3-DEBUG - Server: " << properties->server;
94 if (properties->eTag !=
nullptr)
95 LOG1 <<
"S3-DEBUG - ETag: " << properties->eTag;
96 if (properties->lastModified > 0) {
98 time_t t = (time_t)properties->lastModified;
100 memset(&tm, 0,
sizeof(tm));
102 strftime(timebuf,
sizeof(timebuf),
103 "%Y-%m-%dT%H:%M:%SZ", localtime_r(&t, &tm));
104 LOG1 <<
"S3-DEBUG - Last-Modified: " << timebuf;
106 for (
int i = 0; i < properties->metaDataCount; i++) {
107 LOG1 <<
"S3-DEBUG - x-amz-meta-" << properties->metaData[i].name
108 <<
": " << properties->metaData[i].value;
118 static void FillS3BucketContext(S3BucketContext& bkt,
const std::string& key) {
119 memset(&bkt, 0,
sizeof(bkt));
121 bkt.hostName = getenv(
"THRILL_S3_HOST");
122 bkt.bucketName = key.c_str();
123 bkt.protocol = S3ProtocolHTTPS;
124 bkt.uriStyle = S3UriStyleVirtualHost;
125 bkt.accessKeyId = getenv(
"THRILL_S3_KEY");
126 bkt.secretAccessKey = getenv(
"THRILL_S3_SECRET");
127 bkt.authRegion = getenv(
"THRILL_S3_REGION");
129 if (bkt.accessKeyId ==
nullptr) {
130 LOG1 <<
"S3-WARNING - no key given - set environment variable THRILL_S3_KEY";
132 if (bkt.secretAccessKey ==
nullptr) {
133 LOG1 <<
"S3-WARNING - no secret given - set environment variable THRILL_S3_SECRET";
144 const S3BucketContext* bucket_context,
145 const char* prefix,
const char* marker =
nullptr,
146 const char* delimiter =
nullptr,
149 path_prefix_ = path_prefix;
152 S3ListBucketHandler handlers;
153 memset(&handlers, 0,
sizeof(handlers));
155 handlers.responseHandler.propertiesCallback =
156 &ResponsePropertiesCallback;
157 handlers.responseHandler.completeCallback =
158 &S3ListBucket::ResponseCompleteCallback;
159 handlers.listBucketCallback = &S3ListBucket::ListBucketCallback;
162 status_ = S3StatusOK;
163 last_marker_ = marker;
164 is_truncated_ =
false;
168 bucket_context, prefix, last_marker_, delimiter, maxkeys,
171 }
while (status_ == S3StatusOK && is_truncated_);
174 std::sort(filelist_.begin(), filelist_.end(),
175 [](
const FileInfo& a,
const FileInfo& b) {
176 return a.path < b.path;
179 return (status_ == S3StatusOK);
183 const std::vector<FileInfo>& filelist()
const {
return filelist_; }
190 std::vector<FileInfo> filelist_;
193 S3Status status_ = S3StatusOK;
196 const char* last_marker_;
199 bool is_truncated_ =
false;
202 void ResponseCompleteCallback(
203 S3Status status,
const S3ErrorDetails* error) {
206 if (status != S3StatusOK)
207 LibS3LogError(status, error);
211 static void ResponseCompleteCallback(
212 S3Status status,
const S3ErrorDetails* error,
void* cookie) {
213 S3ListBucket* t =
reinterpret_cast<S3ListBucket*
>(cookie);
214 return t->ResponseCompleteCallback(status, error);
218 S3Status ListBucketCallback(
219 int is_truncated,
const char* ,
220 int contents_count,
const S3ListBucketContent* contents,
221 int common_prefixes_count,
const char** common_prefixes) {
222 for (
int i = 0; i < contents_count; ++i) {
225 fi.path = path_prefix_ + contents[i].key;
226 fi.size = contents[i].size;
227 filelist_.emplace_back(fi);
229 for (
int i = 0; i < common_prefixes_count; ++i) {
232 fi.path = path_prefix_ + common_prefixes[i];
234 filelist_.emplace_back(fi);
236 last_marker_ = contents[contents_count - 1].key;
237 is_truncated_ = (is_truncated != 0);
242 static S3Status ListBucketCallback(
243 int is_truncated,
const char* next_marker,
int contents_count,
244 const S3ListBucketContent* contents,
int common_prefixes_count,
245 const char** common_prefixes,
void* cookie) {
246 S3ListBucket* t =
reinterpret_cast<S3ListBucket*
>(cookie);
247 return t->ListBucketCallback(
248 is_truncated, next_marker, contents_count, contents,
249 common_prefixes_count, common_prefixes);
254 FileList& filelist) {
259 path = path.substr(5);
262 std::vector<std::string> splitted =
tlx::split(
'/', path, 2);
266 FillS3BucketContext(bkt, splitted[0]);
269 list.list_bucket(
"s3://" + splitted[0] +
"/",
270 &bkt, splitted[1].c_str(),
274 for (
const FileInfo& fi : list.filelist())
278 filelist.emplace_back(fi);
283 filelist.emplace_back(fi);
292 class S3ReadStream :
public ReadStream
296 const S3GetConditions* get_conditions,
297 uint64_t start_byte, uint64_t byte_count)
298 : bucket_(bucket), key_(key) {
301 S3BucketContext bucket_context;
302 FillS3BucketContext(bucket_context, bucket_);
305 S3GetObjectHandler handler;
306 memset(&handler, 0,
sizeof(handler));
308 handler.responseHandler.propertiesCallback =
309 &ResponsePropertiesCallback;
310 handler.responseHandler.completeCallback =
311 &S3ReadStream::ResponseCompleteCallback;
312 handler.getObjectDataCallback = &S3ReadStream::GetObjectDataCallback;
315 S3Status status = S3_create_request_context(&req_ctx_);
316 if (status != S3StatusOK || req_ctx_ ==
nullptr)
317 die(
"S3_create_request_context() failed.");
321 &bucket_context, key_.c_str(), get_conditions,
322 start_byte, byte_count, req_ctx_,
328 uint64_t start_byte = 0, uint64_t byte_count = 0)
329 : S3ReadStream(bucket, key, nullptr,
330 start_byte, byte_count) { }
333 S3ReadStream(
const S3ReadStream&) =
delete;
335 S3ReadStream& operator = (
const S3ReadStream&) =
delete;
337 ~S3ReadStream()
override {
341 ssize_t read(
void* data,
size_t size)
final {
344 if (status_ != S3StatusOK)
345 die(
"S3-ERROR during read: " << S3_get_status_name(status_));
347 status_ = S3StatusOK;
348 uint8_t* output_begin =
reinterpret_cast<uint8_t*
>(data);
349 output_ = output_begin;
350 output_end_ = output_ + size;
353 size_t wb =
std::min(size, buffer_.size());
354 std::copy(buffer_.begin(), buffer_.begin() + wb, output_);
356 buffer_.erase(buffer_.begin(), buffer_.begin() + wb);
359 int remaining_requests = 1;
360 while (status_ == S3StatusOK &&
361 output_ < output_end_ && remaining_requests)
364 fd_set read_fds, write_fds, except_fds;
367 FD_ZERO(&except_fds);
370 S3Status status = S3_get_request_context_fdsets(
371 req_ctx_, &read_fds, &write_fds, &except_fds, &max_fd);
375 int64_t timeout = S3_get_request_context_timeout(req_ctx_);
376 struct timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };
377 int r = select(max_fd + 1, &read_fds, &write_fds, &except_fds,
378 (timeout == -1) ? 0 : &tv);
383 S3_runonce_request_context(req_ctx_, &remaining_requests);
386 return output_ - output_begin;
390 if (req_ctx_ ==
nullptr)
return;
392 S3_destroy_request_context(req_ctx_);
398 S3RequestContext* req_ctx_ =
nullptr;
401 S3Status status_ = S3StatusOK;
410 std::vector<uint8_t> buffer_;
416 uint8_t* output_end_;
421 void ResponseCompleteCallback(
422 S3Status status,
const S3ErrorDetails* error) {
425 if (status != S3StatusOK && status != S3StatusInterrupted)
426 LibS3LogError(status, error);
430 static void ResponseCompleteCallback(
431 S3Status status,
const S3ErrorDetails* error,
void* cookie) {
432 S3ReadStream* t =
reinterpret_cast<S3ReadStream*
>(cookie);
433 return t->ResponseCompleteCallback(status, error);
437 S3Status GetObjectDataCallback(
int bufferSize,
const char* buffer) {
441 output_end_ - output_, static_cast<intptr_t>(bufferSize));
442 std::copy(buffer, buffer + wb, output_);
446 if (wb != static_cast<uintptr_t>(bufferSize))
450 buffer_.resize(bufferSize - wb);
451 std::copy(buffer + wb, buffer + bufferSize, buffer_.data());
458 static S3Status GetObjectDataCallback(
459 int bufferSize,
const char* buffer,
void* cookie) {
460 S3ReadStream* t =
reinterpret_cast<S3ReadStream*
>(cookie);
461 return t->GetObjectDataCallback(bufferSize, buffer);
466 const std::string& path,
const common::Range& range) {
471 path_ = path_.substr(5);
474 std::vector<std::string> splitted =
tlx::split(
'/', path_, 2);
476 return tlx::make_counting<S3ReadStream>(
477 splitted[0], splitted[1],
479 range.end == 0 ? 0 : range.size());
484 class S3WriteStream :
public WriteStream
488 S3PutProperties* put_properties =
nullptr)
489 : bucket_(bucket), key_(key),
490 put_properties_(put_properties) {
492 S3BucketContext bucket_context;
493 FillS3BucketContext(bucket_context, bucket);
496 S3MultipartInitialHandler handler;
497 memset(&handler, 0,
sizeof(handler));
499 handler.responseHandler.propertiesCallback =
500 &ResponsePropertiesCallback;
501 handler.responseHandler.completeCallback =
502 &S3WriteStream::ResponseCompleteCallback;
503 handler.responseXmlCallback =
504 &S3WriteStream::MultipartInitialResponseCallback;
507 S3_initiate_multipart(
508 &bucket_context, key_.c_str(), put_properties, &handler,
512 ~S3WriteStream()
override {
516 ssize_t write(
const void* _data,
size_t size)
final {
517 const uint8_t* data =
reinterpret_cast<const uint8_t*
>(_data);
522 size_t buffer_pos = buffer_.size();
523 size_t wb =
std::min(size, buffer_max_ - buffer_pos);
524 buffer_.resize(buffer_pos + wb);
525 std::copy(data, data + wb, buffer_.data() + buffer_pos);
529 if (buffer_.size() >= buffer_max_)
537 if (upload_id_.empty())
return;
540 if (!buffer_.empty())
543 LOG1 <<
"commit multipart";
547 std::ostringstream xml;
548 xml <<
"<CompleteMultipartUpload>";
549 for (
size_t i = 0; i < part_etag_.size(); ++i) {
551 <<
"<PartNumber>" << (i + 1) <<
"</PartNumber>" 552 <<
"<ETag>" << part_etag_[i] <<
"</ETag>" 555 xml <<
"</CompleteMultipartUpload>";
559 upload_ =
reinterpret_cast<const uint8_t*
>(xml_str.data());
560 upload_end_ = upload_ + xml_str.size();
562 S3BucketContext bucket_context;
563 FillS3BucketContext(bucket_context, bucket_);
566 S3MultipartCommitHandler handler;
567 memset(&handler, 0,
sizeof(handler));
569 handler.responseHandler.propertiesCallback =
570 &ResponsePropertiesCallback;
571 handler.responseHandler.completeCallback =
572 &S3WriteStream::ResponseCompleteCallback;
573 handler.putObjectDataCallback =
574 &S3WriteStream::PutObjectDataCallback;
575 handler.responseXmlCallback =
576 &S3WriteStream::MultipartCommitResponseCallback;
579 S3_complete_multipart_upload(
580 &bucket_context, key_.c_str(), &handler, upload_id_.c_str(),
589 S3Status status_ = S3StatusOK;
598 S3PutProperties* put_properties_;
607 size_t buffer_max_ = 16 * 1024 * 1024;
610 std::vector<uint8_t> buffer_;
613 const uint8_t* upload_;
616 const uint8_t* upload_end_;
619 std::vector<std::string> part_etag_;
624 void ResponseCompleteCallback(
625 S3Status status,
const S3ErrorDetails* error) {
628 if (status != S3StatusOK)
629 LibS3LogError(status, error);
633 static void ResponseCompleteCallback(
634 S3Status status,
const S3ErrorDetails* error,
void* cookie) {
635 S3WriteStream* t =
reinterpret_cast<S3WriteStream*
>(cookie);
636 return t->ResponseCompleteCallback(status, error);
640 static S3Status MultipartInitialResponseCallback(
641 const char* upload_id,
void* cookie) {
642 S3WriteStream* t =
reinterpret_cast<S3WriteStream*
>(cookie);
643 t->upload_id_ = upload_id;
647 static S3Status MultipartCommitResponseCallback(
648 const char* ,
const char* ,
656 void UploadMultipart() {
657 LOG1 <<
"S3-INFO - Upload multipart[" << upload_seq_ <<
"]" 658 <<
" size " << buffer_.size();
660 S3BucketContext bucket_context;
661 FillS3BucketContext(bucket_context, bucket_);
664 S3PutObjectHandler handler;
665 memset(&handler, 0,
sizeof(handler));
667 handler.responseHandler.propertiesCallback =
668 &S3WriteStream::MultipartPropertiesCallback;
669 handler.responseHandler.completeCallback =
670 &S3WriteStream::ResponseCompleteCallback;
671 handler.putObjectDataCallback =
672 &S3WriteStream::PutObjectDataCallback;
675 upload_ = buffer_.data();
676 upload_end_ = buffer_.data() + buffer_.size();
677 S3_upload_part(&bucket_context, key_.c_str(), put_properties_,
678 &handler, upload_seq_++, upload_id_.c_str(),
686 S3Status MultipartPropertiesCallback(
687 const S3ResponseProperties* properties) {
689 part_etag_.emplace_back(properties->eTag);
691 ResponsePropertiesCallback(properties,
nullptr);
695 static S3Status MultipartPropertiesCallback(
696 const S3ResponseProperties* properties,
void* cookie) {
697 S3WriteStream* t =
reinterpret_cast<S3WriteStream*
>(cookie);
698 return t->MultipartPropertiesCallback(properties);
701 int PutObjectDataCallback(
int bufferSize,
char* buffer) {
703 static_cast<intptr_t>(bufferSize), upload_end_ - upload_);
704 std::copy(upload_, upload_ + wb, buffer);
709 static int PutObjectDataCallback(
710 int bufferSize,
char* buffer,
void* cookie) {
711 S3WriteStream* t =
reinterpret_cast<S3WriteStream*
>(cookie);
712 return t->PutObjectDataCallback(bufferSize, buffer);
721 path_ = path_.substr(5);
724 std::vector<std::string> splitted =
tlx::split(
'/', path_, 2);
726 return tlx::make_counting<S3WriteStream>(splitted[0], splitted[1]);
729 #else // !THRILL_HAVE_LIBS3 739 die(
"s3:// is not available, because Thrill was built without libS3.");
744 die(
"s3:// is not available, because Thrill was built without libS3.");
748 die(
"s3:// is not available, because Thrill was built without libS3.");
751 #endif // !THRILL_HAVE_LIBS3 static uint_pair max()
return a uint_pair instance containing the largest value possible
tlx::CountingPtr< WriteStream > WriteStreamPtr
GlobType
Type of objects to include in glob result.
bool starts_with(const char *str, const char *match)
Checks if the given match string is located at the start of this string.
represents a 1 dimensional range (interval) [begin,end)
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
void S3Glob(const std::string &, const GlobType &, FileList &)
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
ReadStreamPtr S3OpenReadStream(const std::string &, const common::Range &)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
static constexpr bool debug
High-performance smart pointer used as a wrapping reference counting pointer.
WriteStreamPtr S3OpenWriteStream(const std::string &)
tlx::CountingPtr< ReadStream > ReadStreamPtr
static uint_pair min()
return a uint_pair instance containing the smallest value possible
List of file info and additional overall info.