Merged

17 commits
adc132d
Initial implementation of replacement of sparkdl's version of ImageSc…
tomasatdatabricks Nov 30, 2017
fd9a38f
Addressed Sid's and Sue Ann's review comments.
tomasatdatabricks Dec 5, 2017
537d1b1
Hack: Per Tim's suggestion I copied Spark 2.3's ImageSchema files into…
tomasatdatabricks Dec 5, 2017
5516b50
Minor update.
tomasatdatabricks Dec 6, 2017
327ddc8
Addressed some review comments:
tomasatdatabricks Dec 7, 2017
a5f2ff1
Added comments explaining why reversing color channels is necessary …
tomasatdatabricks Dec 8, 2017
0ea3761
Per review comments and offline discussion, removed reordering of outp…
tomasatdatabricks Dec 12, 2017
936838a
Some cosmetic changes addressing review comments.
tomasatdatabricks Dec 12, 2017
1ab34bf
Addressed review comments and follow up offline discussion: Added par…
tomasatdatabricks Dec 13, 2017
c2e803b
Per reviewer's request, moved copy-pasted ImageSchema file from pyspa…
tomasatdatabricks Dec 15, 2017
a3f4d08
Cosmetic changes addressing review comments
tomasatdatabricks Dec 15, 2017
421c924
Added converter to check channel order parameter.
tomasatdatabricks Dec 18, 2017
1a117b4
minor changes addressing review comments
tomasatdatabricks Dec 19, 2017
005fd61
Ran autopep8 on imageIO.py
tomasatdatabricks Dec 19, 2017
10c182c
Ran autopep8 on all sparkdl and test folders
tomasatdatabricks Dec 19, 2017
def1e0e
Minor test fix: added missing cls arg in named_image_test's setup
tomasatdatabricks Dec 19, 2017
5ef9a6b
Minor fix, added extra channel order to error message
tomasatdatabricks Dec 19, 2017
30 changes: 21 additions & 9 deletions README.md
@@ -80,14 +80,23 @@ To try running the examples below, check out the Databricks notebook [DeepLearni

### Working with images in Spark

The first step to applying deep learning on images is the ability to load the images. Deep Learning Pipelines includes utility functions that can load millions of images into a Spark DataFrame and decode them automatically in a distributed fashion, allowing manipulation at scale.
The first step to applying deep learning on images is the ability to load the images. Spark and Deep Learning Pipelines include utility functions that can load millions of images into a Spark DataFrame and decode them automatically in a distributed fashion, allowing manipulation at scale.

Using Spark's ImageSchema:

```python
from sparkdl.image.image import ImageSchema
image_df = ImageSchema.readImages("/data/myimages")
```

or, if a custom image library is needed:

```python
from sparkdl import readImages
image_df = readImages("/data/myimages")
from sparkdl.image import imageIO
image_df = imageIO.readImagesWithCustomFn("/data/myimages", decode_f=<your image library, see imageIO.PIL_decode>)
```
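For illustration, a minimal sketch of a custom decode function, assuming `decode_f` receives the raw file bytes and returns an image row (mirroring what `imageIO.PIL_decode` does); the helper name `pil_decode_rgb` is hypothetical:

```python
from io import BytesIO

import numpy as np
from PIL import Image

from sparkdl.image import imageIO

def pil_decode_rgb(raw_bytes):
    # Decode the raw bytes with PIL, force RGB, and convert the numpy
    # array into an image struct row via sparkdl's imageIO helpers.
    img = Image.open(BytesIO(raw_bytes)).convert("RGB")
    return imageIO.imageArrayToStruct(np.asarray(img))

image_df = imageIO.readImagesWithCustomFn("/data/myimages", decode_f=pil_decode_rgb)
```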

The resulting DataFrame contains a string column named "filePath" containing the path to each image file, and a image struct ("`SpImage`") column named "image" containing the decoded image data.
The resulting DataFrame contains a column named "image" containing an image struct whose schema matches ImageSchema.
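For reference, a sketch of the expected layout, assuming Spark 2.3's ImageSchema fields (origin, height, width, nChannels, mode, data); nullability and field order may differ:

```python
image_df.printSchema()
# root
#  |-- image: struct (nullable = true)
#  |    |-- origin: string
#  |    |-- height: integer
#  |    |-- width: integer
#  |    |-- nChannels: integer
#  |    |-- mode: integer
#  |    |-- data: binary
```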

```python
image_df.show()
@@ -109,7 +118,7 @@ featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelNa
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

model = p.fit(train_images_df) # train_images_df is a dataset of images (SpImage) and labels
model = p.fit(train_images_df) # train_images_df is a dataset of images and labels

# Inspect training error
df = model.transform(train_images_df.limit(10)).select("image", "probability", "uri", "label")
@@ -127,11 +136,13 @@ Spark DataFrames are a natural construct for applying deep learning models to a
There are many well-known deep learning models for images. If the task at hand is very similar to what the models provide (e.g. object recognition with ImageNet classes), or for pure exploration, one can use the Transformer `DeepImagePredictor` by simply specifying the model name.

```python
from sparkdl import readImages, DeepImagePredictor
from sparkdl.image.image import ImageSchema

from sparkdl import DeepImagePredictor

predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels",
modelName="InceptionV3", decodePredictions=True, topK=10)
image_df = readImages("/data/myimages")
image_df = ImageSchema.readImages("/data/myimages")
predictions_df = predictor.transform(image_df)
```
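Since `decodePredictions=True` is set, the output column is expected to hold the top-K decoded classes; a hedged peek at the result (the exact column layout is an assumption based on Keras's `decode_predictions` output):

```python
# Each row of "predicted_labels" should carry up to topK
# (class_id, description, probability) entries.
predictions_df.select("predicted_labels").show(truncate=False)
```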

@@ -140,7 +151,8 @@ Spark DataFrames are a natural construct for applying deep learning models to a
Deep Learning Pipelines provides a Transformer that will apply the given TensorFlow Graph to a DataFrame containing a column of images (e.g. loaded using the utilities described in the previous section). Here is a very simple example of how a TensorFlow Graph can be used with the Transformer. In practice, the TensorFlow Graph will likely be restored from files before calling `TFImageTransformer`.

```python
from sparkdl import readImages, TFImageTransformer
from sparkdl.image.image import ImageSchema
from sparkdl import TFImageTransformer
import sparkdl.graph.utils as tfx
from sparkdl.transformers import utils
import tensorflow as tf
@@ -155,7 +167,7 @@ Spark DataFrames are a natural construct for applying deep learning models to a
transformer = TFImageTransformer(inputCol="image", outputCol="predictions", graph=frozen_graph,
inputTensor=image_arr, outputTensor=resized_images,
outputMode="image")
image_df = readImages("/data/myimages")
image_df = ImageSchema.readImages("/data/myimages")
processed_image_df = transformer.transform(image_df)
```
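The example above references `frozen_graph`, `image_arr`, and `resized_images` built in the elided lines; a minimal sketch of that setup, assuming `utils.imageInputPlaceholder()` creates the image input placeholder, and using `tfx.strip_and_freeze_until` as defined in `sparkdl/graph/utils.py`:

```python
graph = tf.Graph()
with tf.Session(graph=graph) as sess:
    # Placeholder for batches of images, followed by a simple resize op.
    image_arr = utils.imageInputPlaceholder()
    resized_images = tf.image.resize_images(image_arr, (299, 299))
    # Freeze the graph so the transformer can ship it to executors.
    frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess,
                                              return_graph=True)
```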

2 changes: 1 addition & 1 deletion build.sbt
@@ -35,7 +35,7 @@ sparkComponents ++= Seq("mllib-local", "mllib", "sql")
// add any Spark Package dependencies using spDependencies.
// e.g. spDependencies += "databricks/spark-avro:0.1"
spDependencies += s"databricks/tensorframes:0.2.9-s_${scalaMajorVersion}"
spDependencies += "Microsoft/spark-images:0.1"


// These versions are ancient, but they cross-compile around scala 2.10 and 2.11.
// Update them when dropping support for scala 2.10
3 changes: 0 additions & 3 deletions project/plugins.sbt
@@ -1,8 +1,5 @@
// You may use this file to add plugin dependencies for sbt.
resolvers += "Spark Packages repo" at "https://dl.bintray.com/spark-packages/maven/"

addSbtPlugin("org.spark-packages" %% "sbt-spark-package" % "0.2.5")

// scalacOptions in (Compile,doc) := Seq("-groups", "-implicits")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
2 changes: 0 additions & 2 deletions python/sparkdl/__init__.py
@@ -12,9 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

from .graph.input import TFInputGraph
from .image.imageIO import imageSchema, imageType, readImages
from .transformers.keras_image import KerasImageFileTransformer
from .transformers.named_image import DeepImagePredictor, DeepImageFeaturizer
from .transformers.tf_image import TFImageTransformer
1 change: 1 addition & 0 deletions python/sparkdl/estimators/keras_image_file_estimator.py
@@ -36,6 +36,7 @@

logger = logging.getLogger('sparkdl')


class KerasImageFileEstimator(Estimator, HasInputCol, HasInputImageNodeName,
HasOutputCol, HasOutputNodeName, HasLabelCol,
HasKerasModel, HasKerasOptimizer, HasKerasLoss,
7 changes: 5 additions & 2 deletions python/sparkdl/graph/builder.py
@@ -27,6 +27,7 @@

logger = logging.getLogger('sparkdl')


class IsolatedSession(object):
"""
Provide an isolated session to work with mixed Keras and TensorFlow
@@ -43,6 +44,7 @@ class IsolatedSession(object):
In this case, all Keras models loaded in this session will be accessible
as a subgraph of of `graph`
"""

def __init__(self, graph=None, using_keras=False):
self.graph = graph or tf.Graph()
self.sess = tf.Session(graph=self.graph)
@@ -166,7 +168,7 @@ def _fromKerasModelFile(cls, file_path):
'Keras model must be specified as HDF5 file'

with IsolatedSession(using_keras=True) as issn:
K.set_learning_phase(0) # Testing phase
K.set_learning_phase(0) # Testing phase
model = load_model(file_path)
gfn = issn.asGraphFunction(model.inputs, model.outputs)

@@ -223,7 +225,8 @@ def fromList(cls, functions):
# We currently only support single input/output for intermediary stages
# The functions could still take multi-dimensional tensor, but only one
if len(gfn_out.input_names) != 1:
raise NotImplementedError("Only support single input/output for intermediary layers")
raise NotImplementedError(
"Only support single input/output for intermediary layers")

# Acquire initial placeholders' properties
# We want the input names of the merged function are not under scoped
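For context, `IsolatedSession` plus `asGraphFunction` (used above in `_fromKerasModelFile`) can be exercised on its own; a minimal sketch, under the assumption that `asGraphFunction(inputs, outputs)` captures the subgraph between the given tensors:

```python
import tensorflow as tf
from sparkdl.graph.builder import IsolatedSession

# Build a tiny graph in an isolated session and export it as a GraphFunction.
with IsolatedSession() as issn:
    x = tf.placeholder(tf.float32, shape=[None, 3], name="x")
    z = tf.reduce_mean(x, axis=1, name="z")
    gfn = issn.asGraphFunction([x], [z])
```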
3 changes: 2 additions & 1 deletion python/sparkdl/graph/input.py
@@ -23,6 +23,7 @@

# pylint: disable=invalid-name,wrong-spelling-in-comment,wrong-spelling-in-docstring


class TFInputGraph(object):
"""
An opaque object containing TensorFlow graph.
@@ -84,7 +85,6 @@ class TFInputGraph(object):
Please see the example above.
"""


def __init__(self, graph_def, input_tensor_name_from_signature,
output_tensor_name_from_signature):
self.graph_def = graph_def
@@ -281,6 +281,7 @@ def _from_checkpoint_impl(checkpoint_dir, signature_def_key, feed_names, fetch_n
return _build_with_feeds_fetches(sess=sess, graph=graph, feed_names=feed_names,
fetch_names=fetch_names)


def _from_saved_model_impl(saved_model_dir, tag_set, signature_def_key, feed_names, fetch_names):
"""
Construct a TFInputGraph from a SavedModel.
21 changes: 12 additions & 9 deletions python/sparkdl/graph/pieces.py
@@ -18,7 +18,7 @@
import tensorflow as tf

from sparkdl.graph.builder import IsolatedSession
from sparkdl.image.imageIO import SparkMode
from sparkdl.image import imageIO

logger = logging.getLogger('sparkdl')

@@ -29,7 +29,8 @@
Deserializing ProtocolBuffer bytes is in general faster than directly loading Keras models.
"""

def buildSpImageConverter(img_dtype):

def buildSpImageConverter(channelOrder, img_dtype):
"""
Convert an imageIO byte-encoded image into an image tensor suitable as input to ConvNets
The names of the inputs must be a subset of those specified in `image.imageIO.imageSchema`.
@@ -48,23 +49,25 @@ def buildSpImageConverter(img_dtype):
# This is the default behavior of Python Image Library
shape = tf.reshape(tf.stack([height, width, num_channels], axis=0),
shape=(3,), name='shape')
if img_dtype == SparkMode.RGB:
if img_dtype == 'uint8':
image_uint8 = tf.decode_raw(image_buffer, tf.uint8, name="decode_raw")
image_float = tf.to_float(image_uint8)
else:
assert img_dtype == SparkMode.RGB_FLOAT32, \
"Unsupported dtype for image: {}".format(img_dtype)
elif img_dtype == 'float32':
image_float = tf.decode_raw(image_buffer, tf.float32, name="decode_raw")

else:
Review thread on this hunk:

Collaborator: In the new schema, are there legitimate types that have float64 (or any other dtypes) as img_dtype?

Contributor (author): AFAIK the schema does not specify types. It only specifies a field with an OpenCV type number. There are OpenCV types which have float64. Technically the schema includes an openCvTypes map with only a subset of types, but we already need types outside of this subset (TF-produced images are stored as float32).

Collaborator: So does ImageSchema support OpenCV types that have float64? If so, should we support them here?

Contributor (author): Currently, as far as I know, there is no way to get a float64 image. ImageSchema as a data format supports it, in that it has a mode field which is supposed to hold an OpenCV type, and there are OpenCV types with float64. However, float64 is not listed among the OpenCV types in their Scala code (and neither are any float32 types, which we need), and as it stands now, readImages can only ever produce images stored as unsigned bytes (both the Scala and PIL versions), i.e. one of the CV_8U* formats. We also need the float32 formats, since that's what we return when producing images from TF, so I added those on our Python side. The Python code from ImageSchema can only handle unsigned byte images; that's why I use our own versions in imageIO (imageArrayToStruct and imageStructToArray).

Collaborator: From offline discussion: the ImageSchema utilities in Spark only support uint8 types. Ideally float32 types would also be supported natively in Spark so we don't have to have special logic in this package to handle them. We'll create a Jira in Spark for that and try to address it there.

Collaborator: Do we have a Jira for this already? If so, could you link it from here?

Contributor (author): Yes we do: https://issues.apache.org/jira/browse/SPARK-22730. You mean you want it in the code? That would probably go in imageIO; I'll put it there.

raise ValueError(
'unsupported image data type "%s", currently only know how to handle uint8 and float32' % img_dtype)
image_reshaped = tf.reshape(image_float, shape, name="reshaped")
image_reshaped = imageIO.fixColorChannelOrdering(channelOrder, image_reshaped)
image_input = tf.expand_dims(image_reshaped, 0, name="image_input")
gfn = issn.asGraphFunction([height, width, image_buffer, num_channels], [image_input])

return gfn


def buildFlattener():
"""
Build a flattening layer to remove the extra leading tensor dimension.
"""
Build a flattening layer to remove the extra leading tensor dimension.
e.g. a tensor of shape [1, W, H, C] will have a shape [W, H, C] after applying this.
"""
with IsolatedSession() as issn:
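For context, the `channelOrder` handling added above delegates to `imageIO.fixColorChannelOrdering`; a hypothetical re-implementation for illustration only (the real helper lives in `imageIO.py`), assuming the idea is to reverse the channel axis when the source ordering is BGR:

```python
import tensorflow as tf

def fix_color_channel_ordering_sketch(channel_order, image_tensor):
    # Spark's ImageSchema stores pixel data in OpenCV's BGR order, while
    # most TF models expect RGB, so flip the last (channel) axis if needed.
    if channel_order == "BGR":
        return tf.reverse(image_tensor, axis=[-1])
    if channel_order == "RGB":
        return image_tensor
    raise ValueError("unsupported channel order: %s" % channel_order)
```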
1 change: 1 addition & 0 deletions python/sparkdl/graph/tensorframes_udf.py
@@ -23,6 +23,7 @@

logger = logging.getLogger('sparkdl')


def makeGraphUDF(graph, udf_name, fetches, feeds_to_fields_map=None, blocked=False, register=True):
"""
Create a Spark SQL UserDefinedFunction from a given TensorFlow Graph
12 changes: 11 additions & 1 deletion python/sparkdl/graph/utils.py
@@ -31,6 +31,7 @@
one of the four target variants.
"""


def validated_graph(graph):
"""
Check if the input is a valid :py:class:`tf.Graph` and return it.
@@ -41,6 +42,7 @@ def validated_graph(graph):
assert isinstance(graph, tf.Graph), 'must provide tf.Graph, but get {}'.format(type(graph))
return graph


def get_shape(tfobj_or_name, graph):
"""
Return the shape of the tensor as a list
@@ -52,6 +54,7 @@ def get_shape(tfobj_or_name, graph):
_shape = get_tensor(tfobj_or_name, graph).get_shape().as_list()
return [-1 if x is None else x for x in _shape]


def get_op(tfobj_or_name, graph):
"""
Get a :py:class:`tf.Operation` object.
@@ -76,6 +79,7 @@ def get_op(tfobj_or_name, graph):
assert isinstance(op, tf.Operation), err_msg.format(_op_name, type(op), op)
return op


def get_tensor(tfobj_or_name, graph):
"""
Get a :py:class:`tf.Tensor` object
@@ -100,6 +104,7 @@ def get_tensor(tfobj_or_name, graph):
assert isinstance(tnsr, tf.Tensor), err_msg.format(_tensor_name, type(tnsr), tnsr)
return tnsr


def tensor_name(tfobj_or_name, graph=None):
"""
Derive the :py:class:`tf.Tensor` name from a :py:class:`tf.Operation` or :py:class:`tf.Tensor`
@@ -130,6 +135,7 @@ def tensor_name(tfobj_or_name, graph=None):
else:
raise TypeError('invalid tf.Tensor name query type {}'.format(type(tfobj_or_name)))


def op_name(tfobj_or_name, graph=None):
"""
Derive the :py:class:`tf.Operation` name from a :py:class:`tf.Operation` or
@@ -158,9 +164,11 @@ def op_name(tfobj_or_name, graph=None):
else:
raise TypeError('invalid tf.Operation name query type {}'.format(type(tfobj_or_name)))


def add_scope_to_name(scope, name):
""" Prepends the provided scope to the passed-in op or tensor name. """
return "%s/%s"%(scope, name)
return "%s/%s" % (scope, name)


def validated_output(tfobj_or_name, graph):
"""
@@ -172,6 +180,7 @@
graph = validated_graph(graph)
return op_name(tfobj_or_name, graph)


def validated_input(tfobj_or_name, graph):
"""
Validate and return the input names useable GraphFunction
@@ -186,6 +195,7 @@
('input must be Placeholder, but get', op.type)
return name


def strip_and_freeze_until(fetches, graph, sess=None, return_graph=False):
"""
Create a static view of the graph by