Skip to content

Commit dd4465a

Browse files
committed
Add support for client side certificate authentication and verifying
server certificate using provided CA bundle for message bus (RabbitMQ) connections. Option names are consistent with the same option names for MongoDB. Update affected code so connection and URLs are only retrieved in a single place.
1 parent 64a238f commit dd4465a

File tree

29 files changed

+221
-84
lines changed

29 files changed

+221
-84
lines changed

st2actions/st2actions/notifier/notifier.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
1718
from datetime import datetime
1819
import json
1920

20-
from kombu import Connection
2121
from oslo_config import cfg
2222

2323
from st2common import log as logging
@@ -268,6 +268,6 @@ def _get_runner_ref(self, action_ref):
268268

269269

270270
def get_notifier():
271-
with Connection(transport_utils.get_messaging_urls()) as conn:
271+
with transport_utils.get_connection() as conn:
272272
return Notifier(conn, [NOTIFIER_ACTIONUPDATE_WORK_QUEUE],
273273
trigger_dispatcher=TriggerDispatcher(LOG))

st2actions/st2actions/resultstracker/resultstracker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
1718
import eventlet
1819
import six
1920

2021
from collections import defaultdict
21-
from kombu import Connection
2222

2323
from st2common.query.base import QueryContext
2424
from st2common import log as logging
@@ -111,5 +111,5 @@ def get_querier(self, query_module_name):
111111

112112

113113
def get_tracker():
114-
with Connection(transport_utils.get_messaging_urls()) as conn:
114+
with transport_utils.get_connection() as conn:
115115
return ResultsTracker(conn, [RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE])

st2actions/st2actions/scheduler/entrypoint.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17-
from kombu import Connection
1817

1918
from st2common import log as logging
2019
from st2common.util import date
@@ -105,5 +104,5 @@ def _create_execution_queue_item_db_from_liveaction(self, liveaction, delay=None
105104

106105

107106
def get_scheduler_entrypoint():
108-
with Connection(transport_utils.get_messaging_urls()) as conn:
107+
with transport_utils.get_connection() as conn:
109108
return SchedulerEntrypoint(conn, [ACTIONSCHEDULER_REQUEST_QUEUE])

st2actions/st2actions/worker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
import sys
1818
import traceback
1919

20-
from kombu import Connection
21-
2220
from st2actions.container.base import RunnerContainer
2321
from st2common import log as logging
2422
from st2common.constants import action as action_constants
@@ -250,5 +248,5 @@ def _resume_action(self, liveaction_db):
250248

251249

252250
def get_worker():
253-
with Connection(transport_utils.get_messaging_urls()) as conn:
251+
with transport_utils.get_connection() as conn:
254252
return ActionExecutionDispatcher(conn, ACTIONRUNNER_QUEUES)

st2actions/tests/integration/test_action_state_consumer.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
import mock
2222

23-
from kombu import Connection
24-
2523
from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE
2624
from st2actions.resultstracker.resultstracker import ResultsTracker
2725
from st2common.models.db.executionstate import ActionExecutionStateDB
@@ -63,7 +61,7 @@ def setUpClass(cls):
6361

6462
@mock.patch.object(TestQuerier, 'query', mock.MagicMock(return_value=(False, {})))
6563
def test_process_message(self):
66-
with Connection(transport_utils.get_messaging_urls()) as conn:
64+
with transport_utils.get_connection() as conn:
6765
tracker = ResultsTracker(conn, [RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE])
6866
tracker._bootstrap()
6967
state = ActionStateConsumerTests.get_state(

st2common/st2common/config.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,28 @@ def register_opts(ignore_errors=False):
226226
help='How many times should we retry connection before failing.'),
227227
cfg.IntOpt(
228228
'connection_retry_wait', default=10000,
229-
help='How long should we wait between connection retries.')
229+
help='How long should we wait between connection retries.'),
230+
cfg.BoolOpt(
231+
'ssl', default=False,
232+
help='Use SSL / TLS to connection to the messaging server. Same as '
233+
'appending "?ssl=true" at the end of the connection URL string.'),
234+
cfg.StrOpt(
235+
'ssl_keyfile', default=None,
236+
help='Private keyfile used to identify the local connection against RabbitMQ.'),
237+
cfg.StrOpt(
238+
'ssl_certfile', default=None,
239+
help='Certificate file used to identify the local connection (client).'),
240+
cfg.StrOpt(
241+
'ssl_cert_reqs', default=None, choices='none, optional, required',
242+
help='Specifies whether a certificate is required from the other side of the '
243+
'connection, and whether it will be validated if provided.'),
244+
cfg.StrOpt(
245+
'ssl_ca_certs', default=None,
246+
help='ca_certs file contains a set of concatenated CA certificates, which are '
247+
'used to validate certificates passed from RabbitMQ.'),
248+
cfg.StrOpt(
249+
'login_method', default=None,
250+
help='Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).')
230251
]
231252

