-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_experiments.py
More file actions
executable file
·108 lines (84 loc) · 3 KB
/
run_experiments.py
File metadata and controls
executable file
·108 lines (84 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import subprocess
import time
# Baseline benchmark load: 10 messages per 1-second burst, 100 bursts,
# with a 0.09 s sleep between messages inside each burst.
args = {
    "messages_per_burst": 10,
    "sleeps_per_burst": 10,
    "sleep_time": 0.09,
    "seconds_per_burst": 1,
    "bursts": 100,
}


def _load_variant(per_burst, sleep_time):
    """Return a copy of the baseline `args` at a different burst rate.

    `per_burst` sets both messages_per_burst and sleeps_per_burst;
    `sleep_time` is the inter-message sleep that keeps each burst
    inside its one-second window.
    """
    return {
        **args,
        "messages_per_burst": per_burst,
        "sleeps_per_burst": per_burst,
        "sleep_time": sleep_time,
    }


mps_1 = _load_variant(1, 0.9)
mps_20 = _load_variant(20, 0.09 / 2)
mps_30 = _load_variant(30, 0.09 / 3)
mps_50 = _load_variant(50, 0.09 / 5)
# Experiment matrix: sweep parallelism 16 -> 1 at the 50-messages-per-burst
# load. (Sweeps over the baseline `args` and `mps_20` loads were run in
# earlier sessions and are intentionally not repeated here.)
experiments = [
    {"parallelism": p, "benchmark_args": {**mps_50}}
    for p in (16, 8, 4, 2, 1)
]
# Start from a clean slate: make sure no containers from a previous run
# are still up before the sweep begins.
print("Tearing down docker containers")
subprocess.run(["docker", "compose", "down"], check=True)
for e in ["pipelined", "parallel", "baseline"]:
    for exp in experiments:
        print(f"Starting experiment {exp}")
        # Bring the cluster up and give the containers time to become ready.
        subprocess.run(["docker", "compose", "up", "-d"], check=True)
        time.sleep(10)
        # Submit the Flink job at the requested parallelism.
        flink_cmd = [
            "flink", "run", "--pyFiles", "/home/lvanmol/cascade/src,/home/lvanmol/cascade",
            "--pyModule", "deathstar_movie_review.demo", "-d", "-p", str(exp['parallelism'])
        ]
        # Copy the environment rather than aliasing os.environ: assigning
        # through os.environ would mutate this process's own environment
        # and leak EXPERIMENT into every later subprocess call.
        env = {**os.environ, "EXPERIMENT": e}
        subprocess.run(flink_cmd, check=True, env=env)
        # Run the benchmark client; results are pickled to a file named
        # after the experiment variant, parallelism, and message rate.
        filename = f"{e}_p-{exp['parallelism']}_mps-{exp['benchmark_args']['messages_per_burst']}.pkl"
        benchmark_cmd = [
            "python", "-u", "-m", "deathstar_movie_review.start_benchmark", "--output", filename, "--experiment", e
        ]
        for arg, val in exp['benchmark_args'].items():
            benchmark_cmd.append(f"--{arg}")
            benchmark_cmd.append(str(val))
        subprocess.run(benchmark_cmd, check=True)
        # Tear the cluster down so the next run starts fresh.
        subprocess.run(["docker", "compose", "down"], check=True)
        print("Experiment completed.")
print("All experiments completed.")