28 const char * Z_ERROR_to_string(
int err) {
34 return "Z_STREAM_END";
40 return "Z_STREAM_ERROR";
42 return "Z_DATA_ERROR";
48 return "Z_VERSION_ERROR";
57 class GZipWriteFilter final :
public virtual WriteStream
62 memset(&z_stream_, 0,
sizeof(z_stream_));
65 int window_size = 15 + 16;
66 int err = deflateInit2(&z_stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
72 buffer_.resize(2 * 1024 * 1024);
73 z_stream_.next_out = buffer_.data();
74 z_stream_.avail_out =
static_cast<uInt
>(buffer_.size());
83 ssize_t write(
const void* data,
const size_t size)
final {
86 z_stream_.next_in =
const_cast<Bytef*
>(
87 reinterpret_cast<const Bytef*
>(data));
88 z_stream_.avail_in = size;
92 err = deflate(&z_stream_, Z_NO_FLUSH);
94 if (err == Z_OK && z_stream_.avail_in != 0)
97 buffer_.size() - z_stream_.avail_out;
100 output_->write(buffer_.data(), written_size);
102 z_stream_.next_out = buffer_.data();
103 z_stream_.avail_out = buffer_.size();
106 while (z_stream_.avail_in != 0 && err == Z_OK);
114 if (!initialized_)
return;
120 err = deflate(&z_stream_, Z_FINISH);
122 if (err == Z_OK && z_stream_.avail_in != 0)
125 buffer_.size() - z_stream_.avail_out;
128 output_->write(buffer_.data(), written_size);
130 z_stream_.next_out = buffer_.data();
131 z_stream_.avail_out = buffer_.size();
137 uInt written_size = buffer_.size() - z_stream_.avail_out;
138 output_->write(buffer_.data(), written_size);
142 deflateEnd(&z_stream_);
143 initialized_ =
false;
154 std::vector<Bytef> buffer_;
162 return tlx::make_counting<GZipWriteFilter>(stream);
168 class GZipReadFilter :
public virtual ReadStream
170 static constexpr
bool debug =
false;
175 memset(&z_stream_, 0,
sizeof(z_stream_));
178 int window_size = 15 + 32;
179 err_ = inflateInit2(&z_stream_, window_size);
183 buffer_.resize(2 * 1024 * 1024);
184 z_stream_.next_in = buffer_.data();
185 z_stream_.avail_in = 0;
194 ssize_t read(
void* data,
size_t size)
final {
195 z_stream_.next_out =
const_cast<Bytef*
>(
196 reinterpret_cast<const Bytef*
>(data));
197 z_stream_.avail_out = size;
201 if (z_stream_.avail_in == 0) {
203 z_stream_.avail_in = input_->read(
204 buffer_.data(), buffer_.size());
205 z_stream_.next_in = buffer_.data();
207 if (z_stream_.avail_in == 0) {
208 return size - z_stream_.avail_out;
212 if (err_ == Z_STREAM_END) {
213 LOG <<
"GZipReadFilter: inflateReset()";
214 inflateReset(&z_stream_);
217 err_ = inflate(&z_stream_, Z_SYNC_FLUSH);
218 }
while ((err_ == Z_OK || err_ == Z_STREAM_END) &&
219 z_stream_.avail_out != 0);
221 if (err_ != Z_OK && err_ != Z_STREAM_END) {
222 die(
"GZipReadFilter: " << Z_ERROR_to_string(err_) <<
232 if (!initialized_)
return;
234 inflateEnd(&z_stream_);
237 initialized_ =
false;
251 std::vector<Bytef> buffer_;
259 return tlx::make_counting<GZipReadFilter>(stream);
264 #else // !THRILL_HAVE_ZLIB 267 die(
".gz decompression is not available, " 268 "because Thrill was built without zlib.");
272 die(
".gz decompression is not available, " 273 "because Thrill was built without zlib.");
tlx::CountingPtr< WriteStream > WriteStreamPtr
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
#define die_unequal(X, Y)
ReadStreamPtr MakeGZipReadFilter(const ReadStreamPtr &)
static constexpr bool debug
WriteStreamPtr MakeGZipWriteFilter(const WriteStreamPtr &)
tlx::CountingPtr< ReadStream > ReadStreamPtr
#define LOG
Default logging method: output if the local debug variable is true.