Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
Expand Down Expand Up @@ -110,6 +109,7 @@
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
Expand Down Expand Up @@ -241,6 +241,7 @@ class GenericWorkerSlavedStore(
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
Expand All @@ -259,7 +260,6 @@ class GenericWorkerSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedPresenceStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
Expand Down
50 changes: 0 additions & 50 deletions synapse/replication/slave/storage/presence.py

This file was deleted.

47 changes: 1 addition & 46 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import logging
from typing import List, Optional, Tuple

from synapse.api.constants import PresenceState
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.stats import UserSortOrder
Expand Down Expand Up @@ -51,7 +50,7 @@
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .openid import OpenIdStore
from .presence import PresenceStore, UserPresenceState
from .presence import PresenceStore
from .profile import ProfileStore
from .purge_events import PurgeEventsStore
from .push_rule import PushRuleStore
Expand Down Expand Up @@ -126,9 +125,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._clock = hs.get_clock()
self.database_engine = database.engine

self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
Expand Down Expand Up @@ -177,21 +173,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):

super().__init__(database, db_conn, hs)

self._presence_on_startup = self._get_active_presence(db_conn)

presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
db_conn,
"presence_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache",
min_presence_val,
prefilled_cache=presence_cache_prefill,
)

device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
Expand Down Expand Up @@ -238,32 +219,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()

def take_presence_startup_info(self):
active_on_startup = self._presence_on_startup
self._presence_on_startup = None
return active_on_startup

def _get_active_presence(self, db_conn):
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.
"""

sql = (
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
" WHERE state != ?"
)

txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
rows = self.db_pool.cursor_to_dict(txn)
txn.close()

for row in rows:
row["currently_active"] = bool(row["currently_active"])

return [UserPresenceState(**row) for row in rows]

async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.

Expand Down
97 changes: 95 additions & 2 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,74 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List, Tuple
from typing import TYPE_CHECKING, Dict, List, Tuple

from synapse.api.presence import UserPresenceState
from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
from synapse.server import HomeServer


class PresenceStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: Connection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self._can_persist_presence = (
hs.get_instance_name() in hs.config.worker.writers.presence
)

if isinstance(database.engine, PostgresEngine):
self._can_persist_presence = (
self._instance_name in hs.config.worker.writers.presence
)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are two copies of this. does it need to be conditional?


self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
sequence_name="presence_stream_sequence",
writers=hs.config.worker.writers.to_device,
)
else:
self._can_persist_presence = True
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)

self._presence_on_startup = self._get_active_presence(db_conn)

presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
db_conn,
"presence_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache",
min_presence_val,
prefilled_cache=presence_cache_prefill,
)

async def update_presence(self, presence_states):
assert self._can_persist_presence

stream_ordering_manager = self._presence_id_gen.get_next_mult(
len(presence_states)
)
Expand Down Expand Up @@ -57,6 +115,7 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
"last_user_sync_ts": state.last_user_sync_ts,
"status_msg": state.status_msg,
"currently_active": state.currently_active,
"instance_name": self._instance_name,
}
for stream_id, state in zip(stream_orderings, presence_states)
],
Expand Down Expand Up @@ -216,3 +275,37 @@ async def get_presence_for_all_users(

def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()

def _get_active_presence(self, db_conn: Connection):
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.
"""

sql = (
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
" WHERE state != ?"
)

txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
rows = self.db_pool.cursor_to_dict(txn)
txn.close()

for row in rows:
row["currently_active"] = bool(row["currently_active"])

return [UserPresenceState(**row) for row in rows]

def take_presence_startup_info(self):
active_on_startup = self._presence_on_startup
self._presence_on_startup = None
return active_on_startup

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PresenceStream.NAME:
self._presence_id_gen.advance(instance_name, token)
for row in rows:
self.presence_stream_cache.entity_has_changed(row.user_id, token)
self._get_presence_for_user.invalidate((row.user_id,))
return super().process_replication_rows(stream_name, instance_name, token, rows)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE presence_stream ADD COLUMN instance_name TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence;

SELECT setval('presence_stream_sequence', (
SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream
));
Comment on lines +16 to +20
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add this to the port script?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gah. I wonder if there is a way to stop us (read: me) from forgetting this.