-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Expand file tree
/
Copy patharrow_writer.py
More file actions
302 lines (262 loc) · 12.5 KB
/
arrow_writer.py
File metadata and controls
302 lines (262 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# coding=utf-8
# Copyright 2020 The HuggingFace NLP Authors and the TensorFlow Datasets Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""To write records into Arrow files (directly, or converted from Parquet files)."""
import errno
import logging
import os
import socket
from typing import Any, Dict, List, Optional
import pyarrow as pa
from .features import Features
from .utils.file_utils import HF_DATASETS_CACHE, hash_url_to_filename
from .utils.py_utils import map_all_sequences_to_lists
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Batch size constants. For more info, see:
# https://github.com/apache/arrow/blob/master/docs/source/cpp/arrays.rst#size-limitations-and-recommendations
# Number of rows used as the batch size when a writer is not given an explicit `writer_batch_size`.
DEFAULT_MAX_BATCH_SIZE = 10_000 # hopefully it doesn't write too much at once (max is 2GB)
class ArrowWriter(object):
    """Buffers examples and writes them as record batches to an Arrow stream.

    The destination is either a file path (opened by the writer) or an already open stream.
    The schema is taken from `features`, `data_type` or `schema` (in that order of priority)
    when one of them is given; otherwise it is inferred from the first data written.
    """

    def __init__(
        self,
        data_type: Optional[pa.DataType] = None,
        schema: Optional[pa.Schema] = None,
        features: Optional[Features] = None,
        path: Optional[str] = None,
        stream: Optional[pa.NativeFile] = None,
        writer_batch_size: Optional[int] = None,
        disable_nullable: bool = True,
    ):
        """
        Args:
            data_type: struct type of the examples; used when `features` is not given.
            schema: schema of the examples; used when neither `features` nor `data_type` is given.
            features: features of the examples; takes priority over `data_type` and `schema`.
            path: path of the file to write to; it is opened only when `stream` is not given.
            stream: already open stream to write to; takes priority over `path`.
            writer_batch_size: number of buffered rows that triggers a write, and maximum
                number of rows per record batch. Falls back to `DEFAULT_MAX_BATCH_SIZE`.
            disable_nullable: if True and a schema is known, every top-level field is
                rebuilt with `nullable=False`.

        Raises:
            ValueError: if neither `path` nor `stream` is provided.
        """
        if path is None and stream is None:
            raise ValueError("At least one of path and stream must be provided.")

        if features is not None:
            self._features = features
            self._schema = pa.schema(features.type)
            self._type: pa.DataType = pa.struct(field for field in self._schema)
        elif data_type is not None:
            self._type: pa.DataType = data_type
            self._schema: pa.Schema = pa.schema(field for field in self._type)
            self._features = Features.from_arrow_schema(self._schema)
        elif schema is not None:
            self._schema: pa.Schema = schema
            self._type: pa.DataType = pa.struct(field for field in self._schema)
            self._features = Features.from_arrow_schema(self._schema)
        else:
            # Nothing known yet: the schema is inferred from the first data written.
            self._features = None
            self._schema = None
            self._type = None

        if disable_nullable and self._schema is not None:
            self._schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in self._type)
            self._type = pa.struct(pa.field(field.name, field.type, nullable=False) for field in self._type)
            self._features = Features.from_arrow_schema(self._schema)

        self._path = path
        if stream is None:
            self.stream = pa.OSFile(self._path, "wb")
        else:
            self.stream = stream

        self.writer_batch_size = writer_batch_size or DEFAULT_MAX_BATCH_SIZE

        self._num_examples = 0
        self._num_bytes = 0
        self.current_rows = []

        # The underlying stream writer can only be created once a schema is known.
        self.pa_writer: Optional[pa.RecordBatchStreamWriter] = None
        if self._schema is not None:
            self._build_writer(schema=self._schema)

    def _build_writer(self, schema: pa.Schema):
        """Store `schema` (and the struct type / features derived from it) and open the stream writer."""
        self._schema: pa.Schema = schema
        self._type: pa.DataType = pa.struct(field for field in self._schema)
        self._features = Features.from_arrow_schema(self._schema)
        self.pa_writer = pa.RecordBatchStreamWriter(self.stream, schema)

    @property
    def schema(self):
        """The schema in use, or an empty list while no schema is known."""
        return self._schema if self._schema is not None else []

    def _write_array_on_file(self, pa_array):
        """Write a PyArrow struct array to the stream as a single record batch.

        The stream writer is created from the array's schema if it does not exist yet.
        """
        pa_batch = pa.RecordBatch.from_struct_array(pa_array)
        self._num_bytes += pa_array.nbytes
        if self.pa_writer is None:
            pa_table = pa.Table.from_batches([pa_batch])
            self._build_writer(schema=pa_table.schema)
        self.pa_writer.write_batch(pa_batch)

    def write_on_file(self):
        """Write the buffered examples to the stream and empty the buffer.

        All buffered rows are first converted to one array. If the first element of that
        array differs from the first row converted on its own, the batch size is halved
        until the check passes and the rows are written in chunks of the reduced size.

        Raises:
            OverflowError: if the check still fails once the batch size has reached zero.
        """
        if self.current_rows:
            pa_array = pa.array(self.current_rows, type=self._type)
            first_example = pa.array(self.current_rows[0:1], type=self._type)[0]
            # Sanity check
            if pa_array[0] != first_example:
                # There was an Overflow in StructArray. Let's reduce the batch_size
                new_batch_size = self.writer_batch_size
                while pa_array[0] != first_example:
                    new_batch_size = self.writer_batch_size // 2
                    if new_batch_size == 0:
                        # An empty slice cannot be checked: fail explicitly instead of
                        # indexing an empty array.
                        raise OverflowError(
                            "Unable to find a batch size for which the examples are converted correctly."
                        )
                    pa_array = pa.array(self.current_rows[:new_batch_size], type=self._type)
                    logger.warning(
                        "Batch size is too big (>2GB). Reducing it from {} to {}".format(
                            self.writer_batch_size, new_batch_size
                        )
                    )
                    # Remember the reduced size so that the next halving starts from it.
                    self.writer_batch_size = new_batch_size
                for start in range(0, len(self.current_rows), new_batch_size):
                    pa_array = pa.array(self.current_rows[start : start + new_batch_size], type=self._type)
                    self._write_array_on_file(pa_array)
            else:
                # All good
                self._write_array_on_file(pa_array)
        self.current_rows = []

    def write(self, example: Dict[str, Any], writer_batch_size: Optional[int] = None):
        """Add a given Example to the write-pool, which is written to file once it is full.

        Args:
            example: the Example to add.
            writer_batch_size: number of buffered examples that triggers a write;
                defaults to the writer's own `writer_batch_size`.
        """
        example = map_all_sequences_to_lists(example)
        self.current_rows.append(example)
        self._num_examples += 1
        if writer_batch_size is None:
            writer_batch_size = self.writer_batch_size
        if len(self.current_rows) >= writer_batch_size:
            self.write_on_file()

    def write_batch(
        self, batch_examples: Dict[str, List[Any]], writer_batch_size: Optional[int] = None,
    ):
        """Write a batch of Examples to file.

        Args:
            batch_examples: the batch to write, as a dict mapping each column name to its list of values.
            writer_batch_size: maximum number of rows per record batch;
                defaults to the writer's own `writer_batch_size`.
        """
        batch_examples = map_all_sequences_to_lists(batch_examples)
        if self.pa_writer is None:
            # No schema yet: infer it from this batch and reuse the table that was built for it.
            pa_table: pa.Table = pa.Table.from_pydict(batch_examples)
            self._build_writer(schema=pa_table.schema)
        else:
            pa_table: pa.Table = pa.Table.from_pydict(batch_examples, schema=self._schema)
        if writer_batch_size is None:
            writer_batch_size = self.writer_batch_size
        batches: List[pa.RecordBatch] = pa_table.to_batches(max_chunksize=writer_batch_size)
        self._num_bytes += sum(batch.nbytes for batch in batches)
        self._num_examples += pa_table.num_rows
        for batch in batches:
            self.pa_writer.write_batch(batch)

    def write_table(self, pa_table: pa.Table, writer_batch_size: Optional[int] = None):
        """Write a PyArrow table to file.

        Args:
            pa_table: the table to write; its schema is used if no schema is known yet.
            writer_batch_size: maximum number of rows per record batch;
                defaults to the writer's own `writer_batch_size`.
        """
        if writer_batch_size is None:
            writer_batch_size = self.writer_batch_size
        if self.pa_writer is None:
            self._build_writer(schema=pa_table.schema)
        batches: List[pa.RecordBatch] = pa_table.to_batches(max_chunksize=writer_batch_size)
        self._num_bytes += sum(batch.nbytes for batch in batches)
        self._num_examples += pa_table.num_rows
        for batch in batches:
            self.pa_writer.write_batch(batch)

    def finalize(self, close_stream=True):
        """Write the remaining buffered examples and close the writer.

        Args:
            close_stream: whether to also close the underlying stream.

        Returns:
            A tuple `(num_examples, num_bytes)` of what has been written.

        Raises:
            ValueError: if nothing can be written because no schema was provided and
                no example was written.
        """
        try:
            self.write_on_file()
            if self.pa_writer is None:
                # Neither a schema nor any data: there is no stream writer to close.
                raise ValueError("Unable to finalize: no schema was provided and no example was written.")
            self.pa_writer.close()
        finally:
            # Release the stream even when flushing or closing the writer failed.
            if close_stream:
                self.stream.close()
        logger.info(
            "Done writing %s examples in %s bytes %s.",
            self._num_examples,
            self._num_bytes,
            self._path if self._path else "",
        )
        return self._num_examples, self._num_bytes
