29 class BZip2WriteFilter final :
public virtual WriteStream 34 memset(&bz_stream_, 0,
sizeof(bz_stream_));
36 int err = BZ2_bzCompressInit(
42 buffer_.resize(2 * 1024 * 1024);
43 bz_stream_.next_out = buffer_.data();
44 bz_stream_.avail_out =
static_cast<unsigned>(buffer_.size());
53 ssize_t write(
const void* data,
const size_t size)
final {
56 bz_stream_.next_in =
const_cast<char*
>(
57 reinterpret_cast<const char*
>(data));
58 bz_stream_.avail_in = size;
62 err = BZ2_bzCompress(&bz_stream_, BZ_RUN);
64 if (err == BZ_OK && bz_stream_.avail_in != 0)
66 unsigned written_size =
67 buffer_.size() - bz_stream_.avail_out;
70 output_->write(buffer_.data(), written_size);
72 bz_stream_.next_out = buffer_.data();
73 bz_stream_.avail_out = buffer_.size();
76 while (bz_stream_.avail_in != 0 && err == BZ_RUN_OK);
84 if (!initialized_)
return;
90 err = BZ2_bzCompress(&bz_stream_, BZ_FINISH);
92 if (err == BZ_FINISH_OK && bz_stream_.avail_in != 0)
94 unsigned written_size =
95 buffer_.size() - bz_stream_.avail_out;
98 output_->write(buffer_.data(), written_size);
100 bz_stream_.next_out = buffer_.data();
101 bz_stream_.avail_out = buffer_.size();
104 while (err == BZ_FINISH_OK);
109 unsigned written_size = buffer_.size() - bz_stream_.avail_out;
110 output_->write(buffer_.data(), written_size);
114 BZ2_bzCompressEnd(&bz_stream_);
115 initialized_ =
false;
123 bz_stream bz_stream_;
126 std::vector<char> buffer_;
134 return tlx::make_counting<BZip2WriteFilter>(stream);
140 class BZip2ReadFilter :
public virtual ReadStream 145 memset(&bz_stream_, 0,
sizeof(bz_stream_));
147 err_ = BZ2_bzDecompressInit(
152 buffer_.resize(2 * 1024 * 1024);
153 bz_stream_.next_in = buffer_.data();
154 bz_stream_.avail_in = 0;
163 ssize_t read(
void* data,
size_t size)
final {
164 bz_stream_.next_out =
const_cast<char*
>(
165 reinterpret_cast<const char*
>(data));
166 bz_stream_.avail_out = size;
170 if (bz_stream_.avail_in == 0) {
172 bz_stream_.avail_in = input_->read(
173 buffer_.data(), buffer_.size());
174 bz_stream_.next_in = buffer_.data();
176 if (bz_stream_.avail_in == 0) {
177 return size - bz_stream_.avail_out;
181 err_ = BZ2_bzDecompress(&bz_stream_);
183 if (err_ == BZ_STREAM_END)
184 return size - bz_stream_.avail_out;
186 while (err_ == BZ_OK && bz_stream_.avail_out != 0);
194 if (!initialized_)
return;
196 BZ2_bzDecompressEnd(&bz_stream_);
199 initialized_ =
false;
207 bz_stream bz_stream_;
213 std::vector<char> buffer_;
221 return tlx::make_counting<BZip2ReadFilter>(stream);
226 #else // !THRILL_HAVE_BZIP2 229 die(
".bz2 decompression is not available, " 230 "because Thrill was built without libbz2.");
234 die(
".bz2 decompression is not available, " 235 "because Thrill was built without libbz2.");
Writer object to output data to any supported URI.
Reader object from any source.
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
ReadStreamPtr MakeBZip2ReadFilter(const ReadStreamPtr &)
#define die_unequal(X, Y)
WriteStreamPtr MakeBZip2WriteFilter(const WriteStreamPtr &)