Skip to content

Commit 8101f65

Browse files
feat: add static location and live location support (#1423)
1 parent 71e4715 commit 8101f65

22 files changed

+2203
-146
lines changed

src/LiveLocationManager.ts

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/**
2+
* RULES:
3+
*
4+
* 1. one loc-sharing message per channel per user
5+
* 2. live location is intended to be per device
6+
* but created_by_device_id has currently no checks,
7+
* and user can update the location from another device
8+
* thus making location sharing based on user and channel
9+
*/
10+
11+
import { withCancellation } from './utils/concurrency';
12+
import { StateStore } from './store';
13+
import { WithSubscriptions } from './utils/WithSubscriptions';
14+
import type { StreamChat } from './client';
15+
import type { Unsubscribe } from './store';
16+
import type {
17+
EventTypes,
18+
MessageResponse,
19+
SharedLiveLocationResponse,
20+
SharedLocationResponse,
21+
} from './types';
22+
import type { Coords } from './messageComposer';
23+
24+
export type WatchLocationHandler = (value: Coords) => void;
25+
export type WatchLocation = (handler: WatchLocationHandler) => Unsubscribe;
26+
type DeviceIdGenerator = () => string;
27+
type MessageId = string;
28+
29+
export type ScheduledLiveLocationSharing = SharedLiveLocationResponse & {
30+
stopSharingTimeout: ReturnType<typeof setTimeout> | null;
31+
};
32+
33+
export type LiveLocationManagerState = {
34+
ready: boolean;
35+
messages: Map<MessageId, ScheduledLiveLocationSharing>;
36+
};
37+
38+
const isExpiredLocation = (location: SharedLiveLocationResponse) => {
39+
const endTimeTimestamp = new Date(location.end_at).getTime();
40+
41+
return endTimeTimestamp < Date.now();
42+
};
43+
44+
function isValidLiveLocationMessage(
45+
message?: MessageResponse,
46+
): message is MessageResponse & { shared_location: SharedLiveLocationResponse } {
47+
if (!message || message.type === 'deleted' || !message.shared_location?.end_at)
48+
return false;
49+
50+
return !isExpiredLocation(message.shared_location as SharedLiveLocationResponse);
51+
}
52+
53+
export type LiveLocationManagerConstructorParameters = {
54+
client: StreamChat;
55+
getDeviceId: DeviceIdGenerator;
56+
watchLocation: WatchLocation;
57+
};
58+
59+
// Hard-coded minimal throttle timeout
60+
export const UPDATE_LIVE_LOCATION_REQUEST_MIN_THROTTLE_TIMEOUT = 3000;
61+
62+
export class LiveLocationManager extends WithSubscriptions {
63+
public state: StateStore<LiveLocationManagerState>;
64+
private client: StreamChat;
65+
private getDeviceId: DeviceIdGenerator;
66+
private _deviceId: string;
67+
private watchLocation: WatchLocation;
68+
69+
static symbol = Symbol(LiveLocationManager.name);
70+
71+
constructor({
72+
client,
73+
getDeviceId,
74+
watchLocation,
75+
}: LiveLocationManagerConstructorParameters) {
76+
if (!client.userID) {
77+
throw new Error('Live-location sharing is reserved for client-side use only');
78+
}
79+
80+
super();
81+
82+
this.client = client;
83+
this.state = new StateStore<LiveLocationManagerState>({
84+
messages: new Map(),
85+
ready: false,
86+
});
87+
this._deviceId = getDeviceId();
88+
this.getDeviceId = getDeviceId;
89+
this.watchLocation = watchLocation;
90+
}
91+
92+
public async init() {
93+
await this.assureStateInit();
94+
this.registerSubscriptions();
95+
}
96+
97+
public registerSubscriptions = () => {
98+
this.incrementRefCount();
99+
if (this.hasSubscriptions) return;
100+
101+
this.addUnsubscribeFunction(this.subscribeLiveLocationSharingUpdates());
102+
this.addUnsubscribeFunction(this.subscribeTargetMessagesChange());
103+
};
104+
105+
public unregisterSubscriptions = () => super.unregisterSubscriptions();
106+
107+
get messages() {
108+
return this.state.getLatestValue().messages;
109+
}
110+
111+
get stateIsReady() {
112+
return this.state.getLatestValue().ready;
113+
}
114+
115+
get deviceId() {
116+
if (!this._deviceId) {
117+
this._deviceId = this.getDeviceId();
118+
}
119+
return this._deviceId;
120+
}
121+
122+
private async assureStateInit() {
123+
if (this.stateIsReady) return;
124+
const { active_live_locations } = await this.client.getSharedLocations();
125+
this.state.next({
126+
messages: new Map(
127+
active_live_locations
128+
.filter((location) => !isExpiredLocation(location))
129+
.map((location) => [
130+
location.message_id,
131+
{
132+
...location,
133+
stopSharingTimeout: setTimeout(
134+
() => {
135+
this.unregisterMessages([location.message_id]);
136+
},
137+
new Date(location.end_at).getTime() - Date.now(),
138+
),
139+
},
140+
]),
141+
),
142+
ready: true,
143+
});
144+
}
145+
146+
private subscribeTargetMessagesChange() {
147+
let unsubscribeWatchLocation: null | (() => void) = null;
148+
149+
// Subscribe to location updates only if there are relevant messages to
150+
// update, no need for the location watcher to be active/instantiated otherwise
151+
const unsubscribe = this.state.subscribeWithSelector(
152+
({ messages }) => ({ messages }),
153+
({ messages }) => {
154+
if (!messages.size) {
155+
unsubscribeWatchLocation?.();
156+
unsubscribeWatchLocation = null;
157+
} else if (messages.size && !unsubscribeWatchLocation) {
158+
unsubscribeWatchLocation = this.subscribeWatchLocation();
159+
}
160+
},
161+
);
162+
163+
return () => {
164+
unsubscribe();
165+
unsubscribeWatchLocation?.();
166+
};
167+
}
168+
169+
private subscribeWatchLocation() {
170+
let nextAllowedUpdateCallTimestamp = Date.now();
171+
172+
const unsubscribe = this.watchLocation(({ latitude, longitude }) => {
173+
// Integrators can adjust the update interval by supplying custom watchLocation subscription,
174+
// but the minimal timeout still has to be set as a failsafe (to prevent rate-limitting)
175+
if (Date.now() < nextAllowedUpdateCallTimestamp) return;
176+
177+
nextAllowedUpdateCallTimestamp =
178+
Date.now() + UPDATE_LIVE_LOCATION_REQUEST_MIN_THROTTLE_TIMEOUT;
179+
180+
withCancellation(LiveLocationManager.symbol, async () => {
181+
const promises: Promise<SharedLocationResponse>[] = [];
182+
await this.assureStateInit();
183+
const expiredLocations: string[] = [];
184+
185+
for (const [messageId, location] of this.messages) {
186+
if (isExpiredLocation(location)) {
187+
expiredLocations.push(location.message_id);
188+
continue;
189+
}
190+
if (location.latitude === latitude && location.longitude === longitude)
191+
continue;
192+
const promise = this.client.updateLocation({
193+
created_by_device_id: location.created_by_device_id,
194+
message_id: messageId,
195+
latitude,
196+
longitude,
197+
});
198+
199+
promises.push(promise);
200+
}
201+
this.unregisterMessages(expiredLocations);
202+
if (promises.length > 0) {
203+
await Promise.allSettled(promises);
204+
}
205+
// TODO: handle values (remove failed - based on specific error code), keep re-trying others
206+
});
207+
});
208+
209+
return unsubscribe;
210+
}
211+
212+
private subscribeLiveLocationSharingUpdates() {
213+
/**
214+
* Both message.updated & live_location_sharing.stopped get emitted when message gets an
215+
* update, live_location_sharing.stopped gets emitted only locally and only if the update goes
216+
* through, it's a failsafe for when channel is no longer being watched for whatever reason
217+
*/
218+
const subscriptions = [
219+
...(
220+
[
221+
'live_location_sharing.started',
222+
'message.updated',
223+
'message.deleted',
224+
] as EventTypes[]
225+
).map((eventType) =>
226+
this.client.on(eventType, (event) => {
227+
if (!event.message) return;
228+
229+
if (event.type === 'live_location_sharing.started') {
230+
this.registerMessage(event.message);
231+
} else if (event.type === 'message.updated') {
232+
const isRegistered = this.messages.has(event.message.id);
233+
if (isRegistered && !isValidLiveLocationMessage(event.message)) {
234+
this.unregisterMessages([event.message.id]);
235+
}
236+
this.registerMessage(event.message);
237+
} else {
238+
this.unregisterMessages([event.message.id]);
239+
}
240+
}),
241+
),
242+
this.client.on('live_location_sharing.stopped', (event) => {
243+
if (!event.live_location) return;
244+
245+
this.unregisterMessages([event.live_location?.message_id]);
246+
}),
247+
];
248+
249+
return () => subscriptions.forEach((subscription) => subscription.unsubscribe());
250+
}
251+
252+
private registerMessage(message: MessageResponse) {
253+
if (
254+
!this.client.userID ||
255+
message?.user?.id !== this.client.userID ||
256+
!isValidLiveLocationMessage(message)
257+
)
258+
return;
259+
260+
this.state.next((currentValue) => {
261+
const messages = new Map(currentValue.messages);
262+
messages.set(message.id, {
263+
...message.shared_location,
264+
stopSharingTimeout: setTimeout(
265+
() => {
266+
this.unregisterMessages([message.id]);
267+
},
268+
new Date(message.shared_location.end_at).getTime() - Date.now(),
269+
),
270+
});
271+
return {
272+
...currentValue,
273+
messages,
274+
};
275+
});
276+
}
277+
278+
private unregisterMessages(messageIds: string[]) {
279+
const messages = this.messages;
280+
const removedMessages = new Set(messageIds);
281+
const newMessages = new Map(
282+
Array.from(messages).filter(([messageId, location]) => {
283+
if (removedMessages.has(messageId) && location.stopSharingTimeout) {
284+
clearTimeout(location.stopSharingTimeout);
285+
location.stopSharingTimeout = null;
286+
}
287+
return !removedMessages.has(messageId);
288+
}),
289+
);
290+
291+
if (newMessages.size === messages.size) return;
292+
293+
this.state.partialNext({
294+
messages: newMessages,
295+
});
296+
}
297+
}

src/channel.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import type {
3131
GetMultipleMessagesAPIResponse,
3232
GetReactionsAPIResponse,
3333
GetRepliesAPIResponse,
34+
LiveLocationPayload,
3435
LocalMessage,
3536
MarkReadOptions,
3637
MarkUnreadOptions,
@@ -63,10 +64,12 @@ import type {
6364
SendMessageAPIResponse,
6465
SendMessageOptions,
6566
SendReactionOptions,
67+
StaticLocationPayload,
6668
TruncateChannelAPIResponse,
6769
TruncateOptions,
6870
UpdateChannelAPIResponse,
6971
UpdateChannelOptions,
72+
UpdateLocationPayload,
7073
UserResponse,
7174
} from './types';
7275
import type { Role } from './permissions';
@@ -669,6 +672,37 @@ export class Channel {
669672
return data;
670673
}
671674

675+
public async sendSharedLocation(
676+
location: StaticLocationPayload | LiveLocationPayload,
677+
userId?: string,
678+
) {
679+
const result = await this.sendMessage({
680+
id: location.message_id,
681+
shared_location: location,
682+
user: userId ? { id: userId } : undefined,
683+
});
684+
685+
if ((location as LiveLocationPayload).end_at) {
686+
this.getClient().dispatchEvent({
687+
message: result.message,
688+
type: 'live_location_sharing.started',
689+
});
690+
}
691+
692+
return result;
693+
}
694+
695+
public async stopLiveLocationSharing(payload: UpdateLocationPayload) {
696+
const location = await this.getClient().updateLocation({
697+
...payload,
698+
end_at: new Date().toISOString(),
699+
});
700+
this.getClient().dispatchEvent({
701+
live_location: location,
702+
type: 'live_location_sharing.stopped',
703+
});
704+
}
705+
672706
/**
673707
* delete - Delete the channel. Messages are permanently removed.
674708
*

src/client.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ import type {
191191
SegmentTargetsResponse,
192192
SegmentType,
193193
SendFileAPIResponse,
194-
SharedLocationRequest,
195194
SharedLocationResponse,
196195
SortParam,
197196
StreamChatOptions,
@@ -209,6 +208,7 @@ import type {
209208
UpdateChannelTypeResponse,
210209
UpdateCommandOptions,
211210
UpdateCommandResponse,
211+
UpdateLocationPayload,
212212
UpdateMessageAPIResponse,
213213
UpdateMessageOptions,
214214
UpdatePollAPIResponse,
@@ -4593,11 +4593,11 @@ export class StreamChat {
45934593
/**
45944594
* updateLocation - Updates a location
45954595
*
4596-
* @param location UserLocation the location data to update
4596+
* @param location SharedLocationRequest the location data to update
45974597
*
4598-
* @returns {Promise<APIResponse>} The server response
4598+
* @returns {Promise<SharedLocationResponse>} The server response
45994599
*/
4600-
async updateLocation(location: SharedLocationRequest) {
4600+
async updateLocation(location: UpdateLocationPayload) {
46014601
return await this.put<SharedLocationResponse>(
46024602
this.baseURL + `/users/live_locations`,
46034603
location,

src/events.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ export const EVENT_MAP = {
6363
'connection.recovered': true,
6464
'transport.changed': true,
6565
'capabilities.changed': true,
66+
'live_location_sharing.started': true,
67+
'live_location_sharing.stopped': true,
6668

6769
// Reminder events
6870
'reminder.created': true,

0 commit comments

Comments
 (0)