Skip to content

Commit ad28172

Browse files
authored
feat: support compact state type for links in message reconciliation (#2074)
## Motivation Currently message reconciliation skips over LinkCompactState messages when fetching from hubs, and depending on the logic of the caller of that reconciler, it may make the caller mark the message as deleted (and subsequently perform deletions on the follows). ## Change Summary This change alters the hub message fetch to also call this RPC when reconciling links. Important: users of this version will have to have the corresponding RPC from #2068. ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [x] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. - [x] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits) ## Additional Context If this is a relatively large or complex change, provide more details here that will help reviewers <!-- start pr-codex --> --- ## PR-Codex overview This PR focuses on enhancing link reconciliation in the `shuttle` package by calling the compact state RPC. ### Detailed summary - Added a new method to retrieve link compact state messages by FID - Updated imports in integration tests - Implemented a test for reconciling messages and flagging incorrectly deleted messages > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` <!-- end pr-codex -->
1 parent f25f133 commit ad28172

3 files changed

Lines changed: 128 additions & 2 deletions

File tree

.changeset/proud-crabs-teach.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@farcaster/shuttle": minor
3+
---
4+
5+
Link reconciliation now calls the compact state rpc as well

packages/shuttle/src/shuttle.integration.test.ts

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
11
import { migrateToLatest } from "./example-app/db";
22
import { log } from "./log";
33
import { sql } from "kysely";
4-
import { Factories, HubEvent, HubEventType, Message } from "@farcaster/hub-nodejs";
4+
import {
5+
CallOptions,
6+
Factories,
7+
FidRequest,
8+
HubEvent,
9+
HubEventType,
10+
HubRpcClient,
11+
LinkCompactStateBody,
12+
Message,
13+
MessageType,
14+
MessagesResponse,
15+
Metadata,
16+
} from "@farcaster/hub-nodejs";
517
import {
618
RedisClient,
719
HubSubscriber,
@@ -11,7 +23,9 @@ import {
1123
StoreMessageOperation,
1224
HubEventProcessor,
1325
MessageState,
26+
MessageReconciliation,
1427
} from "./shuttle";
28+
import { ok } from "neverthrow";
1529

1630
let db: DB;
1731
let subscriber: FakeHubSubscriber;
@@ -193,6 +207,99 @@ describe("shuttle", () => {
193207
expect(removeMessageInDb.deletedAt).toBeNull();
194208
});
195209

210+
test("reconciler flags incorrectly deleted messages", async () => {
211+
const addMessage = await Factories.LinkAddMessage.create({}, { transient: { signer } });
212+
const targetFid = addMessage.data.linkBody.targetFid;
213+
expect(targetFid).toBeDefined();
214+
const compactMessage = await Factories.LinkCompactStateMessage.create(
215+
{
216+
data: {
217+
fid: addMessage.data.fid,
218+
linkCompactStateBody: {
219+
type: addMessage.data.linkBody.type,
220+
targetFids: [targetFid ?? 0],
221+
},
222+
},
223+
},
224+
{ transient: { signer } },
225+
);
226+
227+
await subscriber.processHubEvent(
228+
HubEvent.create({ id: 1, type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: addMessage } }),
229+
);
230+
await subscriber.processHubEvent(
231+
HubEvent.create({
232+
id: 2,
233+
type: HubEventType.MERGE_MESSAGE,
234+
mergeMessageBody: { message: compactMessage },
235+
}),
236+
);
237+
238+
// set compact message to deleted:
239+
await db.updateTable("messages").where("hash", "=", compactMessage.hash).set({ deletedAt: new Date() }).execute();
240+
241+
// It's a hack, but mockito is not handling this well:
242+
const mockRPCClient = {
243+
getAllLinkMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial<CallOptions>) => {
244+
return ok(
245+
MessagesResponse.create({
246+
messages: [addMessage],
247+
nextPageToken: undefined,
248+
}),
249+
);
250+
},
251+
getLinkCompactStateMessageByFid: async (
252+
_request: FidRequest,
253+
_metadata: Metadata,
254+
_options: Partial<CallOptions>,
255+
) => {
256+
return ok(
257+
MessagesResponse.create({
258+
messages: [compactMessage],
259+
nextPageToken: undefined,
260+
}),
261+
);
262+
},
263+
};
264+
265+
const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log);
266+
const messagesOnHub: Message[] = [];
267+
const missingFromHub: boolean[] = [];
268+
const prunedInDb: boolean[] = [];
269+
const revokedInDb: boolean[] = [];
270+
const messagesInDb: {
271+
hash: Uint8Array;
272+
prunedAt: Date | null;
273+
revokedAt: Date | null;
274+
fid: number;
275+
type: MessageType;
276+
raw: Uint8Array;
277+
signer: Uint8Array;
278+
}[] = [];
279+
const missingFromDb: boolean[] = [];
280+
const a = await reconciler.reconcileMessagesOfTypeForFid(
281+
addMessage.data.fid,
282+
MessageType.LINK_ADD,
283+
async (msg, missing, pruned, revoked) => {
284+
messagesOnHub.push(msg);
285+
missingFromHub.push(missing);
286+
prunedInDb.push(pruned);
287+
revokedInDb.push(revoked);
288+
},
289+
async (dbMsg, missing) => {
290+
messagesInDb.push(dbMsg);
291+
missingFromDb.push(missing);
292+
},
293+
);
294+
295+
expect(messagesOnHub.length).toBe(2);
296+
expect(messagesInDb.length).toBe(1);
297+
expect(missingFromHub).toMatchObject([false, false]);
298+
expect(prunedInDb).toMatchObject([false, false]);
299+
expect(revokedInDb).toMatchObject([false, false]);
300+
expect(missingFromDb).toMatchObject([false]);
301+
});
302+
196303
test("marks messages as pruned", async () => {
197304
const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } });
198305
subscriber.addMessageCallback((msg, operation, state, isNew, wasMissed) => {
@@ -272,7 +379,7 @@ describe("shuttle", () => {
272379
.select(["hash", "body"])
273380
.where("hash", "=", message.hash)
274381
.executeTakeFirstOrThrow();
275-
expect(res.body.targetFids).toEqual([1, 2, 3]);
382+
expect((res.body as LinkCompactStateBody).targetFids).toEqual([1, 2, 3]);
276383
});
277384
});
278385
});

packages/shuttle/src/shuttle/messageReconciliation.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,20 @@ export class MessageReconciliation {
174174
if (!pageToken?.length) break;
175175
result = await this.client.getAllLinkMessagesByFid({ pageSize, pageToken, fid });
176176
}
177+
178+
let deltaResult = await this.client.getLinkCompactStateMessageByFid({ fid, pageSize });
179+
for (;;) {
180+
if (deltaResult.isErr()) {
181+
throw new Error(`Unable to get all link compact results for FID ${fid}: ${deltaResult.error?.message}`);
182+
}
183+
184+
const { messages, nextPageToken: pageToken } = deltaResult.value;
185+
186+
yield messages;
187+
188+
if (!pageToken?.length) break;
189+
deltaResult = await this.client.getLinkCompactStateMessageByFid({ pageSize, pageToken, fid });
190+
}
177191
}
178192

179193
private async *getAllVerificationMessagesByFidInBatchesOf(fid: number, pageSize: number) {

0 commit comments

Comments
 (0)