class BeamWriter(object):
    """
    Writes the Examples of an Apache Beam pipeline to an Arrow file.
    The pipeline saves the Examples to a Parquet file, which `finalize` then converts to Arrow.
    """

    def __init__(
        self,
        data_type: Optional[pa.DataType] = None,
        schema: Optional[pa.Schema] = None,
        path: Optional[str] = None,
        namespace: Optional[str] = None,
        cache_dir: Optional[str] = None,
    ):
        """
        Args:
            data_type: struct type of the examples; takes priority over `schema`.
            schema: schema of the examples; used when `data_type` is not given.
            path: path of the final Arrow file. The intermediate Parquet file uses the
                same path with its extension replaced by ".parquet".
            namespace: namespace of the Beam counter of examples; defaults to "default".
            cache_dir: directory used when the conversion has to be done locally;
                defaults to `HF_DATASETS_CACHE`.

        Raises:
            ValueError: if neither `data_type` nor `schema` is provided, or if `path` is missing.
        """
        if data_type is None and schema is None:
            raise ValueError("At least one of data_type and schema must be provided.")
        if path is None:
            raise ValueError("Path must be provided.")

        if data_type is not None:
            self._type: pa.DataType = data_type
            self._schema: pa.Schema = pa.schema(field for field in self._type)
        else:
            self._schema: pa.Schema = schema
            self._type: pa.DataType = pa.struct(field for field in self._schema)

        self._path = path
        self._parquet_path = os.path.splitext(path)[0] + ".parquet"
        self._namespace = namespace or "default"
        # Both values are only known once `finalize` has run.
        self._num_examples = None
        self._num_bytes = None
        self._cache_dir = cache_dir or HF_DATASETS_CACHE

    def write_from_pcollection(self, pcoll_examples):
        """Add the final steps of the beam pipeline: write to parquet files."""
        import apache_beam as beam

        from .utils.beam_utils import WriteToParquet

        def inc_num_examples(example):
            beam.metrics.Metrics.counter(self._namespace, "num_examples").inc()

        # count examples
        _ = pcoll_examples | "Count N. Examples" >> beam.Map(inc_num_examples)

        # save dataset
        return (
            pcoll_examples
            | "Get values" >> beam.Values()
            | "Save to parquet"
            >> WriteToParquet(self._parquet_path, self._schema, num_shards=1, shard_name_template="")
        )

    def finalize(self, metrics_query_result: dict):
        """
        Run after the pipeline has finished.
        It converts the resulting parquet files to arrow and it completes the info from the pipeline metrics.

        Args:
            metrics_query_result: `dict` obtained from pipeline_results.metrics().query(m_filter). Make sure
                that the filter keeps only the metrics for the considered split, under the namespace `split_name`.

        Returns:
            A tuple `(num_examples, num_bytes)`: the value of the "num_examples" counter and the
            size of the resulting arrow file.
        """
        import apache_beam as beam

        from .utils import beam_utils

        # Convert to arrow
        logger.info("Converting parquet file %s to arrow %s", self._parquet_path, self._path)
        try:  # stream conversion
            with beam.io.filesystems.FileSystems.open(self._parquet_path) as src:
                with beam.io.filesystems.FileSystems.create(self._path) as dest:
                    parquet_to_arrow(src, dest)
        except socket.error as e:  # broken pipe can happen if the connection is unstable, do local conversion instead
            if e.errno != errno.EPIPE:  # not a broken pipe
                raise
            logger.warning("Broken Pipe during stream conversion from parquet to arrow. Using local convert instead")
            local_convert_dir = os.path.join(self._cache_dir, "beam_convert")
            os.makedirs(local_convert_dir, exist_ok=True)
            local_parquet_path = os.path.join(local_convert_dir, hash_url_to_filename(self._parquet_path) + ".parquet")
            local_arrow_path = os.path.splitext(local_parquet_path)[0] + ".arrow"
            beam_utils.download_remote_to_local(self._parquet_path, local_parquet_path)
            parquet_to_arrow(local_parquet_path, local_arrow_path)
            beam_utils.upload_local_to_remote(local_arrow_path, self._path)

        # Save metrics
        counters_dict = {metric.key.metric.name: metric.result for metric in metrics_query_result["counters"]}
        self._num_examples = counters_dict["num_examples"]
        output_file_metadata = beam.io.filesystems.FileSystems.match([self._path], limits=[1])[0].metadata_list[0]
        self._num_bytes = output_file_metadata.size_in_bytes
        return self._num_examples, self._num_bytes
def parquet_to_arrow(source, destination):
    """Convert parquet file to arrow file. Inputs can be str paths or file-like objects

    Args:
        source: path of the parquet file, or file-like object to read it from.
        destination: path of the arrow file, or file-like object to write it to. A file-like
            destination is left open: closing it is up to the caller.

    Returns:
        The `destination` that was passed in.
    """
    # `import pyarrow` alone does not load the `parquet` submodule.
    import pyarrow.parquet as pq

    pf = pq.ParquetFile(source)
    # A str destination is opened by the writer, so this function must also close it.
    owns_stream = isinstance(destination, str)
    if owns_stream:
        writer = ArrowWriter(path=destination)
    else:
        writer = ArrowWriter(stream=destination)
    try:
        for i in range(pf.num_row_groups):
            writer.write_table(pf.read_row_group(i))
        # Without any row group no stream writer exists, so there is nothing to terminate.
        if writer.pa_writer is not None:
            writer.finalize(close_stream=False)
    finally:
        if owns_stream:
            writer.stream.close()
    return destination