Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Copy Markdown
Contributor Author

@MadLittleMods MadLittleMods Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers, here are all the new changes that are different from the original PR -> https://github.com/matrix-org/synapse/pull/15913/files/7ad4cfc6a056f42a884803072cd0440db07bdc69..4e49e661577ea4d0caf55a159df119d304121215. At the top in the PR description, there is a section that describes the "Details of the additional fixes in the re-introduction PR"

If you're looking for context/discussion on any previous code, #15773 might have some answers.

File renamed without changes.
10 changes: 5 additions & 5 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3963,11 +3963,11 @@ federation_sender_instances:
When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP
replication listener of the worker, if configured, and to the main process. Each worker
declared under [`stream_writers`](../../workers.md#stream-writers) and
[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP replication listener, and that
listener should be included in the `instance_map`. The main process also needs an entry
on the `instance_map`, and it should be listed under `main` **if even one other worker
exists**. Ensure the port matches with what is declared inside the `listener` block for
a `replication` listener.
[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP
replication listener, and that listener should be included in the `instance_map`. The
main process also needs an entry on the `instance_map`, and it should be listed under
`main` **if even one other worker exists**. Ensure the port matches with what is
declared inside the `listener` block for a `replication` listener.


Example configuration:
Expand Down
7 changes: 7 additions & 0 deletions synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ def __init__(self, msg: str):
super().__init__(HTTPStatus.BAD_REQUEST, msg, Codes.BAD_JSON)


class InvalidProxyCredentialsError(SynapseError):
"""Error raised when the proxy credentials are invalid."""

def __init__(self, msg: str, errcode: str = Codes.UNKNOWN):
super().__init__(401, msg, errcode)


class ProxiedRequestError(SynapseError):
"""An error from a general matrix endpoint, eg. from a proxied Matrix API call.

Expand Down
2 changes: 1 addition & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
federation_agent=hs.get_federation_http_client().agent,
hs=hs,
)

if isinstance(listener_config, TCPListenerConfig):
Expand Down
5 changes: 5 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
outbound_federation_restricted_to
)
if outbound_federation_restricted_to:
if not self.worker_replication_secret:
raise ConfigError(
"`worker_replication_secret` must be configured when using `outbound_federation_restricted_to`."
)
Comment on lines +416 to +419
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we okay with re-using worker_replication_secret for use with Proxy-Authorization between workers? It's still inter-worker communication but it's not replication traffic.


for instance in outbound_federation_restricted_to:
if instance not in self.instance_map:
raise ConfigError(
Expand Down
20 changes: 19 additions & 1 deletion synapse/http/connectproxyclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import base64
import logging
from typing import Optional, Union
Expand Down Expand Up @@ -39,8 +40,14 @@ class ProxyConnectError(ConnectError):
pass


@attr.s(auto_attribs=True)
class ProxyCredentials:
@abc.abstractmethod
def as_proxy_authorization_value(self) -> bytes:
raise NotImplementedError()


@attr.s(auto_attribs=True)
class BasicProxyCredentials(ProxyCredentials):
username_password: bytes

def as_proxy_authorization_value(self) -> bytes:
Expand All @@ -55,6 +62,17 @@ def as_proxy_authorization_value(self) -> bytes:
return b"Basic " + base64.encodebytes(self.username_password)


@attr.s(auto_attribs=True)
class BearerProxyCredentials(ProxyCredentials):
access_token: bytes

def as_proxy_authorization_value(self) -> bytes:
"""
Return the value for a Proxy-Authorization header (i.e. 'Bearer xxx').
"""
return b"Bearer " + self.access_token


@implementer(IStreamClientEndpoint)
class HTTPConnectProxyEndpoint:
"""An Endpoint implementation which will send a CONNECT request to an http proxy
Expand Down
14 changes: 12 additions & 2 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
encode_query_args,
read_body_with_max_size,
)
from synapse.http.connectproxyclient import BearerProxyCredentials
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
Expand Down Expand Up @@ -407,14 +408,23 @@ def __init__(
hs.config.server.federation_ip_range_blocklist,
)
else:
proxy_authorization_secret = hs.config.worker.worker_replication_secret
assert (
proxy_authorization_secret is not None
), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
federation_proxy_credentials = BearerProxyCredentials(
proxy_authorization_secret.encode("ascii")
)

# We need to talk to federation via the proxy via one of the configured
# locations
federation_proxies = outbound_federation_restricted_to.locations
federation_proxy_locations = outbound_federation_restricted_to.locations
federation_agent = ProxyAgent(
self.reactor,
self.reactor,
tls_client_options_factory,
federation_proxies=federation_proxies,
federation_proxy_locations=federation_proxy_locations,
federation_proxy_credentials=federation_proxy_credentials,
)

# Use a BlocklistingAgentWrapper to prevent circumventing the IP
Expand Down
80 changes: 57 additions & 23 deletions synapse/http/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IResponse
from twisted.web.iweb import IResponse
from twisted.web.resource import IResource
from twisted.web.server import Site
from twisted.web.server import Request, Site

from synapse.api.errors import Codes
from synapse.api.errors import Codes, InvalidProxyCredentialsError
from synapse.http import QuieterFileBodyProducer
from synapse.http.server import _AsyncResource
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand All @@ -38,6 +38,7 @@

if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,16 +99,48 @@ class ProxyResource(_AsyncResource):

isLeaf = True

def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent):
def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
super().__init__(True)

self.reactor = reactor
self.agent = federation_agent
self.agent = hs.get_federation_http_client().agent

self._proxy_authorization_secret = hs.config.worker.worker_replication_secret

def _check_auth(self, request: Request) -> None:
# The `matrix-federation://` proxy functionality can only be used with auth.
# Protect homserver admins forgetting to configure a secret.
assert self._proxy_authorization_secret is not None

# Get the authorization header.
auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")

if not auth_headers:
raise InvalidProxyCredentialsError(
"Missing Proxy-Authorization header.", Codes.MISSING_TOKEN
)
if len(auth_headers) > 1:
raise InvalidProxyCredentialsError(
"Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED
)
parts = auth_headers[0].split(b" ")
if parts[0] == b"Bearer" and len(parts) == 2:
received_secret = parts[1].decode("ascii")
if self._proxy_authorization_secret == received_secret:
# Success!
return

raise InvalidProxyCredentialsError(
"Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED
)

async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
uri = urllib.parse.urlparse(request.uri)
assert uri.scheme == b"matrix-federation"

# Check the authorization headers before handling the request.
self._check_auth(request)

headers = Headers()
for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
header_value = request.getHeader(header_name)
Expand Down Expand Up @@ -171,23 +204,24 @@ def _send_error_response(
f: failure.Failure,
request: "SynapseRequest",
) -> None:
request.setResponseCode(502)
if isinstance(f.value, InvalidProxyCredentialsError):
error_response_code = f.value.code
error_response_json = {"errcode": f.value.errcode, "err": f.value.msg}
else:
error_response_code = 502
error_response_json = {
"errcode": Codes.UNKNOWN,
"err": "ProxyResource: Error when proxying request: %s %s -> %s"
% (
request.method.decode("ascii"),
request.uri.decode("ascii"),
f,
),
}

request.setResponseCode(error_response_code)
request.setHeader(b"Content-Type", b"application/json")
request.write(
(
json.dumps(
{
"errcode": Codes.UNKNOWN,
"err": "ProxyResource: Error when proxying request: %s %s -> %s"
% (
request.method.decode("ascii"),
request.uri.decode("ascii"),
f,
),
}
)
).encode()
)
request.write((json.dumps(error_response_json)).encode())
request.finish()


Expand Down Expand Up @@ -235,11 +269,11 @@ def __init__(
self,
resource: IResource,
reactor: ISynapseReactor,
federation_agent: IAgent,
hs: "HomeServer",
):
super().__init__(resource, reactor=reactor)

self._proxy_resource = ProxyResource(reactor, federation_agent)
self._proxy_resource = ProxyResource(reactor, hs=hs)

def getResourceFor(self, request: "SynapseRequest") -> IResource:
uri = urllib.parse.urlparse(request.uri)
Expand Down
Loading