Skip to content
Merged
7 changes: 7 additions & 0 deletions orangecontrib/imageanalytics/http2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
from hyper import HTTP20Connection
from hyper.http20.exceptions import StreamResetError

import hyper.http20.stream
from .hyper import stream as local_stream

log = logging.getLogger(__name__)

# Back-port the chunked-sending fix onto hyper's Stream until a release
# newer than 0.7.0 ships it. TODO: remove when version > 0.7.0.
# Compare version components numerically: a plain string comparison would
# mis-order multi-digit parts (e.g. '0.10.0' < '0.7.1' is True as strings).
if tuple(int(part) for part in hyper.__version__.split('.')[:3]
         if part.isdigit()) < (0, 7, 1):
    hyper.http20.stream.Stream.send_data = local_stream.Stream.send_data
    hyper.http20.stream.Stream._send_chunk = local_stream.Stream._send_chunk


class MaxNumberOfRequestsError(Exception):
"""Thrown when remote peer closes the connection because
Expand Down
73 changes: 73 additions & 0 deletions orangecontrib/imageanalytics/hyper/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""
Temporary fix for hyper library until next version from 0.7.0 is out
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is under MIT license from hyper.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing to consider: development branch of hyper has a lot of new commits after the last version 0.7.0 (see python-hyper/hyper@v0.7.0...development). You are fixing just one bug. Maybe it would be better to base our usage on development branch and not on 0.7.0?

In the mean time, as we're waiting for upstream to merge python-hyper/hyper#356, we can use in requirements.txt your fixed branch
e.g.

pip install git+https://github.com/PrimozGodec/hyper@d8ed47aadbc1b66e8493726beeffcf0361ae8729

We switch to the upstream's version as soon the fix is merged. And nudge Lukasa to make another official version ;)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we plan to release a version of image-analytics add-on with this fix, I would rather have it base on a released version of the hyper package.

Otherwise, we are installing a development version of the hyper package into users' environments. If we distribute over conda, we need to make sure hyper-devel builds are available, ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, if we are waiting for a new release, should I use my fixed branch anyway, or leave it as it is?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@astaric I am not sure if we want to wait until the next release. Hyper's last release, 0.7.0, was more than one year ago, which means that releases are not very frequent. Is there any other solution, since we are in a hurry to merge this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I've asked them to make a new release, but I don't know when it will be done. We can wait a few days. Otherwise, we can just use a fixed commit ID in hyper and upgrade later?

git+https://github.com/Lukasa/hyper@a8109c3aaf8e2aa7314f23bf46e20af2bc241cd7

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to use git+... in requirements and merge this PR.

But please do not create a (pypi) release that depends on development versions of packages.

import logging

log = logging.getLogger(__name__)

# Define the largest chunk of data we'll send in one go. Realistically, we
# should take the MSS into account but that's pretty dull, so let's just say
# 1kB and call it a day.
MAX_CHUNK = 1024


class Stream(object):
    """Donor class whose methods are monkey-patched onto
    ``hyper.http20.stream.Stream`` (see http2_client) to back-port the
    chunked-sending fix until a hyper release newer than 0.7.0 is out.

    The methods therefore rely on attributes of the real hyper stream
    object they are patched onto: ``stream_id``, ``_conn``, ``_recv_cb``,
    ``_out_flow_control_window`` and ``_send_outstanding_data``.
    """

    def send_data(self, data, final):
        """
        Send some data on the stream.

        Parameters
        ----------
        data : bytes, file-like object or None
            Payload to send. Anything with a ``read`` method is streamed
            in ``MAX_CHUNK``-sized pieces; ``None`` means no payload.
        final : bool
            Must be True when this is the end of the data to be sent;
            the last chunk is then flagged with END_STREAM.
        """
        # Utility iterator for file objects; the last piece it yields is
        # shorter than MAX_CHUNK (possibly empty).
        def file_iterator(fobj):
            while True:
                piece = fobj.read(MAX_CHUNK)
                yield piece
                if len(piece) < MAX_CHUNK:
                    break

        # Build the appropriate iterator for the data, in MAX_CHUNK pieces.
        if data is None:
            # Documented contract: ``None`` means nothing to send. The
            # previous version crashed here with ``len(None)``.
            chunks = iter(())
        elif hasattr(data, 'read'):
            chunks = file_iterator(data)
        else:
            chunks = (data[i:i + MAX_CHUNK]
                      for i in range(0, len(data), MAX_CHUNK))

        # We must know whether a chunk is the last one *before* sending it
        # (to set END_STREAM correctly), so always look one chunk ahead.
        cur_chunk = None
        try:
            cur_chunk = next(chunks)
            while True:
                next_chunk = next(chunks)
                self._send_chunk(cur_chunk, False)
                cur_chunk = next_chunk
        except StopIteration:
            if cur_chunk is not None:
                self._send_chunk(cur_chunk, final)
            elif final:
                # No payload at all, but the stream must still be closed:
                # send an empty END_STREAM frame. The previous version
                # silently dropped ``final`` in this case, leaving the
                # stream open.
                self._send_chunk(b'', True)

    def _send_chunk(self, data, final):
        """
        Implements most of the sending logic.

        Takes a single chunk of size at most MAX_CHUNK, wraps it in a
        DATA frame and sends it, setting the END_STREAM flag when
        ``final`` is True.
        """
        # If we don't fit in the connection window, try popping frames off
        # the connection in hope that one might be a window update frame.
        while len(data) > self._out_flow_control_window:
            self._recv_cb()

        # Send the frame and flush any outstanding connection data.
        with self._conn as conn:
            conn.send_data(
                stream_id=self.stream_id, data=data, end_stream=final
            )
            self._send_outstanding_data()

        if final:
            self.local_closed = True
99 changes: 76 additions & 23 deletions orangecontrib/imageanalytics/image_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,38 @@
'name': 'Inception v3',
'description': 'Google\'s Inception v3 model trained on ImageNet.',
'target_image_size': (299, 299),
'layers': ['penultimate']
'layers': ['penultimate'],
'order': 0
},
'painters': {
'name': 'Painters',
'description':
'A model trained to predict painters from artwork images.',
'target_image_size': (256, 256),
'layers': ['penultimate']
'layers': ['penultimate'],
'order': 3
},
'deeploc': {
'name': 'DeepLoc',
'description': 'A model trained to analyze yeast cell images.',
'target_image_size': (64, 64),
'layers': ['penultimate'],
'order': 4
},
'vgg16': {
'name': 'VGG-16',
'description': '16-layer image recognition model trained on ImageNet.',
'target_image_size': (224, 224),
'layers': ['penultimate'],
'order': 1
},
'vgg19': {
'name': 'VGG-19',
'description': '19-layer image recognition model trained on ImageNet.',
'target_image_size': (224, 224),
'layers': ['penultimate'],
'order': 2
}
}


Expand Down Expand Up @@ -66,9 +89,11 @@ class ImageEmbedder(Http2Client):
>>> embedded_images, skipped_images, num_skipped = image_embedder(images)
"""
_cache_file_blueprint = '{:s}_{:s}_embeddings.pickle'
MAX_REPEATS = 4
CANNOT_LOAD = "cannot load"