232253
do_register_opts(messaging_opts, 'messaging', ignore_errors)

st2common/st2common/persistence/execution.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from st2common.models.db.execution import ActionExecutionDB
2020
from st2common.models.db.execution import ActionExecutionOutputDB
2121
from st2common.persistence.base import Access
22-
from st2common.transport import utils as transport_utils
2322

2423
__all__ = [
2524
'ActionExecution',
@@ -38,8 +37,7 @@ def _get_impl(cls):
3837
@classmethod
3938
def _get_publisher(cls):
4039
if not cls.publisher:
41-
cls.publisher = transport.execution.ActionExecutionPublisher(
42-
urls=transport_utils.get_messaging_urls())
40+
cls.publisher = transport.execution.ActionExecutionPublisher()
4341
return cls.publisher
4442

4543
@classmethod
@@ -57,8 +55,7 @@ def _get_impl(cls):
5755
@classmethod
5856
def _get_publisher(cls):
5957
if not cls.publisher:
60-
cls.publisher = transport.execution.ActionExecutionOutputPublisher(
61-
urls=transport_utils.get_messaging_urls())
58+
cls.publisher = transport.execution.ActionExecutionOutputPublisher()
6259
return cls.publisher
6360

6461
@classmethod

st2common/st2common/persistence/executionstate.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
1718
from st2common import transport
1819
from st2common.models.db.executionstate import actionexecstate_access
1920
from st2common.persistence import base as persistence
20-
from st2common.transport import utils as transport_utils
21+
22+
__all__ = [
23+
'ActionExecutionState'
24+
]
2125

2226

2327
class ActionExecutionState(persistence.Access):
@@ -31,6 +35,5 @@ def _get_impl(cls):
3135
@classmethod
3236
def _get_publisher(cls):
3337
if not cls.publisher:
34-
cls.publisher = transport.actionexecutionstate.ActionExecutionStatePublisher(
35-
urls=transport_utils.get_messaging_urls())
38+
cls.publisher = transport.actionexecutionstate.ActionExecutionStatePublisher()
3639
return cls.publisher

st2common/st2common/persistence/liveaction.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
1718
from st2common import transport
1819
from st2common.models.db.liveaction import liveaction_access
1920
from st2common.persistence import base as persistence
20-
from st2common.transport import utils as transport_utils
21+
22+
__all__ = [
23+
'LiveAction'
24+
]
2125

2226

2327
class LiveAction(persistence.StatusBasedResource):
@@ -31,8 +35,7 @@ def _get_impl(cls):
3135
@classmethod
3236
def _get_publisher(cls):
3337
if not cls.publisher:
34-
cls.publisher = transport.liveaction.LiveActionPublisher(
35-
urls=transport_utils.get_messaging_urls())
38+
cls.publisher = transport.liveaction.LiveActionPublisher()
3639
return cls.publisher
3740

3841
@classmethod

st2common/st2common/persistence/sensor.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
1718
from st2common import transport
1819
from st2common.models.db.sensor import sensor_type_access
1920
from st2common.persistence.base import ContentPackResource
20-
from st2common.transport import utils as transport_utils
21+
22+
__all__ = [
23+
'SensorType'
24+
]
2125

2226

2327
class SensorType(ContentPackResource):
@@ -31,6 +35,5 @@ def _get_impl(cls):
3135
@classmethod
3236
def _get_publisher(cls):
3337
if not cls.publisher:
34-
cls.publisher = transport.reactor.SensorCUDPublisher(
35-
urls=transport_utils.get_messaging_urls())
38+
cls.publisher = transport.reactor.SensorCUDPublisher()
3639
return cls.publisher

0 commit comments

Comments
 (0)