Skip to content

Commit 746e5fe

Browse files
authored
Merge pull request #2924 from dubinc/tb-migration
Tinybird migration
2 parents ce029d0 + 690fd63 commit 746e5fe

File tree

93 files changed

+9530
-1270
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+9530
-1270
lines changed

apps/web/app/(ee)/api/cron/cleanup/link-retention/route.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { handleAndReturnErrorResponse } from "@/lib/api/errors";
22
import { qstash } from "@/lib/cron";
33
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
44
import { verifyVercelSignature } from "@/lib/cron/verify-vercel";
5-
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
5+
import { recordLink } from "@/lib/tinybird";
66
import { prisma } from "@dub/prisma";
77
import { APP_DOMAIN_WITH_NGROK } from "@dub/utils";
88
import { Domain } from "@prisma/client";
@@ -125,12 +125,7 @@ async function deleteOldLinks(
125125

126126
// // Record the links deletion in Tinybird
127127
// // not 100% sure if we need this yet, maybe we should just delete the link completely from TB to save space?
128-
await recordLinkTB(
129-
links.map((link) => ({
130-
...transformLinkTB(link),
131-
deleted: true,
132-
})),
133-
);
128+
await recordLink(links, { deleted: true });
134129

135130
console.log(
136131
`[Link retention cleanup] Deleted ${links.length} links for ${domain.slug} that are older than ${domain.linkRetentionDays} days!`,

apps/web/app/(ee)/api/cron/framer/backfill-leads-batch/route.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,18 @@ export const POST = withWorkspace(async ({ req, workspace }) => {
300300
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
301301
},
302302
),
303+
// TODO: Remove after Tinybird migration
304+
fetch(
305+
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
306+
{
307+
method: "POST",
308+
headers: {
309+
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
310+
"Content-Type": "application/x-ndjson",
311+
},
312+
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
313+
},
314+
),
303315

304316
// Record leads
305317
recordLeadWithTimestamp(dataArray.map((d) => d.leadEventData)),

apps/web/lib/actions/partners/revoke-program-invite.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import { linkCache } from "@/lib/api/links/cache";
44
import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw";
5-
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
5+
import { recordLink } from "@/lib/tinybird";
66
import { prisma } from "@dub/prisma";
77
import { waitUntil } from "@vercel/functions";
88
import z from "../../zod";
@@ -72,12 +72,7 @@ export const revokeProgramInviteAction = authActionClient
7272
linkCache.expireMany(partnerLinks),
7373

7474
// Record the links deletion in Tinybird
75-
recordLinkTB(
76-
partnerLinks.map((link) => ({
77-
...transformLinkTB(link),
78-
deleted: true,
79-
})),
80-
),
75+
recordLink(partnerLinks, { deleted: true }),
8176

8277
// Update totalLinks for the workspace
8378
prisma.project.update({

apps/web/lib/api/audit-logs/record-audit-log.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getIP } from "@/lib/api/utils/get-ip";
2-
import { tb } from "@/lib/tinybird";
2+
import { tb, tbNew } from "@/lib/tinybird";
33
import { log } from "@dub/utils";
4-
import { ipAddress as getIPAddress } from "@vercel/functions";
4+
import { ipAddress as getIPAddress, waitUntil } from "@vercel/functions";
55
import { headers } from "next/headers";
66
import { z } from "zod";
77
import { createId } from "../create-id";
@@ -56,6 +56,7 @@ export const recordAuditLog = async (data: AuditLogInput | AuditLogInput[]) => {
5656
: [transformAuditLogTB(data, { headersList, ipAddress })];
5757

5858
try {
59+
waitUntil(recordAuditLogTBNew(auditLogs));
5960
await recordAuditLogTB(auditLogs);
6061
} catch (error) {
6162
console.error(
@@ -77,3 +78,10 @@ const recordAuditLogTB = tb.buildIngestEndpoint({
7778
event: auditLogSchemaTB,
7879
wait: true,
7980
});
81+
82+
// TODO: Remove after Tinybird migration
83+
const recordAuditLogTBNew = tbNew.buildIngestEndpoint({
84+
datasource: "dub_audit_logs",
85+
event: auditLogSchemaTB,
86+
wait: true,
87+
});

apps/web/lib/api/conversions/track-lead.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { includeTags } from "@/lib/api/links/include-tags";
44
import { generateRandomName } from "@/lib/names";
55
import { createPartnerCommission } from "@/lib/partners/create-partner-commission";
66
import { isStored, storage } from "@/lib/storage";
7-
import { getClickEvent, recordLead, recordLeadSync } from "@/lib/tinybird";
7+
import { getClickEvent, recordLead } from "@/lib/tinybird";
88
import { logConversionEvent } from "@/lib/tinybird/log-conversion-events";
99
import { ClickEventTB, WebhookPartner, WorkspaceProps } from "@/lib/types";
1010
import { redis } from "@/lib/upstash";
@@ -214,9 +214,6 @@ export const trackLead = async ({
214214
: leadEventPayload;
215215

216216
await Promise.all([
217-
// Use recordLeadSync which waits for the operation to complete
218-
recordLeadSync(leadEventPayload),
219-
220217
// Cache the latest lead event for 5 minutes because the ingested event is not available immediately on Tinybird
221218
// we're setting two keys because we want to support the use case where the customer has multiple lead events
222219
redis.set(`leadCache:${customer.id}`, cacheLeadEventPayload, {
@@ -235,9 +232,8 @@ export const trackLead = async ({
235232

236233
waitUntil(
237234
(async () => {
238-
// for async mode, record the lead event in the background
239-
// for deferred mode, defer the lead event creation to a subsequent request
240-
if (mode === "async") {
235+
// for deferred mode, we defer the lead event creation to a subsequent request
236+
if (mode !== "deferred") {
241237
await recordLead(createLeadEventPayload(customer.id));
242238
}
243239

apps/web/lib/api/links/bulk-delete-links.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { storage } from "@/lib/storage";
2-
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
2+
import { recordLink } from "@/lib/tinybird";
33
import { prisma } from "@dub/prisma";
44
import { R2_URL } from "@dub/utils";
55
import { linkCache } from "./cache";
@@ -15,12 +15,7 @@ export async function bulkDeleteLinks(links: ExpandedLink[]) {
1515
linkCache.deleteMany(links),
1616

1717
// Record the links deletion in Tinybird
18-
recordLinkTB(
19-
links.map((link) => ({
20-
...transformLinkTB(link),
21-
deleted: true,
22-
})),
23-
),
18+
recordLink(links, { deleted: true }),
2419

2520
// For links that have an image, delete the image from R2
2621
links

apps/web/lib/api/links/delete-link.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { storage } from "@/lib/storage";
2-
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
2+
import { recordLink } from "@/lib/tinybird";
33
import { prisma } from "@dub/prisma";
44
import { R2_URL } from "@dub/utils";
55
import { waitUntil } from "@vercel/functions";
@@ -30,10 +30,7 @@ export async function deleteLink(linkId: string) {
3030
linkCache.delete(link),
3131

3232
// Record link in the Tinybird
33-
recordLinkTB({
34-
...transformLinkTB(link),
35-
deleted: true,
36-
}),
33+
recordLink(link, { deleted: true }),
3734

3835
link.projectId &&
3936
prisma.project.update({

apps/web/lib/tinybird/client.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ export const tb = new Tinybird({
44
token: process.env.TINYBIRD_API_KEY as string,
55
baseUrl: process.env.TINYBIRD_API_URL as string,
66
});
7+
8+
// TODO: Remove after Tinybird migration
9+
export const tbNew = new Tinybird({
10+
token: process.env.TINYBIRD_API_KEY_NEW as string,
11+
baseUrl: process.env.TINYBIRD_API_URL as string,
12+
});

apps/web/lib/tinybird/log-conversion-events.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import { waitUntil } from "@vercel/functions";
12
import { z } from "zod";
2-
import { tb } from "./client";
3+
import { tb, tbNew } from "./client";
34

45
const schema = z.object({
56
workspace_id: z.string(),
@@ -10,7 +11,18 @@ const schema = z.object({
1011
});
1112

1213
// Log the conversion events for debugging purposes
13-
export const logConversionEvent = tb.buildIngestEndpoint({
14+
export const logConversionEventTB = tb.buildIngestEndpoint({
1415
datasource: "dub_conversion_events_log",
1516
event: schema,
1617
});
18+
19+
// TODO: Remove after Tinybird migration
20+
export const logConversionEventTBNew = tbNew.buildIngestEndpoint({
21+
datasource: "dub_conversion_events_log",
22+
event: schema,
23+
});
24+
25+
export const logConversionEvent = async (payload: any) => {
26+
waitUntil(logConversionEventTBNew(payload));
27+
return await logConversionEventTB(payload);
28+
};
Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
1+
import { waitUntil } from "@vercel/functions";
12
import { importErrorLogSchema } from "../zod/schemas/import-error-log";
2-
import { tb } from "./client";
3+
import { tb, tbNew } from "./client";
34

4-
export const logImportError = tb.buildIngestEndpoint({
5+
export const logImportErrorTB = tb.buildIngestEndpoint({
56
datasource: "dub_import_error_logs",
67
event: importErrorLogSchema,
78
});
9+
10+
// TODO: Remove after Tinybird migration
11+
export const logImportErrorTBNew = tbNew.buildIngestEndpoint({
12+
datasource: "dub_import_error_logs",
13+
event: importErrorLogSchema,
14+
});
15+
16+
export const logImportError = async (payload: any) => {
17+
waitUntil(logImportErrorTBNew(payload));
18+
return await logImportErrorTB(payload);
19+
};

0 commit comments

Comments
 (0)