def __init__(self, model="inception-v3", layer="penultimate",
server_url='api.biolab.si:8080'):
server_url='api.garaza.io:443'):
super().__init__(server_url)
model_settings = self._get_model_settings_confidently(model, layer)
self._model = model
Expand Down Expand Up @@ -163,26 +188,45 @@ def from_file_paths(self, file_paths, image_processed_callback=None):
if not self.is_connected_to_server():
self.reconnect_to_server()

all_embeddings = []
all_embeddings = [None] * len(file_paths)
repeats_counter = 0

# repeat while all images has embeddings or
# while counter counts out (prevents cycling)
while len([el for el in all_embeddings if el is None]) > 0 and \
repeats_counter < self.MAX_REPEATS:

# take all images without embeddings yet
selected_indices = [i for i, v in enumerate(all_embeddings)
if v is None]
file_paths_wo_emb = [(file_paths[i], i) for i in selected_indices]

for batch in self._yield_in_batches(file_paths_wo_emb):
b_images, b_indices = zip(*batch)
try:
embeddings = self._send_to_server(
b_images, image_processed_callback
)
except MaxNumberOfRequestsError:
# maximum number of http2 requests through a single
# connection is exceeded and a remote peer has closed
# the connection so establish a new connection and retry
# with the same batch (should happen rarely as the setting
# is usually set to >= 1000 requests in http2)
self.reconnect_to_server()
embeddings = [None] * len(batch)

# insert embeddings into the list
for i, emb in zip(b_indices, embeddings):
all_embeddings[i] = emb

