|
7 | 7 |
|
8 | 8 | from boto.s3.connection import S3Connection |
9 | 9 | from boto.s3.key import Key |
| 10 | +import dcos.config |
| 11 | +import dcos.http |
| 12 | +import dcos.package |
10 | 13 | import os |
11 | 14 | import pytest |
12 | 15 | import re |
13 | 16 | import shakedown |
14 | 17 | import subprocess |
| 18 | +import urllib |
| 19 | + |
| 20 | + |
| 21 | +def setup_module(module): |
| 22 | + _require_package('hdfs') |
| 23 | + _install_spark() |
| 24 | + |
| 25 | + |
| 26 | +def _install_spark(): |
| 27 | + options = {"hdfs": |
| 28 | + {"config-url": |
| 29 | + "http://hdfs.marathon.mesos:9000/v1/connection"}} |
| 30 | + |
| 31 | + if os.environ.get('SECURITY') == 'strict': |
| 32 | + options['service'] = {"user": "nobody", |
| 33 | + "principal": "service-acct", |
| 34 | + "secret_name": "secret" } |
| 35 | + |
| 36 | + shakedown.install_package('spark', options_json=options, wait_for_completion=True) |
| 37 | + |
| 38 | + def pred(): |
| 39 | + dcos_url = dcos.config.get_config_val("core.dcos_url") |
| 40 | + spark_url = urllib.parse.urljoin(dcos_url, "/service/spark") |
| 41 | + status_code = dcos.http.get(spark_url).status_code |
| 42 | + return status_code == 200 |
| 43 | + |
| 44 | + shakedown.spinner.wait_for(pred) |
| 45 | + |
| 46 | + |
| 47 | +def _require_package(pkg_name): |
| 48 | + pkg_manager = dcos.package.get_package_manager() |
| 49 | + installed_pkgs = dcos.package.installed_packages(pkg_manager, None, None, False) |
| 50 | + if not any(pkg['name'] == pkg_name for pkg in installed_pkgs): |
| 51 | + shakedown.install_package(pkg_name, wait_for_completion=True) |
| 52 | + shakedown.wait_for(_is_hdfs_ready, ignore_exceptions=False, timeout_seconds=600) |
| 53 | + |
| 54 | + |
| 55 | +DEFAULT_HDFS_TASK_COUNT=8 |
| 56 | +def _is_hdfs_ready(expected_tasks = DEFAULT_HDFS_TASK_COUNT): |
| 57 | + running_tasks = [t for t in shakedown.get_service_tasks('hdfs') \ |
| 58 | + if t['state'] == 'TASK_RUNNING'] |
| 59 | + return len(running_tasks) >= expected_tasks |
| 60 | + |
| 61 | + |
| 62 | +def test_teragen(): |
| 63 | + jar_url = "https://downloads.mesosphere.io/spark/examples/spark-terasort-1.0-jar-with-dependencies_2.11.jar" |
| 64 | + _run_tests(jar_url, |
| 65 | + "1g hdfs:///terasort_in", |
| 66 | + "Number of records written", |
| 67 | + {"--class": "com.github.ehiggs.spark.terasort.TeraGen"}) |
15 | 68 |
|
16 | 69 |
|
17 | 70 | def test_jar(): |
|
0 commit comments