Thrill  0.1
bzip2_filter.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/vfs/bzip2_filter.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
13 #include <tlx/die.hpp>
14 
15 #if THRILL_HAVE_BZIP2
16 #include <bzlib.h>
17 #endif
18 
19 #include <vector>
20 
21 namespace thrill {
22 namespace vfs {
23 
24 #if THRILL_HAVE_BZIP2
25 
26 /******************************************************************************/
27 // BZip2WriteFilter - on-the-fly bzip2 compressor
28 
29 class BZip2WriteFilter final : public virtual WriteStream
30 {
31 public:
32  explicit BZip2WriteFilter(const WriteStreamPtr& output)
33  : output_(output) {
34  memset(&bz_stream_, 0, sizeof(bz_stream_));
35 
36  int err = BZ2_bzCompressInit(
37  &bz_stream_,
38  /* blockSize100k */ 9, /* verbosity */ 0, /* workFactor */ 0);
39  die_unequal(err, BZ_OK);
40 
41  // output buffer
42  buffer_.resize(2 * 1024 * 1024);
43  bz_stream_.next_out = buffer_.data();
44  bz_stream_.avail_out = static_cast<unsigned>(buffer_.size());
45 
46  initialized_ = true;
47  }
48 
49  ~BZip2WriteFilter() {
50  close();
51  }
52 
53  ssize_t write(const void* data, const size_t size) final {
54  int err;
55 
56  bz_stream_.next_in = const_cast<char*>(
57  reinterpret_cast<const char*>(data));
58  bz_stream_.avail_in = size;
59 
60  do
61  {
62  err = BZ2_bzCompress(&bz_stream_, BZ_RUN);
63 
64  if (err == BZ_OK && bz_stream_.avail_in != 0)
65  {
66  unsigned written_size =
67  buffer_.size() - bz_stream_.avail_out;
68 
69  // buffer is full, write to output
70  output_->write(buffer_.data(), written_size);
71 
72  bz_stream_.next_out = buffer_.data();
73  bz_stream_.avail_out = buffer_.size();
74  }
75  }
76  while (bz_stream_.avail_in != 0 && err == BZ_RUN_OK); // NOLINT
77 
78  die_unequal(err, BZ_RUN_OK);
79 
80  return size;
81  }
82 
83  void close() final {
84  if (!initialized_) return;
85 
86  int err;
87 
88  do
89  {
90  err = BZ2_bzCompress(&bz_stream_, BZ_FINISH);
91 
92  if (err == BZ_FINISH_OK && bz_stream_.avail_in != 0)
93  {
94  unsigned written_size =
95  buffer_.size() - bz_stream_.avail_out;
96 
97  // buffer is full, write to output
98  output_->write(buffer_.data(), written_size);
99 
100  bz_stream_.next_out = buffer_.data();
101  bz_stream_.avail_out = buffer_.size();
102  }
103  }
104  while (err == BZ_FINISH_OK); // NOLINT
105 
106  die_unequal(err, BZ_STREAM_END);
107 
108  // write remaining data
109  unsigned written_size = buffer_.size() - bz_stream_.avail_out;
110  output_->write(buffer_.data(), written_size);
111 
112  output_->close();
113 
114  BZ2_bzCompressEnd(&bz_stream_);
115  initialized_ = false;
116  }
117 
118 private:
119  //! if bz_stream_ is initialized
120  bool initialized_;
121 
122  //! bzip2 context
123  bz_stream bz_stream_;
124 
125  //! compression buffer, flushed to output when full
126  std::vector<char> buffer_;
127 
128  //! output stream for writing data somewhere
129  WriteStreamPtr output_;
130 };
131 
133  die_unless(stream);
134  return tlx::make_counting<BZip2WriteFilter>(stream);
135 }
136 
137 /******************************************************************************/
138 // BZip2ReadFilter - on-the-fly bzip2 decompressor
139 
140 class BZip2ReadFilter : public virtual ReadStream
141 {
142 public:
143  explicit BZip2ReadFilter(const ReadStreamPtr& input)
144  : input_(input) {
145  memset(&bz_stream_, 0, sizeof(bz_stream_));
146 
147  err_ = BZ2_bzDecompressInit(
148  &bz_stream_, /* verbosity */ 0, /* small */ 0);
149  die_unequal(err_, BZ_OK);
150 
151  // output buffer
152  buffer_.resize(2 * 1024 * 1024);
153  bz_stream_.next_in = buffer_.data();
154  bz_stream_.avail_in = 0;
155 
156  initialized_ = true;
157  }
158 
159  ~BZip2ReadFilter() {
160  close();
161  }
162 
163  ssize_t read(void* data, size_t size) final {
164  bz_stream_.next_out = const_cast<char*>(
165  reinterpret_cast<const char*>(data));
166  bz_stream_.avail_out = size;
167 
168  do
169  {
170  if (bz_stream_.avail_in == 0) {
171  // input buffer empty, so read from input_
172  bz_stream_.avail_in = input_->read(
173  buffer_.data(), buffer_.size());
174  bz_stream_.next_in = buffer_.data();
175 
176  if (bz_stream_.avail_in == 0) {
177  return size - bz_stream_.avail_out;
178  }
179  }
180 
181  err_ = BZ2_bzDecompress(&bz_stream_);
182 
183  if (err_ == BZ_STREAM_END)
184  return size - bz_stream_.avail_out;
185  }
186  while (err_ == BZ_OK && bz_stream_.avail_out != 0); // NOLINT
187 
188  die_unequal(bz_stream_.avail_out, 0u);
189 
190  return size;
191  }
192 
193  void close() final {
194  if (!initialized_) return;
195 
196  BZ2_bzDecompressEnd(&bz_stream_);
197  input_->close();
198 
199  initialized_ = false;
200  }
201 
202 private:
203  //! if bz_stream_ is initialized
204  bool initialized_;
205 
206  //! bzip2 context
207  bz_stream bz_stream_;
208 
209  //! current error code
210  int err_;
211 
212  //! decompression buffer, filled from the input when empty
213  std::vector<char> buffer_;
214 
215  //! input stream for reading data from somewhere
216  ReadStreamPtr input_;
217 };
218 
220  die_unless(stream);
221  return tlx::make_counting<BZip2ReadFilter>(stream);
222 }
223 
224 /******************************************************************************/
225 
226 #else // !THRILL_HAVE_BZIP2
227 
229  die(".bz2 decompression is not available, "
230  "because Thrill was built without libbz2.");
231 }
232 
234  die(".bz2 decompression is not available, "
235  "because Thrill was built without libbz2.");
236 }
237 
238 #endif
239 
240 } // namespace vfs
241 } // namespace thrill
242 
243 /******************************************************************************/
Writer object to output data to any supported URI.
Definition: file_io.hpp:135
Reader object from any source.
Definition: file_io.hpp:120
#define die_unless(X)
Definition: die.hpp:27
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
ReadStreamPtr MakeBZip2ReadFilter(const ReadStreamPtr &)
#define die_unequal(X, Y)
Definition: die.hpp:50
WriteStreamPtr MakeBZip2WriteFilter(const WriteStreamPtr &)