Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions apps/web/app/(ee)/api/cron/cleanup/link-retention/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { handleAndReturnErrorResponse } from "@/lib/api/errors";
import { qstash } from "@/lib/cron";
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { verifyVercelSignature } from "@/lib/cron/verify-vercel";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { APP_DOMAIN_WITH_NGROK } from "@dub/utils";
import { Domain } from "@prisma/client";
Expand Down Expand Up @@ -125,12 +125,7 @@ async function deleteOldLinks(

// // Record the links deletion in Tinybird
// // not 100% sure if we need this yet, maybe we should just delete the link completely from TB to save space?
await recordLinkTB(
links.map((link) => ({
...transformLinkTB(link),
deleted: true,
})),
);
await recordLink(links, { deleted: true });

console.log(
`[Link retention cleanup] Deleted ${links.length} links for ${domain.slug} that are older than ${domain.linkRetentionDays} days!`,
Expand Down
12 changes: 12 additions & 0 deletions apps/web/app/(ee)/api/cron/framer/backfill-leads-batch/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,18 @@ export const POST = withWorkspace(async ({ req, workspace }) => {
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
},
),
// TODO: Remove after Tinybird migration
fetch(
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
"Content-Type": "application/x-ndjson",
},
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
},
),

// Record leads
recordLeadWithTimestamp(dataArray.map((d) => d.leadEventData)),
Expand Down
9 changes: 2 additions & 7 deletions apps/web/lib/actions/partners/revoke-program-invite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { linkCache } from "@/lib/api/links/cache";
import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { waitUntil } from "@vercel/functions";
import z from "../../zod";
Expand Down Expand Up @@ -72,12 +72,7 @@ export const revokeProgramInviteAction = authActionClient
linkCache.expireMany(partnerLinks),

// Record the links deletion in Tinybird
recordLinkTB(
partnerLinks.map((link) => ({
...transformLinkTB(link),
deleted: true,
})),
),
recordLink(partnerLinks, { deleted: true }),

