Skip to content

Commit 0844d2b

Browse files
committed
Tinybird migration
1 parent c54abf9 commit 0844d2b

File tree

88 files changed

+9502
-1233
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+9502
-1233
lines changed

apps/web/app/(ee)/api/cron/framer/backfill-leads-batch/route.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,18 @@ export const POST = withWorkspace(async ({ req, workspace }) => {
300300
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
301301
},
302302
),
303+
// TODO: Remove after Tinybird migration
304+
fetch(
305+
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
306+
{
307+
method: "POST",
308+
headers: {
309+
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
310+
"Content-Type": "application/x-ndjson",
311+
},
312+
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
313+
},
314+
),
303315

304316
// Record leads
305317
recordLeadWithTimestamp(dataArray.map((d) => d.leadEventData)),

apps/web/lib/api/audit-logs/record-audit-log.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getIP } from "@/lib/api/utils/get-ip";
2-
import { tb } from "@/lib/tinybird";
2+
import { tb, tbNew } from "@/lib/tinybird";
33
import { log } from "@dub/utils";
4-
import { ipAddress as getIPAddress } from "@vercel/functions";
4+
import { ipAddress as getIPAddress, waitUntil } from "@vercel/functions";
55
import { headers } from "next/headers";
66
import { z } from "zod";
77
import { createId } from "../create-id";
@@ -56,6 +56,7 @@ export const recordAuditLog = async (data: AuditLogInput | AuditLogInput[]) => {
5656
: [transformAuditLogTB(data, { headersList, ipAddress })];
5757

5858
try {
59+
waitUntil(recordAuditLogTBNew(auditLogs));
5960
await recordAuditLogTB(auditLogs);
6061
} catch (error) {
6162
console.error(
@@ -77,3 +78,10 @@ const recordAuditLogTB = tb.buildIngestEndpoint({
7778
event: auditLogSchemaTB,
7879
wait: true,
7980
});
81+
82+
// TODO: Remove after Tinybird migration
83+
const recordAuditLogTBNew = tbNew.buildIngestEndpoint({
84+
datasource: "dub_audit_logs",
85+
event: auditLogSchemaTB,
86+
wait: true,
87+
});

apps/web/lib/api/conversions/track-lead.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { includeTags } from "@/lib/api/links/include-tags";
44
import { generateRandomName } from "@/lib/names";
55
import { createPartnerCommission } from "@/lib/partners/create-partner-commission";
66
import { isStored, storage } from "@/lib/storage";
7-
import { getClickEvent, recordLead, recordLeadSync } from "@/lib/tinybird";
7+
import { getClickEvent, recordLead } from "@/lib/tinybird";
88
import { logConversionEvent } from "@/lib/tinybird/log-conversion-events";
99
import { ClickEventTB, WebhookPartner, WorkspaceProps } from "@/lib/types";
1010
import { redis } from "@/lib/upstash";
@@ -214,9 +214,6 @@ export const trackLead = async ({
214214
: leadEventPayload;
215215

216216
await Promise.all([
217-
// Use recordLeadSync which waits for the operation to complete
218-
recordLeadSync(leadEventPayload),
219-
220217
// Cache the latest lead event for 5 minutes because the ingested event is not available immediately on Tinybird
221218
// we're setting two keys because we want to support the use case where the customer has multiple lead events
222219
redis.set(`leadCache:${customer.id}`, cacheLeadEventPayload, {
@@ -235,9 +232,8 @@ export const trackLead = async ({
235232

236233
waitUntil(
237234
(async () => {
238-
// for async mode, record the lead event in the background
239-
// for deferred mode, defer the lead event creation to a subsequent request
240-
if (mode === "async") {
235+
// for deferred mode, we defer the lead event creation to a subsequent request
236+
if (mode !== "deferred") {
241237
await recordLead(createLeadEventPayload(customer.id));
242238
}
243239

apps/web/lib/tinybird/client.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ export const tb = new Tinybird({
44
token: process.env.TINYBIRD_API_KEY as string,
55
baseUrl: process.env.TINYBIRD_API_URL as string,
66
});
7+
8+
// TODO: Remove after Tinybird migration
9+
export const tbNew = new Tinybird({
10+
token: process.env.TINYBIRD_API_KEY_NEW as string,
11+
baseUrl: process.env.TINYBIRD_API_URL as string,
12+
});

apps/web/lib/tinybird/log-conversion-events.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import { waitUntil } from "@vercel/functions";
12
import { z } from "zod";
2-
import { tb } from "./client";
3+
import { tb, tbNew } from "./client";
34

45
const schema = z.object({
56
workspace_id: z.string(),
@@ -10,7 +11,18 @@ const schema = z.object({
1011
});
1112

1213
// Log the conversion events for debugging purposes
13-
export const logConversionEvent = tb.buildIngestEndpoint({
14+
export const logConversionEventTB = tb.buildIngestEndpoint({
1415
datasource: "dub_conversion_events_log",
1516
event: schema,
1617
});
18+
19+
// TODO: Remove after Tinybird migration
20+
export const logConversionEventTBNew = tbNew.buildIngestEndpoint({
21+
datasource: "dub_conversion_events_log",
22+
event: schema,
23+
});
24+
25+
export const logConversionEvent = async (payload: any) => {
26+
waitUntil(logConversionEventTBNew(payload));
27+
return await logConversionEventTB(payload);
28+
};
Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
1+
import { waitUntil } from "@vercel/functions";
12
import { importErrorLogSchema } from "../zod/schemas/import-error-log";
2-
import { tb } from "./client";
3+
import { tb, tbNew } from "./client";
34

4-
export const logImportError = tb.buildIngestEndpoint({
5+
export const logImportErrorTB = tb.buildIngestEndpoint({
56
datasource: "dub_import_error_logs",
67
event: importErrorLogSchema,
78
});
9+
10+
// TODO: Remove after Tinybird migration
11+
export const logImportErrorTBNew = tbNew.buildIngestEndpoint({
12+
datasource: "dub_import_error_logs",
13+
event: importErrorLogSchema,
14+
});
15+
16+
export const logImportError = async (payload: any) => {
17+
waitUntil(logImportErrorTBNew(payload));
18+
return await logImportErrorTB(payload);
19+
};

apps/web/lib/tinybird/record-click-zod.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import { waitUntil } from "@vercel/functions";
12
import { z } from "zod";
2-
import { tb } from "./client";
3+
import { tb, tbNew } from "./client";
34

45
export const recordClickZodSchema = z.object({
56
timestamp: z.string().default(""),
@@ -33,8 +34,20 @@ export const recordClickZodSchema = z.object({
3334
trigger: z.string().default("link"),
3435
});
3536

36-
export const recordClickZod = tb.buildIngestEndpoint({
37+
export const recordClickZodTB = tb.buildIngestEndpoint({
3738
datasource: "dub_click_events",
3839
event: recordClickZodSchema,
3940
wait: true,
4041
});
42+
43+
// TODO: Remove after Tinybird migration
44+
export const recordClickZodTBNew = tbNew.buildIngestEndpoint({
45+
datasource: "dub_click_events",
46+
event: recordClickZodSchema,
47+
wait: true,
48+
});
49+
50+
export const recordClickZod = async (payload: any) => {
51+
waitUntil(recordClickZodTBNew(payload));
52+
return await recordClickZodTB(payload);
53+
};

apps/web/lib/tinybird/record-click.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,17 @@ export async function recordClick({
177177
body: JSON.stringify(clickData),
178178
},
179179
).then((res) => res.json()),
180+
// TODO: Remove after Tinybird migration
181+
fetchWithRetry(
182+
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
183+
{
184+
method: "POST",
185+
headers: {
186+
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
187+
},
188+
body: JSON.stringify(clickData),
189+
},
190+
).then((res) => res.json()),
180191

181192
// cache the recorded click for the corresponding IP address in Redis for 1 hour
182193
recordClickCache.set({ domain, key, ip, clickId }),
Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,39 @@
1+
import { waitUntil } from "@vercel/functions";
12
import { z } from "zod";
23
import { leadEventSchemaTB } from "../zod/schemas/leads";
3-
import { tb } from "./client";
4+
import { tb, tbNew } from "./client";
45

5-
export const recordLead = tb.buildIngestEndpoint({
6+
export const recordLeadTB = tb.buildIngestEndpoint({
67
datasource: "dub_lead_events",
78
event: leadEventSchemaTB,
89
});
910

10-
export const recordLeadSync = tb.buildIngestEndpoint({
11+
// TODO: Remove after Tinybird migration
12+
export const recordLeadTBNew = tbNew.buildIngestEndpoint({
1113
datasource: "dub_lead_events",
1214
event: leadEventSchemaTB,
13-
wait: true,
1415
});
1516

16-
export const recordLeadWithTimestamp = tb.buildIngestEndpoint({
17+
export const recordLead = async (payload: any) => {
18+
waitUntil(recordLeadTBNew(payload));
19+
return await recordLeadTB(payload);
20+
};
21+
22+
export const recordLeadWithTimestampTB = tb.buildIngestEndpoint({
23+
datasource: "dub_lead_events",
24+
event: leadEventSchemaTB.extend({
25+
timestamp: z.string(),
26+
}),
27+
});
28+
29+
export const recordLeadWithTimestampTBNew = tbNew.buildIngestEndpoint({
1730
datasource: "dub_lead_events",
1831
event: leadEventSchemaTB.extend({
1932
timestamp: z.string(),
2033
}),
2134
});
35+
36+
export const recordLeadWithTimestamp = async (payload: any) => {
37+
waitUntil(recordLeadWithTimestampTBNew(payload));
38+
return await recordLeadWithTimestampTB(payload);
39+
};

apps/web/lib/tinybird/record-link.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import z from "@/lib/zod";
2+
import { waitUntil } from "@vercel/functions";
23
import { ExpandedLink } from "../api/links";
34
import { decodeKeyIfCaseSensitive } from "../api/links/case-sensitivity";
45
import { prefixWorkspaceId } from "../api/workspaces/workspace-id";
5-
import { tb } from "./client";
6+
import { tb, tbNew } from "./client";
67

78
export const dubLinksMetadataSchema = z.object({
89
link_id: z.string(),
@@ -49,6 +50,13 @@ export const recordLinkTB = tb.buildIngestEndpoint({
4950
wait: true,
5051
});
5152

53+
// TODO: Remove after Tinybird migration
54+
export const recordLinkTBNew = tbNew.buildIngestEndpoint({
55+
datasource: "dub_links_metadata",
56+
event: dubLinksMetadataSchema,
57+
wait: true,
58+
});
59+
5260
export const transformLinkTB = (link: ExpandedLink) => {
5361
const key = decodeKeyIfCaseSensitive({
5462
domain: link.domain,
@@ -72,8 +80,10 @@ export const transformLinkTB = (link: ExpandedLink) => {
7280

7381
export const recordLink = async (payload: ExpandedLink | ExpandedLink[]) => {
7482
if (Array.isArray(payload)) {
83+
waitUntil(recordLinkTBNew(payload.map(transformLinkTB)));
7584
return await recordLinkTB(payload.map(transformLinkTB));
7685
} else {
86+
waitUntil(recordLinkTBNew(transformLinkTB(payload)));
7787
return await recordLinkTB(transformLinkTB(payload));
7888
}
7989
};

0 commit comments

Comments
 (0)