7 changes: 6 additions & 1 deletion petastorm/reader.py
@@ -204,7 +204,12 @@ def make_batch_reader(dataset_url_or_urls,
Use :func:`~petastorm.make_reader` to read Petastorm Parquet stores generated with
:func:`~petastorm.etl.dataset_metadata.materialize_dataset`.

NOTE: only scalar columns are currently supported.
NOTE: only scalar columns and array columns (of primitive-type elements) are currently supported.

NOTE: If `schema_fields` is not specified, the reader schema is inferred from the Parquet dataset, and the
reader schema field order preserves the Parquet dataset field order (partition columns come first). However,
if `transform_spec` is set and `TransformSpec.selected_fields` is specified, the reader schema field order
follows the order of `selected_fields`.

:param dataset_url_or_urls: a url to a parquet directory or a url list (with the same scheme) to parquet files.
e.g. ``'hdfs://some_hdfs_cluster/user/yevgeni/parquet8'``, or ``'file:///tmp/mydataset'``,
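To make the new ordering rules concrete, here is a minimal sketch (not part of this diff) of how the docstring above plays out; the dataset path and the column names 'id', 'label' and 'value' are hypothetical:

```python
from petastorm import make_batch_reader, TransformSpec

# Without schema_fields, the reader schema is inferred from the Parquet
# dataset and preserves its field order (partition columns come first).
with make_batch_reader('file:///tmp/mydataset') as reader:
    print(list(reader.schema.fields.keys()))  # e.g. ['id', 'label', 'value']

# With TransformSpec.selected_fields, the reader schema follows the order
# given in selected_fields instead.
spec = TransformSpec(selected_fields=['value', 'id'])
with make_batch_reader('file:///tmp/mydataset', transform_spec=spec) as reader:
    print(list(reader.schema.fields.keys()))  # ['value', 'id']
```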
6 changes: 6 additions & 0 deletions petastorm/tests/test_end_to_end.py
@@ -23,6 +23,11 @@
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, ShortType, StringType

try:
from mock import mock
except ImportError:
from unittest import mock

from petastorm import make_reader, make_batch_reader, TransformSpec
from petastorm.codecs import ScalarCodec, CompressedImageCodec
from petastorm.errors import NoDataAvailableError
@@ -216,6 +221,7 @@ def double_matrix(sample):
np.testing.assert_equal(expected_matrix, actual.double_matrix)


@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_transform_function_batched(scalar_dataset):
def double_float64(sample):
sample['float64'] *= 2
12 changes: 12 additions & 0 deletions petastorm/tests/test_ngram_end_to_end.py
@@ -22,6 +22,11 @@
import tensorflow as tf
from tensorflow.python.framework.errors_impl import OutOfRangeError

try:
from mock import mock
except ImportError:
from unittest import mock

from petastorm import make_reader
from petastorm.ngram import NGram
from petastorm.tests.conftest import SyntheticDataset, maybe_cached_dataset
@@ -209,6 +214,7 @@ def test_ngram_basic_tf(dataset_num_files_1, reader_factory):


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_basic(dataset_num_files_1, reader_factory):
"""Tests basic ngram with no delta threshold with no shuffle and in the same partition."""
fields = {
@@ -233,6 +239,7 @@ def test_ngram_basic_longer_tf(dataset_num_files_1, reader_factory):


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_basic_longer(dataset_num_files_1, reader_factory):
"""Tests basic ngram with no delta threshold with no shuffle and in the same partition."""
fields = {
@@ -257,6 +264,7 @@ def test_ngram_basic_shuffle_multi_partition_tf(synthetic_dataset, reader_factory):


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_basic_shuffle_multi_partition(synthetic_dataset, reader_factory):
"""Tests basic ngram with no delta threshold with shuffle and in many partitions."""
fields = {
@@ -281,6 +289,7 @@ def test_ngram_basic_longer_shuffle_multi_partition_tf(synthetic_dataset, reader_factory):


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_basic_longer_shuffle_multi_partition(synthetic_dataset, reader_factory):
"""Tests basic ngram with no delta threshold with shuffle and in many partitions."""
fields = {
@@ -294,6 +303,7 @@ def test_ngram_basic_longer_shuffle_multi_partition(synthetic_dataset, reader_factory):


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_basic_longer_no_overlap(synthetic_dataset, reader_factory):
"""Tests basic ngram with no delta threshold with no overlaps of timestamps."""
fields = {
@@ -366,6 +376,7 @@ def test_ngram_delta_threshold_tf(dataset_0_3_8_10_11_20_23, reader_factory):


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_delta_threshold(dataset_0_3_8_10_11_20_23, reader_factory):
"""Test to verify that delta threshold work as expected in one partition in the same ngram
and between consecutive ngrams. delta threshold here refers that each ngram must not be
@@ -557,6 +568,7 @@ def _test_continuous_ngram_returns(ngram_fields, ts_field, dataset_num_files_1,


@pytest.mark.parametrize('reader_factory', READER_FACTORIES)
@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_ngram_with_regex_fields(dataset_num_files_1, reader_factory):
"""Tests to verify fields and timestamp field can be regular expressions and work with a reader
"""
8 changes: 8 additions & 0 deletions petastorm/tests/test_tf_utils.py
@@ -23,6 +23,11 @@
import pytest
import tensorflow as tf

try:
from mock import mock
except ImportError:
from unittest import mock

from petastorm import make_reader, make_batch_reader, TransformSpec
from petastorm.ngram import NGram
from petastorm.tests.test_common import TestSchema
@@ -93,6 +98,7 @@ def test_unknown_type():
_numpy_to_tf_dtypes(np.uint64)


@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_schema_to_dtype_list():
TestSchema = Unischema('TestSchema', [
UnischemaField('int32', np.int32, (), None, False),
@@ -318,6 +324,7 @@ def test_shuffling_queue_with_make_batch_reader(scalar_dataset):
tf_tensors(reader, 100, 90)


@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_transform_function_new_field(synthetic_dataset):
def double_matrix(sample):
sample['double_matrix'] = sample['matrix'] * 2
@@ -337,6 +344,7 @@ def double_matrix(sample):
np.testing.assert_equal(expected_matrix, actual.double_matrix)


@mock.patch('petastorm.unischema._UNISCHEMA_FIELD_ORDER', 'alphabetical')
def test_transform_function_new_field_batched(scalar_dataset):
def double_float64(sample):
sample['new_float64'] = sample['float64'] * 2
14 changes: 13 additions & 1 deletion petastorm/tests/test_transform.py
@@ -27,7 +27,7 @@

def test_noop_transform():
transformed_schema = transform_schema(TestSchema, TransformSpec(lambda x: x, edit_fields=None, removed_fields=None))
assert transformed_schema.fields == TestSchema.fields
assert set(transformed_schema.fields) == set(TestSchema.fields)


def test_remove_field_transform():
@@ -40,6 +40,18 @@ def test_remove_field_transform():
assert set(two_removed.fields.keys()) == {'string'}


def test_select_field_transform():
test_list = [
['string', 'double', 'int'],
['int', 'string', 'double'],
['string', 'int'],
['int']
]
for selected_fields in test_list:
transformed = transform_schema(TestSchema, TransformSpec(selected_fields=selected_fields))
assert list(transformed.fields.keys()) == selected_fields


def test_add_field_transform():
one_added = transform_schema(TestSchema,
TransformSpec(lambda x: x,
12 changes: 9 additions & 3 deletions petastorm/tests/test_unischema.py
@@ -320,7 +320,7 @@ def test_match_unischema_fields_legacy_warning():


def test_arrow_schema_convertion():
arrow_schema = pa.schema([
fields = [
pa.field('string', pa.string()),
pa.field('int8', pa.int8()),
pa.field('int16', pa.int16()),
@@ -335,20 +335,26 @@
pa.field('timestamp_s', pa.timestamp('s')),
pa.field('timestamp_ns', pa.timestamp('ns')),
pa.field('date_32', pa.date32()),
pa.field('date_64', pa.date64()),
])
pa.field('date_64', pa.date64())
]
arrow_schema = pa.schema(fields)

mock_dataset = _mock_parquet_dataset([], arrow_schema)

unischema = Unischema.from_arrow_schema(mock_dataset)
for name in arrow_schema.names:
assert getattr(unischema, name).name == name
assert getattr(unischema, name).codec is None

if name == 'bool':
assert not getattr(unischema, name).nullable
else:
assert getattr(unischema, name).nullable

# Test schema preserve fields order
field_name_list = [f.name for f in fields]
assert list(unischema.fields.keys()) == field_name_list


def test_arrow_schema_convertion_with_string_partitions():
arrow_schema = pa.schema([
19 changes: 18 additions & 1 deletion petastorm/transform.py
@@ -17,7 +17,7 @@


class TransformSpec(object):
def __init__(self, func=None, edit_fields=None, removed_fields=None):
def __init__(self, func=None, edit_fields=None, removed_fields=None, selected_fields=None):
"""TransformSpec defines a user transformation that is applied to a loaded row on a worker thread/process.

The object defines the function (callable) that perform the transform as well as the
@@ -34,10 +34,19 @@ def __init__(self, func=None, edit_fields=None, removed_fields=None):
:param edit_fields: Optional. A list of 4-tuples with the following fields:
``(name, numpy_dtype, shape, is_nullable)``
:param removed_fields: Optional[list]. A list of field names that will be removed from the original schema.
:param selected_fields: Optional[list]. A list of field names specifying the fields to be selected.
If selected_fields is specified, the reader schema preserves the field order of selected_fields.

Note: `removed_fields` and `selected_fields` are mutually exclusive; only one of them may be specified.
"""
self.func = func
self.edit_fields = edit_fields or []

if removed_fields is not None and selected_fields is not None:
raise ValueError('Only one of removed_fields and selected_fields may be specified in TransformSpec.')

self.removed_fields = removed_fields or []
self.selected_fields = selected_fields


def transform_schema(schema, transform_spec):
@@ -61,4 +70,12 @@ def transform_schema(schema, transform_spec):
shape=field_to_edit[2], codec=None, nullable=field_to_edit[3])
fields.append(edited_unischema_field)

if transform_spec.selected_fields is not None:
unknown_field_names = set(transform_spec.selected_fields) - set(f.name for f in fields)
if unknown_field_names:
warnings.warn('selected_fields specifies field names that are not part of the schema. '
'These field names will be ignored: "{}".'.format(', '.join(unknown_field_names)))
fields = [f for f in fields if f.name in transform_spec.selected_fields]
fields = sorted(fields, key=lambda f: transform_spec.selected_fields.index(f.name))

return Unischema(schema._name + '_transformed', fields)
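A self-contained sketch (not part of this diff) of the new `selected_fields` semantics; the `ExampleSchema` below is hypothetical and only mirrors the 'int'/'string'/'double' fields used by `test_select_field_transform`:

```python
import numpy as np

from petastorm.transform import TransformSpec, transform_schema
from petastorm.unischema import Unischema, UnischemaField

# Hypothetical schema; fields are (name, numpy_dtype, shape, codec, nullable).
ExampleSchema = Unischema('ExampleSchema', [
    UnischemaField('int', np.int32, (), None, False),
    UnischemaField('string', np.str_, (), None, False),
    UnischemaField('double', np.float64, (), None, False),
])

# The transformed schema's field order follows selected_fields,
# not the original schema order.
transformed = transform_schema(ExampleSchema,
                               TransformSpec(selected_fields=['string', 'int']))
assert list(transformed.fields.keys()) == ['string', 'int']

# removed_fields and selected_fields are mutually exclusive.
try:
    TransformSpec(removed_fields=['double'], selected_fields=['int'])
except ValueError as err:
    print(err)
```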
23 changes: 17 additions & 6 deletions petastorm/unischema.py
@@ -31,6 +31,11 @@

from petastorm.compat import compat_get_metadata

# _UNISCHEMA_FIELD_ORDER valid values are 'preserve_input_order' and 'alphabetical'.
# The current default behavior is 'preserve_input_order'; the legacy 'alphabetical' behavior is deprecated
# and will be removed in a future version.
_UNISCHEMA_FIELD_ORDER = 'preserve_input_order'


def _fields_as_tuple(field):
"""Common representation of UnischemaField for equality and hash operators.
@@ -95,11 +100,14 @@ def get(parent_schema_name, field_names):
:return: A namedtuple with field names defined by `field_names`
"""
# Cache key is a combination of schema name and all field names
sorted_names = list(sorted(field_names))
key = ' '.join([parent_schema_name] + sorted_names)
if _UNISCHEMA_FIELD_ORDER.lower() == 'alphabetical':
field_names = list(sorted(field_names))
else:
field_names = list(field_names)
key = ' '.join([parent_schema_name] + field_names)
if key not in _NamedtupleCache._store:
_NamedtupleCache._store[key] = \
_new_gt_255_compatible_namedtuple('{}_view'.format(parent_schema_name), sorted_names)
_new_gt_255_compatible_namedtuple('{}_view'.format(parent_schema_name), field_names)
return _NamedtupleCache._store[key]


@@ -172,11 +180,14 @@ def __init__(self, name, fields):
"""Creates an instance of a Unischema object.

:param name: name of the schema
:param fields: a list of ``UnischemaField`` instances describing the fields. The order of the fields is
not important - they are stored sorted by name internally.
:param fields: a list of ``UnischemaField`` instances describing the fields. The element order in the list
determines the schema field order.
"""
self._name = name
self._fields = OrderedDict([(f.name, f) for f in sorted(fields, key=lambda t: t.name)])
if _UNISCHEMA_FIELD_ORDER.lower() == 'alphabetical':
fields = sorted(fields, key=lambda t: t.name)

self._fields = OrderedDict([(f.name, f) for f in fields])
# Generates attributes named by the field names as an access syntax sugar.
for f in fields:
if not hasattr(self, f.name):
Expand Down