for batch in self._yield_in_batches(file_paths):
try:
embeddings = self._send_to_server(
batch, image_processed_callback
)
except MaxNumberOfRequestsError:
# maximum number of http2 requests through a single
# connection is exceeded and a remote peer has closed
# the connection so establish a new connection and retry
# with the same batch (should happen rarely as the setting
# is usually set to >= 1000 requests in http2)
self.reconnect_to_server()
embeddings = self._send_to_server(
batch, image_processed_callback
)
self.persist_cache()
repeats_counter += 1

all_embeddings += embeddings
self.persist_cache()
# change images that were not loaded from 'cannot loaded' to None
all_embeddings = \
[None if not isinstance(el, np.ndarray) and el == self.CANNOT_LOAD
else el for el in all_embeddings]

return np.array(all_embeddings)

Expand Down Expand Up @@ -304,14 +348,23 @@ def _get_responses_from_server(self, http_streams, cache_keys,
if self.cancelled:
raise EmbeddingCancelledException()

if not stream_id and not cache_key:
# when image cannot be loaded
embeddings.append(self.CANNOT_LOAD)

if image_processed_callback:
image_processed_callback(success=False)
continue


if not stream_id:
# skip rest of the waiting because image was either
# skipped at loading or is present in the local cache
embedding = self._get_cached_result_or_none(cache_key)
embeddings.append(embedding)

if image_processed_callback:
image_processed_callback()
image_processed_callback(success=embedding is not None)
continue

try:
Expand All @@ -331,7 +384,7 @@ def _get_responses_from_server(self, http_streams, cache_keys,
self._cache_dict[cache_key] = embedding

if image_processed_callback:
image_processed_callback()
image_processed_callback(embeddings[-1] is not None)

return embeddings

Expand Down
9 changes: 9 additions & 0 deletions orangecontrib/imageanalytics/tests/test_image_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,12 @@ def test_embedding_cancelled(self):
self.embedder.cancelled = True
with self.assertRaises(Exception):
self.embedder(self.single_example)

def test_version(self):
    """
    Detect when a new version of the hyper library is published.

    When this test starts failing, a hyper release newer than 0.7.0 is
    out, and the temporary fix in http2_client (marked with a TODO)
    should be removed.
    """
    import hyper
    self.assertEqual(hyper.__version__, "0.7.0")
16 changes: 9 additions & 7 deletions orangecontrib/imageanalytics/widgets/owimageembedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class OWImageEmbedding(OWWidget):

def __init__(self):
super().__init__()
self.embedders = sorted(list(EMBEDDERS_INFO))
self.embedders = sorted(list(EMBEDDERS_INFO),
key=lambda k: EMBEDDERS_INFO[k]['order'])
self._image_attributes = None
self._input_data = None
self._log = logging.getLogger(__name__)
Expand Down Expand Up @@ -118,7 +119,7 @@ def _setup_layout(self):
self.cancel_button.clicked.connect(self.cancel)
hbox = hBox(self.controlArea)
hbox.layout().addWidget(self.cancel_button)
self.cancel_button.hide()
self.cancel_button.setDisabled(True)

def _init_server_connection(self):
self.setBlocking(False)
Expand Down Expand Up @@ -192,7 +193,7 @@ def commit(self):
return

self._set_server_info(connected=True)
self.cancel_button.show()
self.cancel_button.setDisabled(False)
self.cb_image_attr.setDisabled(True)
self.cb_embedder.setDisabled(True)

Expand All @@ -211,8 +212,9 @@ def commit(self):
set_progress = qconcurrent.methodinvoke(
self, "__progress_set", (float,))

def advance():
set_progress(next(ticks))
def advance(success=True):
if success:
set_progress(next(ticks))

def cancel():
task.future.cancel()
Expand Down Expand Up @@ -264,7 +266,7 @@ def __set_results(self, f):

task, self._task = self._task, None
self.auto_commit_widget.setDisabled(False)
self.cancel_button.hide()
self.cancel_button.setDisabled(True)
self.cb_image_attr.setDisabled(False)
self.cb_embedder.setDisabled(False)
self.progressBarFinished(processEvents=None)
Expand Down Expand Up @@ -332,7 +334,7 @@ def cancel(self):
pass

self.auto_commit_widget.setDisabled(False)
self.cancel_button.hide()
self.cancel_button.setDisabled(True)
self.progressBarFinished(processEvents=None)
self.setBlocking(False)
self.cb_image_attr.setDisabled(False)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
hyper==0.7.0
hyper>=0.7.0
numpy>=1.10.0
Orange3>=3.3.5
Pillow>=2.7.0
Expand Down