Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7040.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for running replication over Redis when using workers.
40 changes: 40 additions & 0 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Contains *incomplete* type hints for txredisapi.
"""

from typing import List, Optional, Union

class RedisProtocol:
def publish(self, channel: str, message: bytes): ...

class SubscriberProtocol:
def subscribe(self, channels: Union[str, List[str]]): ...

def lazyConnection(
host: str = ...,
port: int = ...,
dbid: Optional[int] = ...,
reconnect: bool = ...,
charset: str = ...,
password: Optional[str] = ...,
connectTimeout: Optional[int] = ...,
replyTimeout: Optional[int] = ...,
convertNumbers: bool = ...,
) -> RedisProtocol: ...

class SubscriberFactory:
def buildProtocol(self, addr): ...
6 changes: 6 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ def _configure_named_resource(self, name, compress=False):
def start_listening(self, listeners):
config = self.get_config()

if config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)

for listener in listeners:
if listener["type"] == "http":
self._listening_services.extend(self._listener_http(config, listener))
Expand Down
2 changes: 2 additions & 0 deletions synapse/config/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room_directory import RoomDirectoryConfig
Expand Down Expand Up @@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig,
ThirdPartyRulesConfig,
TracerConfig,
RedisConfig,
]
35 changes: 35 additions & 0 deletions synapse/config/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.config._base import Config
from synapse.python_dependencies import check_requirements


class RedisConfig(Config):
section = "redis"

def read_config(self, config, **kwargs):
redis_config = config.get("redis", {})
self.redis_enabled = redis_config.get("enabled", False)

if not self.redis_enabled:
return

check_requirements("redis")

self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_dbid = redis_config.get("dbid")
self.redis_password = redis_config.get("password")
1 change: 1 addition & 0 deletions synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"],
"redis": ["txredisapi>=1.4.7"],
}

ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
logger = logging.getLogger(__name__)


class ReplicationClientFactory(ReconnectingClientFactory):
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
"""Factory for building connections to the master. Will reconnect if the
connection is lost.

Expand Down
18 changes: 18 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,3 +454,21 @@ class RemoteServerUpCommand(_SimpleCommand):
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
)


def parse_command_from_line(line: str) -> Command:
"""Parses a command from a received line.

Line should already be stripped of whitespace and be checked if blank.
"""

idx = line.index(" ")
if idx >= 0:
cmd_name = line[:idx]
rest_of_line = line[idx + 1 :]
else:
cmd_name = line
rest_of_line = ""

cmd_cls = COMMAND_MAP[cmd_name]
return cmd_cls.from_line(rest_of_line)
50 changes: 43 additions & 7 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@

from prometheus_client import Counter

from twisted.internet.protocol import ReconnectingClientFactory

from synapse.metrics import LaterGauge
from synapse.replication.tcp.client import ReplicationClientFactory
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
Expand Down Expand Up @@ -92,7 +94,7 @@ def __init__(self, hs):
self._pending_batches = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
self._factory = None # type: Optional[ReplicationClientFactory]
self._factory = None # type: Optional[ReconnectingClientFactory]

# The currently connected connections.
self._connections = [] # type: List[AbstractConnection]
Expand All @@ -119,11 +121,45 @@ def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
self._factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
)
import txredisapi

logger.info(
"Connecting to redis (host=%r port=%r DBID=%r)",
hs.config.redis_host,
hs.config.redis_port,
hs.config.redis_dbid,
)

# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCRIBE is called).

# First create the connection for sending commands.
outbound_redis_connection = txredisapi.lazyConnection(
host=hs.config.redis_host,
port=hs.config.redis_port,
dbid=hs.config.redis_dbid,
password=hs.config.redis.redis_password,
reconnect=True,
)

# Now create the factory/connection for the subscription stream.
self._factory = RedisDirectTcpReplicationClientFactory(
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
)
else:
client_name = hs.config.worker_name
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)

async def on_REPLICATE(self, cmd: ReplicateCommand):
# We only want to announce positions by the writer of the streams.
Expand Down
38 changes: 12 additions & 26 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
COMMAND_MAP,
VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS,
Command,
Expand All @@ -72,6 +71,7 @@
PingCommand,
ReplicateCommand,
ServerCommand,
parse_command_from_line,
)
from synapse.types import Collection
from synapse.util import Clock
Expand Down Expand Up @@ -210,38 +210,24 @@ def lineReceived(self, line: bytes):

linestr = line.decode("utf-8")

# split at the first " ", handling one-word commands
idx = linestr.index(" ")
if idx >= 0:
cmd_name = linestr[:idx]
rest_of_line = linestr[idx + 1 :]
else:
cmd_name = linestr
rest_of_line = ""
try:
cmd = parse_command_from_line(linestr)
except Exception as e:
logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
self.send_error("failed to parse line: %r (%r):" % (e, linestr))
return

if cmd_name not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd_name)
self.send_error("invalid command: %s", cmd_name)
if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
self.send_error("invalid command: %s", cmd.NAME)
return

self.last_received_command = self.clock.time_msec()

self.inbound_commands_counter[cmd_name] = (
self.inbound_commands_counter[cmd_name] + 1
self.inbound_commands_counter[cmd.NAME] = (
self.inbound_commands_counter[cmd.NAME] + 1
)

cmd_cls = COMMAND_MAP[cmd_name]
try:
cmd = cmd_cls.from_line(rest_of_line)
except Exception as e:
logger.exception(
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
)
self.send_error(
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
)
return

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
Expand Down
Loading