// Update totalLinks for the workspace
prisma.project.update({
Expand Down
12 changes: 10 additions & 2 deletions apps/web/lib/api/audit-logs/record-audit-log.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getIP } from "@/lib/api/utils/get-ip";
import { tb } from "@/lib/tinybird";
import { tb, tbNew } from "@/lib/tinybird";
import { log } from "@dub/utils";
import { ipAddress as getIPAddress } from "@vercel/functions";
import { ipAddress as getIPAddress, waitUntil } from "@vercel/functions";
import { headers } from "next/headers";
import { z } from "zod";
import { createId } from "../create-id";
Expand Down Expand Up @@ -56,6 +56,7 @@ export const recordAuditLog = async (data: AuditLogInput | AuditLogInput[]) => {
: [transformAuditLogTB(data, { headersList, ipAddress })];

try {
waitUntil(recordAuditLogTBNew(auditLogs));
await recordAuditLogTB(auditLogs);
} catch (error) {
console.error(
Expand All @@ -77,3 +78,10 @@ const recordAuditLogTB = tb.buildIngestEndpoint({
event: auditLogSchemaTB,
wait: true,
});

// TODO: Remove after Tinybird migration
const recordAuditLogTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_audit_logs",
event: auditLogSchemaTB,
wait: true,
});
10 changes: 3 additions & 7 deletions apps/web/lib/api/conversions/track-lead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { includeTags } from "@/lib/api/links/include-tags";
import { generateRandomName } from "@/lib/names";
import { createPartnerCommission } from "@/lib/partners/create-partner-commission";
import { isStored, storage } from "@/lib/storage";
import { getClickEvent, recordLead, recordLeadSync } from "@/lib/tinybird";
import { getClickEvent, recordLead } from "@/lib/tinybird";
import { logConversionEvent } from "@/lib/tinybird/log-conversion-events";
import { ClickEventTB, WebhookPartner, WorkspaceProps } from "@/lib/types";
import { redis } from "@/lib/upstash";
Expand Down Expand Up @@ -214,9 +214,6 @@ export const trackLead = async ({
: leadEventPayload;

await Promise.all([
// Use recordLeadSync which waits for the operation to complete
recordLeadSync(leadEventPayload),

// Cache the latest lead event for 5 minutes because the ingested event is not available immediately on Tinybird
// we're setting two keys because we want to support the use case where the customer has multiple lead events
redis.set(`leadCache:${customer.id}`, cacheLeadEventPayload, {
Expand All @@ -235,9 +232,8 @@ export const trackLead = async ({

waitUntil(
(async () => {
// for async mode, record the lead event in the background
// for deferred mode, defer the lead event creation to a subsequent request
if (mode === "async") {
// for deferred mode, we defer the lead event creation to a subsequent request
if (mode !== "deferred") {
await recordLead(createLeadEventPayload(customer.id));
}

Expand Down
9 changes: 2 additions & 7 deletions apps/web/lib/api/links/bulk-delete-links.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { storage } from "@/lib/storage";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { R2_URL } from "@dub/utils";
import { linkCache } from "./cache";
Expand All @@ -15,12 +15,7 @@ export async function bulkDeleteLinks(links: ExpandedLink[]) {
linkCache.deleteMany(links),

// Record the links deletion in Tinybird
recordLinkTB(
links.map((link) => ({
...transformLinkTB(link),
deleted: true,
})),
),
recordLink(links, { deleted: true }),

// For links that have an image, delete the image from R2
links
Expand Down
7 changes: 2 additions & 5 deletions apps/web/lib/api/links/delete-link.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { storage } from "@/lib/storage";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { R2_URL } from "@dub/utils";
import { waitUntil } from "@vercel/functions";
Expand Down Expand Up @@ -30,10 +30,7 @@ export async function deleteLink(linkId: string) {
linkCache.delete(link),

// Record link in the Tinybird
recordLinkTB({
...transformLinkTB(link),
deleted: true,
}),
recordLink(link, { deleted: true }),

link.projectId &&
prisma.project.update({
Expand Down
6 changes: 6 additions & 0 deletions apps/web/lib/tinybird/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ export const tb = new Tinybird({
token: process.env.TINYBIRD_API_KEY as string,
baseUrl: process.env.TINYBIRD_API_URL as string,
});

// TODO: Remove after Tinybird migration
export const tbNew = new Tinybird({
token: process.env.TINYBIRD_API_KEY_NEW as string,
baseUrl: process.env.TINYBIRD_API_URL as string,
});
16 changes: 14 additions & 2 deletions apps/web/lib/tinybird/log-conversion-events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { waitUntil } from "@vercel/functions";
import { z } from "zod";
import { tb } from "./client";
import { tb, tbNew } from "./client";

const schema = z.object({
workspace_id: z.string(),
Expand All @@ -10,7 +11,18 @@ const schema = z.object({
});

// Log the conversion events for debugging purposes
export const logConversionEvent = tb.buildIngestEndpoint({
export const logConversionEventTB = tb.buildIngestEndpoint({
datasource: "dub_conversion_events_log",
event: schema,
});

// TODO: Remove after Tinybird migration
export const logConversionEventTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_conversion_events_log",
event: schema,
});

export const logConversionEvent = async (payload: any) => {
waitUntil(logConversionEventTBNew(payload));
return await logConversionEventTB(payload);
};
16 changes: 14 additions & 2 deletions apps/web/lib/tinybird/log-import-error.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import { waitUntil } from "@vercel/functions";
import { importErrorLogSchema } from "../zod/schemas/import-error-log";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const logImportError = tb.buildIngestEndpoint({
export const logImportErrorTB = tb.buildIngestEndpoint({
datasource: "dub_import_error_logs",
event: importErrorLogSchema,
});

// TODO: Remove after Tinybird migration
export const logImportErrorTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_import_error_logs",
event: importErrorLogSchema,
});

export const logImportError = async (payload: any) => {
waitUntil(logImportErrorTBNew(payload));
return await logImportErrorTB(payload);
};
17 changes: 15 additions & 2 deletions apps/web/lib/tinybird/record-click-zod.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { waitUntil } from "@vercel/functions";
import { z } from "zod";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const recordClickZodSchema = z.object({
timestamp: z.string().default(""),
Expand Down Expand Up @@ -33,8 +34,20 @@ export const recordClickZodSchema = z.object({
trigger: z.string().default("link"),
});

export const recordClickZod = tb.buildIngestEndpoint({
export const recordClickZodTB = tb.buildIngestEndpoint({
datasource: "dub_click_events",
event: recordClickZodSchema,
wait: true,
});

// TODO: Remove after Tinybird migration
export const recordClickZodTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_click_events",
event: recordClickZodSchema,
wait: true,
});

export const recordClickZod = async (payload: any) => {
waitUntil(recordClickZodTBNew(payload));
return await recordClickZodTB(payload);
};
12 changes: 12 additions & 0 deletions apps/web/lib/tinybird/record-click.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@ export async function recordClick({
[linkId],
);
}),

// TODO: Remove after Tinybird migration
fetchWithRetry(
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
},
body: JSON.stringify(clickData),
},
).then((res) => res.json()),
]);

// Find the rejected promises and log them
Expand Down
28 changes: 23 additions & 5 deletions apps/web/lib/tinybird/record-lead.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
import { waitUntil } from "@vercel/functions";
import { z } from "zod";
import { leadEventSchemaTB } from "../zod/schemas/leads";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const recordLead = tb.buildIngestEndpoint({
export const recordLeadTB = tb.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB,
});

export const recordLeadSync = tb.buildIngestEndpoint({
// TODO: Remove after Tinybird migration
export const recordLeadTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB,
wait: true,
});

export const recordLeadWithTimestamp = tb.buildIngestEndpoint({
export const recordLead = async (payload: any) => {
waitUntil(recordLeadTBNew(payload));
return await recordLeadTB(payload);
};

export const recordLeadWithTimestampTB = tb.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB.extend({
timestamp: z.string(),
}),
});

export const recordLeadWithTimestampTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB.extend({
timestamp: z.string(),
}),
});

export const recordLeadWithTimestamp = async (payload: any) => {
waitUntil(recordLeadWithTimestampTBNew(payload));
return await recordLeadWithTimestampTB(payload);
};
27 changes: 23 additions & 4 deletions apps/web/lib/tinybird/record-link.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import z from "@/lib/zod";
import { waitUntil } from "@vercel/functions";
import { ExpandedLink } from "../api/links";
import { decodeKeyIfCaseSensitive } from "../api/links/case-sensitivity";
import { prefixWorkspaceId } from "../api/workspaces/workspace-id";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const dubLinksMetadataSchema = z.object({
link_id: z.string(),
Expand Down Expand Up @@ -49,6 +50,13 @@ export const recordLinkTB = tb.buildIngestEndpoint({
wait: true,
});

// TODO: Remove after Tinybird migration
export const recordLinkTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_links_metadata",
event: dubLinksMetadataSchema,
wait: true,
});

export const transformLinkTB = (link: ExpandedLink) => {
const key = decodeKeyIfCaseSensitive({
domain: link.domain,
Expand All @@ -70,10 +78,21 @@ export const transformLinkTB = (link: ExpandedLink) => {
};
};

export const recordLink = async (payload: ExpandedLink | ExpandedLink[]) => {
export const recordLink = async (
payload: ExpandedLink | ExpandedLink[],
{ deleted }: { deleted?: boolean } = {},
) => {
if (Array.isArray(payload)) {
return await recordLinkTB(payload.map(transformLinkTB));
waitUntil(
recordLinkTBNew(
payload.map(transformLinkTB).map((p) => ({ ...p, deleted })),
),
);
return await recordLinkTB(
payload.map(transformLinkTB).map((p) => ({ ...p, deleted })),
);
} else {
return await recordLinkTB(transformLinkTB(payload));
waitUntil(recordLinkTBNew({ ...transformLinkTB(payload), deleted }));
return await recordLinkTB({ ...transformLinkTB(payload), deleted });
}
};
Loading