Skip to content

Commit c9fca18

Browse files
Support streaming tar files (#2800)
* Support streaming tar files * Implement custom readline for io.RawIOBase-like objects * Use custom readline in json script
1 parent da58e65 commit c9fca18

3 files changed

Lines changed: 24 additions & 4 deletions

File tree

src/datasets/packaged_modules/json/json.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# coding=utf-8
2-
2+
import io
33
import json
44
from dataclasses import dataclass
5-
from io import BytesIO
65
from typing import Optional
76

87
import pyarrow as pa
98
import pyarrow.json as paj
109

1110
import datasets
11+
from datasets.utils.file_utils import readline
1212

1313

1414
logger = datasets.utils.logging.get_logger(__name__)
@@ -107,12 +107,16 @@ def _generate_tables(self, files):
107107
batch = f.read(self.config.chunksize)
108108
if not batch:
109109
break
110-
batch += f.readline() # finish current line
110+
# Finish current line
111+
try:
112+
batch += f.readline()
113+
except (AttributeError, io.UnsupportedOperation):
114+
batch += readline(f)
111115
try:
112116
while True:
113117
try:
114118
pa_table = paj.read_json(
115-
BytesIO(batch), read_options=paj.ReadOptions(block_size=block_size)
119+
io.BytesIO(batch), read_options=paj.ReadOptions(block_size=block_size)
116120
)
117121
break
118122
except pa.ArrowInvalid as e:

src/datasets/utils/file_utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66

77
import copy
8+
import io
89
import json
910
import os
1011
import re
@@ -682,3 +683,16 @@ def docstring_decorator(fn):
682683

683684
def estimate_dataset_size(paths):
684685
return sum(path.stat().st_size for path in paths)
686+
687+
688+
def readline(f: io.RawIOBase, size: int = -1) -> bytes:
    """Read and return one line from a binary stream that has no usable ``readline``.

    The stream is consumed one byte at a time through ``f.read(1)`` so that
    nothing past the end of the line is read; this only requires ``f`` to
    provide ``read``.

    Args:
        f: Binary file-like object exposing ``read(n)``.
        size: Maximum number of bytes to return. A negative value or ``None``
            (the default behaviour) means no limit.

    Returns:
        The bytes read, including the trailing ``b"\\n"`` when one was found.
        An empty ``bytes`` object is returned when the first read yields no
        data or when ``size`` is 0.
    """
    # From: https://github.com/python/cpython/blob/d27e2f4d118e7a9909b6a3e5da06c5ff95806a85/Lib/_pyio.py#L525
    if size is None:
        size = -1
    res = bytearray()
    while size < 0 or len(res) < size:
        b = f.read(1)
        if not b:
            # Empty (or None) read: nothing more to consume right now.
            break
        res += b
        if res.endswith(b"\n"):
            break
    return bytes(res)

src/datasets/utils/streaming_download_manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ def _get_extraction_protocol(self, urlpath) -> Optional[str]:
134134
return None
135135
elif path.endswith(".gz") and not path.endswith(".tar.gz"):
136136
return "gzip"
137+
elif path.endswith(".tar"):
138+
return "tar"
137139
elif path.endswith(".zip"):
138140
return "zip"
139141
raise NotImplementedError(f"Extraction protocol for file at {urlpath} is not implemented yet")

0 commit comments

Comments
 (0)