Skip to content

Commit 80fd68b

Browse files
Feature/sdk reporting (#316)
* emiiter engine * emitter changes * chnages based on feedback * refactoring and some renaming * controller addition handler * type fix * dev packages * forgot lockfile * reverted dev packages * lockfile dammit * testing * test emitter * test * added emiiter to socket route * fixed binding * removed log * report storage initial * sdk scape * reworked the emitter engine * alterations to types eventemitter * list events chnage * changed document type mongo * changed document type mongo * changed document type mongo * changed document type mongo * chnaged date imps events * chnaged jwt exp * ffs * dev packages are desynced * test disconnect event * disconnect event * removed accidental key in event * changes to update logic sdk * chnageste bump * dev packages issue * dev packages * chnages to mongo storage and package bumps * unset disconnect on connect * change document sdk * deisconnect chnages * connection query * chnaged query for sdk data * removed the disconnect time * chnaged disconnect report to use userdata * fix * chnaged sdk scrape and added sdk delete event * change disconnect mongo update * wip * wip * create new doc only if older than a day * wip * wip * duh * scrape data * wip * timespan logging * fix day query bug * remove facet * dates * wip * wip * unique sdks * wip * sdk * sdk * wip * wip * list connections and some helpers * corrections timeframes * testing * scrapeSdk interval * change to response * change to response * testing date ranges * custom start range list connections * start date * custom date validation * date start validation * testing * test * refactoring * date bug * fixes krep in bugs * added seperate filter for delete * indexes * check document scan result * remove explain * end date to list connections * removed a log * postgres imp WIP * tests * timestamp casting * casting issues * fix incorrect type * iso issues * testing * testing * transaction * added logs * pg-wire frustrations * pg-wire frustrations * pg-wire frustrations * pg-wire frustrations * pg-wire frustrations * pg-wire frustrations * pg-wire frustrations * pg-wire frustrations * change the sql query to test * testing * testing * testing * testing * ts-codecs * ts-codecs * ts-codecs * ts-codecs * ts-codecs * ts-codecs * fixed sdk scrape query * checking delete old data * checking delete old data * deleted row logging * seperated migration for sdk * seperated migration for sdk * added redundancy for events * better log * oops * clean up * fixing a bug * fixing a bug * removed query * removed query * removed query * trying to simplify the update * trying to simplify the update * tests tests * postgres tests WIP * removed command * application name conflict report storage * checking concurrency issues in tests * test chnages to tests * completed postgres tests * mongo tests report * seperated the migrations for sdk, problems testing * okay then * mongo db tests report storage * changed the disconnect report to use connect_at time for long running connections * fixed connected at filter * cclean up some code for pr * version bump * chnages to date ranges on scrapes, simplified * fixed list current connections payload * fix migration indexes on down * model document naming * moved abstarct methods to the top * PR review chnages * PR review changes * emitter PR chnages * PR chnages * PR changes as requested * changed names to the agreed naming convention * chnages table names to match naming conventions * fixed tests * removed index drop migrations * removed unused type * changed date query * added comments * refactored test utils to its own file in postgres and mongo * oops * some fixes * better documentation * removed date range from get connected clients * refactored tests * date formats * removed as utc * DateTime chnages to postgres driver * added forced UTC for connection update * removed as utc * moved migration * renamed migration * if not exists to connecction migration * mock event engine to tests * mock token payload * Merge main * removed errand import merged from main * PR changes * removed duplicated type * jpgwire * jpgwire * removed jpgwire comment
1 parent 0e9aa94 commit 80fd68b

File tree

59 files changed

+1915
-161
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1915
-161
lines changed

.changeset/heavy-pianos-grin.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': patch
3+
'@powersync/service-core': patch
4+
'@powersync/service-types': patch
5+
---
6+
7+
sdk reporting

.changeset/honest-sloths-smoke.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-core': patch
5+
'@powersync/service-types': patch
6+
---
7+
8+
Added sdk reporting to storage

.changeset/smart-mugs-share.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-module-postgres': patch
5+
'@powersync/service-module-mongodb': patch
6+
'@powersync/service-core': patch
7+
'@powersync/service-module-mysql': patch
8+
'@powersync/service-module-core': patch
9+
'@powersync/lib-services-framework': patch
10+
'@powersync/service-types': patch
11+
---
12+
13+
Reporting mongo storage added to storage engine.

libs/lib-services/src/container.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import { ErrorReporter } from './alerts/definitions.js';
44
import { NoOpReporter } from './alerts/no-op-reporter.js';
55
import { MigrationManager } from './migrations/MigrationManager.js';
66
import {
7-
ProbeModule,
8-
TerminationHandler,
97
createInMemoryProbe,
10-
createTerminationHandler
8+
createTerminationHandler,
9+
ProbeModule,
10+
TerminationHandler
1111
} from './signals/signals-index.js';
1212

1313
export enum ContainerImplementation {
@@ -47,7 +47,6 @@ export type Newable<T> = new (...args: never[]) => T;
4747
* Identifier used to get and register implementations
4848
*/
4949
export type ServiceIdentifier<T = unknown> = string | symbol | Newable<T> | Abstract<T> | ContainerImplementation;
50-
5150
const DEFAULT_GENERATORS: ContainerImplementationDefaultGenerators = {
5251
[ContainerImplementation.REPORTER]: () => NoOpReporter,
5352
[ContainerImplementation.PROBES]: () => createInMemoryProbe(),

modules/module-mongodb-storage/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export * as storage from './storage/storage-index.js';
55

66
export * from './types/types.js';
77
export * as types from './types/types.js';
8+
export * as utils from './utils/utils-index.js';
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { migrations } from '@powersync/service-core';
2+
import * as storage from '../../../storage/storage-index.js';
3+
import { MongoStorageConfig } from '../../../types/types.js';
4+
5+
export const up: migrations.PowerSyncMigrationFunction = async (context) => {
6+
const {
7+
service_context: { configuration }
8+
} = context;
9+
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
10+
11+
try {
12+
await db.createConnectionReportingCollection();
13+
14+
await db.connection_report_events.createIndex(
15+
{
16+
connected_at: 1,
17+
jwt_exp: 1,
18+
disconnected_at: 1
19+
},
20+
{ name: 'connection_list_index' }
21+
);
22+
23+
await db.connection_report_events.createIndex(
24+
{
25+
user_id: 1
26+
},
27+
{ name: 'connection_user_id_index' }
28+
);
29+
await db.connection_report_events.createIndex(
30+
{
31+
client_id: 1
32+
},
33+
{ name: 'connection_client_id_index' }
34+
);
35+
await db.connection_report_events.createIndex(
36+
{
37+
sdk: 1
38+
},
39+
{ name: 'connection_index' }
40+
);
41+
} finally {
42+
await db.client.close();
43+
}
44+
};
45+
46+
export const down: migrations.PowerSyncMigrationFunction = async (context) => {
47+
const {
48+
service_context: { configuration }
49+
} = context;
50+
51+
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
52+
53+
try {
54+
await db.db.dropCollection('connection_report_events');
55+
} finally {
56+
await db.client.close();
57+
}
58+
};

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { PowerSyncMongo } from './implementation/db.js';
1212
import { SyncRuleDocument } from './implementation/models.js';
1313
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
1414
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js';
15-
import { generateSlotName } from './implementation/util.js';
15+
import { generateSlotName } from '../utils/util.js';
1616

1717
export class MongoBucketStorage
1818
extends BaseObserver<storage.BucketStorageFactoryListener>
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import { storage } from '@powersync/service-core';
2+
import { event_types } from '@powersync/service-types';
3+
import { PowerSyncMongo } from './implementation/db.js';
4+
import { logger } from '@powersync/lib-services-framework';
5+
6+
export class MongoReportStorage implements storage.ReportStorage {
7+
public readonly db: PowerSyncMongo;
8+
9+
constructor(db: PowerSyncMongo) {
10+
this.db = db;
11+
}
12+
async deleteOldConnectionData(data: event_types.DeleteOldConnectionData): Promise<void> {
13+
const { date } = data;
14+
const result = await this.db.connection_report_events.deleteMany({
15+
connected_at: { $lt: date },
16+
$or: [
17+
{ disconnected_at: { $exists: true } },
18+
{ jwt_exp: { $lt: new Date() }, disconnected_at: { $exists: false } }
19+
]
20+
});
21+
if (result.deletedCount > 0) {
22+
logger.info(
23+
`TTL from ${date.toISOString()}: ${result.deletedCount} MongoDB documents have been removed from connection_report_events.`
24+
);
25+
}
26+
}
27+
28+
async getClientConnectionReports(
29+
data: event_types.ClientConnectionReportRequest
30+
): Promise<event_types.ClientConnectionReportResponse> {
31+
const { start, end } = data;
32+
const result = await this.db.connection_report_events
33+
.aggregate<event_types.ClientConnectionReportResponse>([
34+
{
35+
$match: {
36+
connected_at: { $lte: end, $gte: start }
37+
}
38+
},
39+
this.connectionsFacetPipeline(),
40+
this.connectionsProjectPipeline()
41+
])
42+
.toArray();
43+
return result[0];
44+
}
45+
46+
async reportClientConnection(data: event_types.ClientConnectionBucketData): Promise<void> {
47+
const updateFilter = this.updateDocFilter(data.user_id, data.client_id!);
48+
await this.db.connection_report_events.findOneAndUpdate(
49+
updateFilter,
50+
{
51+
$set: data,
52+
$unset: {
53+
disconnected_at: ''
54+
}
55+
},
56+
{
57+
upsert: true
58+
}
59+
);
60+
}
61+
async reportClientDisconnection(data: event_types.ClientDisconnectionEventData): Promise<void> {
62+
const { connected_at, user_id, client_id } = data;
63+
await this.db.connection_report_events.findOneAndUpdate(
64+
{
65+
client_id,
66+
user_id,
67+
connected_at
68+
},
69+
{
70+
$set: {
71+
disconnected_at: data.disconnected_at
72+
},
73+
$unset: {
74+
jwt_exp: ''
75+
}
76+
}
77+
);
78+
}
79+
async getConnectedClients(): Promise<event_types.ClientConnectionReportResponse> {
80+
const result = await this.db.connection_report_events
81+
.aggregate<event_types.ClientConnectionReportResponse>([
82+
{
83+
$match: {
84+
disconnected_at: { $exists: false },
85+
jwt_exp: { $gt: new Date() }
86+
}
87+
},
88+
this.connectionsFacetPipeline(),
89+
this.connectionsProjectPipeline()
90+
])
91+
.toArray();
92+
return result[0];
93+
}
94+
95+
async [Symbol.asyncDispose]() {
96+
// No-op
97+
}
98+
99+
private parseJsDate(date: Date) {
100+
const year = date.getUTCFullYear();
101+
const month = date.getUTCMonth();
102+
const today = date.getUTCDate();
103+
const day = date.getUTCDay();
104+
return {
105+
year,
106+
month,
107+
today,
108+
day,
109+
parsedDate: date
110+
};
111+
}
112+
113+
private connectionsFacetPipeline() {
114+
return {
115+
$facet: {
116+
unique_users: [
117+
{
118+
$group: {
119+
_id: '$user_id'
120+
}
121+
},
122+
{
123+
$count: 'count'
124+
}
125+
],
126+
sdk_versions_array: [
127+
{
128+
$group: {
129+
_id: '$sdk',
130+
total: { $sum: 1 },
131+
client_ids: { $addToSet: '$client_id' },
132+
user_ids: { $addToSet: '$user_id' }
133+
}
134+
},
135+
{
136+
$project: {
137+
_id: 0,
138+
sdk: '$_id',
139+
users: { $size: '$user_ids' },
140+
clients: { $size: '$client_ids' }
141+
}
142+
},
143+
{
144+
$sort: {
145+
sdk: 1
146+
}
147+
}
148+
]
149+
}
150+
};
151+
}
152+
153+
private connectionsProjectPipeline() {
154+
return {
155+
$project: {
156+
users: { $ifNull: [{ $arrayElemAt: ['$unique_users.count', 0] }, 0] },
157+
sdks: '$sdk_versions_array'
158+
}
159+
};
160+
}
161+
162+
private updateDocFilter(userId: string, clientId: string) {
163+
const { year, month, today } = this.parseJsDate(new Date());
164+
const nextDay = today + 1;
165+
return {
166+
user_id: userId,
167+
client_id: clientId,
168+
connected_at: {
169+
$gte: new Date(Date.UTC(year, month, today)),
170+
$lt: new Date(Date.UTC(year, month, nextDay))
171+
}
172+
};
173+
}
174+
}

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import { MongoIdSequence } from './MongoIdSequence.js';
2828
import { batchCreateCustomWriteCheckpoints } from './MongoWriteCheckpointAPI.js';
2929
import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
3030
import { PersistedBatch } from './PersistedBatch.js';
31-
import { idPrefixFilter } from './util.js';
31+
import { idPrefixFilter } from '../../utils/util.js';
3232

3333
/**
3434
* 15MB

modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import { POWERSYNC_VERSION, storage } from '@powersync/service-core';
44
import { MongoStorageConfig } from '../../types/types.js';
55
import { MongoBucketStorage } from '../MongoBucketStorage.js';
66
import { PowerSyncMongo } from './db.js';
7+
import { MongoReportStorage } from '../MongoReportStorage.js';
78

8-
export class MongoStorageProvider implements storage.BucketStorageProvider {
9+
export class MongoStorageProvider implements storage.StorageProvider {
910
get type() {
1011
return lib_mongo.MONGO_CONNECTION_TYPE;
1112
}
@@ -37,15 +38,19 @@ export class MongoStorageProvider implements storage.BucketStorageProvider {
3738
await client.connect();
3839

3940
const database = new PowerSyncMongo(client, { database: resolvedConfig.storage.database });
40-
const factory = new MongoBucketStorage(database, {
41+
const syncStorageFactory = new MongoBucketStorage(database, {
4142
// TODO currently need the entire resolved config due to this
4243
slot_name_prefix: resolvedConfig.slot_name_prefix
4344
});
45+
46+
// Storage factory for reports
47+
const reportStorageFactory = new MongoReportStorage(database);
4448
return {
45-
storage: factory,
49+
storage: syncStorageFactory,
50+
reportStorage: reportStorageFactory,
4651
shutDown: async () => {
4752
shuttingDown = true;
48-
await factory[Symbol.asyncDispose]();
53+
await syncStorageFactory[Symbol.asyncDispose]();
4954
await client.close();
5055
},
5156
tearDown: () => {

0 commit comments

Comments
 (0)