Thrill  0.1
gzip_filter.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/gzip_filter.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
13 #include <tlx/die.hpp>
14 
15 #if THRILL_HAVE_ZLIB
16 #include <zlib.h>
17 #endif
18 
19 #include <vector>
20 
21 namespace thrill {
22 namespace vfs {
23 
24 #if THRILL_HAVE_ZLIB
25 
26 /******************************************************************************/
27 
28 const char * Z_ERROR_to_string(int err) {
29  switch (err)
30  {
31  case Z_OK:
32  return "Z_OK";
33  case Z_STREAM_END:
34  return "Z_STREAM_END";
35  case Z_NEED_DICT:
36  return "Z_NEED_DICT";
37  case Z_ERRNO:
38  return "Z_ERRNO";
39  case Z_STREAM_ERROR:
40  return "Z_STREAM_ERROR";
41  case Z_DATA_ERROR:
42  return "Z_DATA_ERROR";
43  case Z_MEM_ERROR:
44  return "Z_MEM_ERROR";
45  case Z_BUF_ERROR:
46  return "Z_BUF_ERROR";
47  case Z_VERSION_ERROR:
48  return "Z_VERSION_ERROR";
49  default:
50  return "UNKNOWN";
51  }
52 }
53 
54 /******************************************************************************/
55 // GZipWriteFilter - on-the-fly gzip compressor
56 
57 class GZipWriteFilter final : public virtual WriteStream
58 {
59 public:
60  explicit GZipWriteFilter(const WriteStreamPtr& output)
61  : output_(output) {
62  memset(&z_stream_, 0, sizeof(z_stream_));
63 
64  // windowBits = 15 (largest allocation) + 16 (gzip header)
65  int window_size = 15 + 16;
66  int err = deflateInit2(&z_stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
67  window_size, /* memLevel */ 8,
68  Z_DEFAULT_STRATEGY);
69  die_unequal(err, Z_OK);
70 
71  // output buffer
72  buffer_.resize(2 * 1024 * 1024);
73  z_stream_.next_out = buffer_.data();
74  z_stream_.avail_out = static_cast<uInt>(buffer_.size());
75 
76  initialized_ = true;
77  }
78 
79  ~GZipWriteFilter() {
80  close();
81  }
82 
83  ssize_t write(const void* data, const size_t size) final {
84  int err;
85 
86  z_stream_.next_in = const_cast<Bytef*>(
87  reinterpret_cast<const Bytef*>(data));
88  z_stream_.avail_in = size;
89 
90  do
91  {
92  err = deflate(&z_stream_, Z_NO_FLUSH);
93 
94  if (err == Z_OK && z_stream_.avail_in != 0)
95  {
96  uInt written_size =
97  buffer_.size() - z_stream_.avail_out;
98 
99  // buffer is full, write to output
100  output_->write(buffer_.data(), written_size);
101 
102  z_stream_.next_out = buffer_.data();
103  z_stream_.avail_out = buffer_.size();
104  }
105  }
106  while (z_stream_.avail_in != 0 && err == Z_OK); // NOLINT
107 
108  die_unequal(err, Z_OK);
109 
110  return size;
111  }
112 
113  void close() final {
114  if (!initialized_) return;
115 
116  int err;
117 
118  do
119  {
120  err = deflate(&z_stream_, Z_FINISH);
121 
122  if (err == Z_OK && z_stream_.avail_in != 0)
123  {
124  uInt written_size =
125  buffer_.size() - z_stream_.avail_out;
126 
127  // buffer is full, write to output
128  output_->write(buffer_.data(), written_size);
129 
130  z_stream_.next_out = buffer_.data();
131  z_stream_.avail_out = buffer_.size();
132  }
133  }
134  while (err == Z_OK); // NOLINT
135 
136  // write remaining data
137  uInt written_size = buffer_.size() - z_stream_.avail_out;
138  output_->write(buffer_.data(), written_size);
139 
140  output_->close();
141 
142  deflateEnd(&z_stream_);
143  initialized_ = false;
144  }
145 
146 private:
147  //! if z_stream_ is initialized
148  bool initialized_;
149 
150  //! zlib context
151  z_stream z_stream_;
152 
153  //! compression buffer, flushed to output when full
154  std::vector<Bytef> buffer_;
155 
156  //! output stream for writing data somewhere
157  WriteStreamPtr output_;
158 };
159 
161  die_unless(stream);
162  return tlx::make_counting<GZipWriteFilter>(stream);
163 }
164 
165 /******************************************************************************/
166 // GZipReadFilter - on-the-fly gzip decompressor
167 
168 class GZipReadFilter : public virtual ReadStream
169 {
170  static constexpr bool debug = false;
171 
172 public:
173  explicit GZipReadFilter(const ReadStreamPtr& input)
174  : input_(input) {
175  memset(&z_stream_, 0, sizeof(z_stream_));
176 
177  /* windowBits = 15 (largest allocation) + 32 (autodetect headers) */
178  int window_size = 15 + 32;
179  err_ = inflateInit2(&z_stream_, window_size);
180  die_unequal(err_, Z_OK);
181 
182  // output buffer
183  buffer_.resize(2 * 1024 * 1024);
184  z_stream_.next_in = buffer_.data();
185  z_stream_.avail_in = 0;
186 
187  initialized_ = true;
188  }
189 
190  ~GZipReadFilter() {
191  close();
192  }
193 
194  ssize_t read(void* data, size_t size) final {
195  z_stream_.next_out = const_cast<Bytef*>(
196  reinterpret_cast<const Bytef*>(data));
197  z_stream_.avail_out = size;
198 
199  do
200  {
201  if (z_stream_.avail_in == 0) {
202  // input buffer empty, so read from input_
203  z_stream_.avail_in = input_->read(
204  buffer_.data(), buffer_.size());
205  z_stream_.next_in = buffer_.data();
206 
207  if (z_stream_.avail_in == 0) {
208  return size - z_stream_.avail_out;
209  }
210  }
211 
212  if (err_ == Z_STREAM_END) {
213  LOG << "GZipReadFilter: inflateReset()";
214  inflateReset(&z_stream_);
215  }
216 
217  err_ = inflate(&z_stream_, Z_SYNC_FLUSH);
218  } while ((err_ == Z_OK || err_ == Z_STREAM_END) && // NOLINT
219  z_stream_.avail_out != 0); // NOLINT
220 
221  if (err_ != Z_OK && err_ != Z_STREAM_END) {
222  die("GZipReadFilter: " << Z_ERROR_to_string(err_) <<
223  " while inflating");
224  }
225 
226  die_unequal(z_stream_.avail_out, 0u);
227 
228  return size;
229  }
230 
231  void close() final {
232  if (!initialized_) return;
233 
234  inflateEnd(&z_stream_);
235  input_->close();
236 
237  initialized_ = false;
238  }
239 
240 private:
241  //! if z_stream_ is initialized
242  bool initialized_;
243 
244  //! zlib context
245  z_stream z_stream_;
246 
247  //! current error code
248  int err_;
249 
250  //! decompression buffer, filled from the input when empty
251  std::vector<Bytef> buffer_;
252 
253  //! input stream for reading data from somewhere
254  ReadStreamPtr input_;
255 };
256 
258  die_unless(stream);
259  return tlx::make_counting<GZipReadFilter>(stream);
260 }
261 
262 /******************************************************************************/
263 
264 #else // !THRILL_HAVE_ZLIB
265 
267  die(".gz decompression is not available, "
268  "because Thrill was built without zlib.");
269 }
270 
272  die(".gz decompression is not available, "
273  "because Thrill was built without zlib.");
274 }
275 
276 #endif
277 
278 } // namespace vfs
279 } // namespace thrill
280 
281 /******************************************************************************/
#define die_unless(X)
Definition: die.hpp:27
tlx::CountingPtr< WriteStream > WriteStreamPtr
Definition: file_io.hpp:146
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
#define die_unequal(X, Y)
Definition: die.hpp:50
ReadStreamPtr MakeGZipReadFilter(const ReadStreamPtr &)
static constexpr bool debug
WriteStreamPtr MakeGZipWriteFilter(const WriteStreamPtr &)
tlx::CountingPtr< ReadStream > ReadStreamPtr
Definition: file_io.hpp:145
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24