Skip to content

Commit d7cc165

Browse files
Test queue worker and refactor end to end tests
Signed-off-by: andreasjansson <[email protected]>
1 parent bb59a2e commit d7cc165

File tree

7 files changed

+368
-142
lines changed

7 files changed

+368
-142
lines changed

end-to-end-test/end_to_end_test/__init__.py

Whitespace-only changes.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import os
2+
import subprocess
3+
import tempfile
4+
from waiting import wait
5+
import requests
6+
import pytest
7+
8+
from .util import random_string, find_free_port, docker_run
9+
10+
11+
@pytest.fixture
12+
def cog_server_port():
13+
old_cwd = os.getcwd()
14+
with tempfile.TemporaryDirectory() as cog_dir:
15+
os.chdir(cog_dir)
16+
port = str(find_free_port())
17+
server_proc = subprocess.Popen(["cog", "server", "--port", port])
18+
resp = wait(
19+
lambda: requests.get("http://localhost:" + port + "/ping"),
20+
timeout_seconds=60,
21+
expected_exceptions=(requests.exceptions.ConnectionError,),
22+
)
23+
assert resp.text == "pong"
24+
25+
yield port
26+
27+
os.chdir(old_cwd)
28+
server_proc.kill()
29+
30+
31+
@pytest.fixture
32+
def project_dir(tmpdir_factory):
33+
tmpdir = tmpdir_factory.mktemp("project")
34+
with open(tmpdir / "infer.py", "w") as f:
35+
f.write(
36+
"""
37+
import time
38+
import tempfile
39+
from pathlib import Path
40+
import cog
41+
42+
class Model(cog.Model):
43+
def setup(self):
44+
self.foo = "foo"
45+
46+
@cog.input("text", type=str)
47+
@cog.input("path", type=Path)
48+
@cog.input("output_file", type=bool, default=False)
49+
def run(self, text, path, output_file):
50+
time.sleep(1)
51+
with open(path) as f:
52+
output = self.foo + text + f.read()
53+
if output_file:
54+
tmp = tempfile.NamedTemporaryFile(suffix=".txt")
55+
tmp.close()
56+
tmp_path = Path(tmp.name)
57+
with tmp_path.open("w") as f:
58+
f.write(output)
59+
return tmp_path
60+
return output
61+
"""
62+
)
63+
with open(tmpdir / "cog.yaml", "w") as f:
64+
cog_yaml = """
65+
name: andreas/hello-world
66+
model: infer.py:Model
67+
examples:
68+
- input:
69+
text: "foo"
70+
path: "@myfile.txt"
71+
output: "foofoobaz"
72+
- input:
73+
text: "bar"
74+
path: "@myfile.txt"
75+
output: "foobarbaz"
76+
- input:
77+
text: "qux"
78+
path: "@myfile.txt"
79+
environment:
80+
architectures:
81+
- cpu
82+
"""
83+
f.write(cog_yaml)
84+
85+
with open(tmpdir / "myfile.txt", "w") as f:
86+
f.write("baz")
87+
88+
return tmpdir
89+
90+
91+
@pytest.fixture
92+
def redis_port():
93+
container_name = "cog-test-redis-" + random_string(10)
94+
port = find_free_port()
95+
with docker_run(
96+
"redis",
97+
name=container_name,
98+
publish=[{"host": port, "container": 6379}],
99+
detach=True,
100+
):
101+
yield port
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import json
2+
import redis
3+
from contextlib import contextmanager
4+
import multiprocessing
5+
from flask import Flask, request, jsonify, Response
6+
7+
from .util import (
8+
random_string,
9+
set_model_url,
10+
show_version,
11+
push_with_log,
12+
find_free_port,
13+
docker_run,
14+
get_local_ip,
15+
wait_for_port,
16+
)
17+
18+
19+
def test_queue_worker(cog_server_port, project_dir, redis_port, tmpdir_factory):
20+
user = random_string(10)
21+
model_name = random_string(10)
22+
model_url = f"http://localhost:{cog_server_port}/{user}/{model_name}"
23+
24+
set_model_url(model_url, project_dir)
25+
version_id = push_with_log(project_dir)
26+
version_data = show_version(model_url, version_id)
27+
28+
input_queue = multiprocessing.Queue()
29+
output_queue = multiprocessing.Queue()
30+
controller_port = find_free_port()
31+
local_ip = get_local_ip()
32+
upload_url = f"http://{local_ip}:{controller_port}/upload"
33+
redis_host = local_ip
34+
worker_name = "test-worker"
35+
infer_queue_name = "infer-queue"
36+
response_queue_name = "response-queue"
37+
38+
wait_for_port(redis_host, redis_port)
39+
40+
redis_client = redis.Redis(host=redis_host, port=redis_port, db=0)
41+
42+
with queue_controller(input_queue, output_queue, controller_port), docker_run(
43+
image=version_data["images"][0]["uri"],
44+
interactive=True,
45+
command=[
46+
"cog-redis-queue-worker",
47+
redis_host,
48+
str(redis_port),
49+
infer_queue_name,
50+
upload_url,
51+
worker_name,
52+
],
53+
):
54+
redis_client.xgroup_create(
55+
mkstream=True, groupname=infer_queue_name, name=infer_queue_name, id="$"
56+
)
57+
58+
infer_id = random_string(10)
59+
redis_client.xadd(
60+
name=infer_queue_name,
61+
fields={
62+
"value": json.dumps(
63+
{
64+
"id": infer_id,
65+
"inputs": {
66+
"text": {"value": "bar"},
67+
"path": {
68+
"file": {
69+
"name": "myinput.txt",
70+
"url": f"http://{local_ip}:{controller_port}/download",
71+
}
72+
},
73+
},
74+
"response_queue": response_queue_name,
75+
}
76+
),
77+
},
78+
)
79+
input_queue.put("test")
80+
response = json.loads(redis_client.brpop(response_queue_name)[1])["value"]
81+
assert response == "foobartest"
82+
83+
infer_id = random_string(10)
84+
redis_client.xadd(
85+
name=infer_queue_name,
86+
fields={
87+
"value": json.dumps(
88+
{
89+
"id": infer_id,
90+
"inputs": {
91+
"text": {"value": "bar"},
92+
"output_file": {"value": "true"},
93+
"path": {
94+
"file": {
95+
"name": "myinput.txt",
96+
"url": f"http://{local_ip}:{controller_port}/download",
97+
}
98+
},
99+
},
100+
"response_queue": response_queue_name,
101+
}
102+
),
103+
},
104+
)
105+
input_queue.put("test")
106+
response_contents = output_queue.get()
107+
response = json.loads(redis_client.brpop(response_queue_name)[1])["file"]
108+
assert response_contents.decode() == "foobartest"
109+
assert response["url"] == "uploaded.txt"
110+
111+
112+
@contextmanager
113+
def queue_controller(input_queue, output_queue, controller_port):
114+
controller = QueueController(input_queue, output_queue, controller_port)
115+
controller.start()
116+
yield controller
117+
controller.kill()
118+
119+
120+
class QueueController(multiprocessing.Process):
121+
def __init__(self, input_queue, output_queue, port):
122+
super().__init__()
123+
self.input_queue = input_queue
124+
self.output_queue = output_queue
125+
self.port = port
126+
127+
def run(self):
128+
app = Flask("test-queue-controller")
129+
130+
@app.route("/", methods=["GET"])
131+
def handle_index():
132+
return "OK"
133+
134+
@app.route("/upload", methods=["PUT"])
135+
def handle_upload():
136+
f = request.files["file"]
137+
contents = f.read()
138+
self.output_queue.put(contents)
139+
return jsonify({"url": "uploaded.txt"})
140+
141+
@app.route("/download", methods=["GET"])
142+
def handle_download():
143+
contents = self.input_queue.get()
144+
return Response(
145+
contents,
146+
mimetype="text/plain",
147+
headers={"Content-disposition": "attachment; filename=myinput.txt"},
148+
)
149+
150+
app.run(host="0.0.0.0", port=self.port, debug=False)

0 commit comments

Comments
 (0)