Skip to content
Merged
9 changes: 2 additions & 7 deletions apps/web/app/(ee)/api/cron/cleanup/link-retention/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { handleAndReturnErrorResponse } from "@/lib/api/errors";
import { qstash } from "@/lib/cron";
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { verifyVercelSignature } from "@/lib/cron/verify-vercel";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { APP_DOMAIN_WITH_NGROK } from "@dub/utils";
import { Domain } from "@prisma/client";
Expand Down Expand Up @@ -125,12 +125,7 @@ async function deleteOldLinks(

// // Record the links deletion in Tinybird
// // not 100% sure if we need this yet, maybe we should just delete the link completely from TB to save space?
await recordLinkTB(
links.map((link) => ({
...transformLinkTB(link),
deleted: true,
})),
);
await recordLink(links, { deleted: true });

console.log(
`[Link retention cleanup] Deleted ${links.length} links for ${domain.slug} that are older than ${domain.linkRetentionDays} days!`,
Expand Down
12 changes: 12 additions & 0 deletions apps/web/app/(ee)/api/cron/framer/backfill-leads-batch/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,18 @@ export const POST = withWorkspace(async ({ req, workspace }) => {
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
},
),
// TODO: Remove after Tinybird migration
fetch(
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
"Content-Type": "application/x-ndjson",
},
body: dataArray.map((d) => JSON.stringify(d.clickData)).join("\n"),
},
),

// Record leads
recordLeadWithTimestamp(dataArray.map((d) => d.leadEventData)),
Expand Down
9 changes: 2 additions & 7 deletions apps/web/lib/actions/partners/revoke-program-invite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { linkCache } from "@/lib/api/links/cache";
import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { waitUntil } from "@vercel/functions";
import z from "../../zod";
Expand Down Expand Up @@ -72,12 +72,7 @@ export const revokeProgramInviteAction = authActionClient
linkCache.expireMany(partnerLinks),

// Record the links deletion in Tinybird
recordLinkTB(
partnerLinks.map((link) => ({
...transformLinkTB(link),
deleted: true,
})),
),
recordLink(partnerLinks, { deleted: true }),

// Update totalLinks for the workspace
prisma.project.update({
Expand Down
12 changes: 10 additions & 2 deletions apps/web/lib/api/audit-logs/record-audit-log.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getIP } from "@/lib/api/utils/get-ip";
import { tb } from "@/lib/tinybird";
import { tb, tbNew } from "@/lib/tinybird";
import { log } from "@dub/utils";
import { ipAddress as getIPAddress } from "@vercel/functions";
import { ipAddress as getIPAddress, waitUntil } from "@vercel/functions";
import { headers } from "next/headers";
import { z } from "zod";
import { createId } from "../create-id";
Expand Down Expand Up @@ -56,6 +56,7 @@ export const recordAuditLog = async (data: AuditLogInput | AuditLogInput[]) => {
: [transformAuditLogTB(data, { headersList, ipAddress })];

try {
waitUntil(recordAuditLogTBNew(auditLogs));
await recordAuditLogTB(auditLogs);
} catch (error) {
console.error(
Expand All @@ -77,3 +78,10 @@ const recordAuditLogTB = tb.buildIngestEndpoint({
event: auditLogSchemaTB,
wait: true,
});

// TODO: Remove after Tinybird migration
const recordAuditLogTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_audit_logs",
event: auditLogSchemaTB,
wait: true,
});
10 changes: 3 additions & 7 deletions apps/web/lib/api/conversions/track-lead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { includeTags } from "@/lib/api/links/include-tags";
import { generateRandomName } from "@/lib/names";
import { createPartnerCommission } from "@/lib/partners/create-partner-commission";
import { isStored, storage } from "@/lib/storage";
import { getClickEvent, recordLead, recordLeadSync } from "@/lib/tinybird";
import { getClickEvent, recordLead } from "@/lib/tinybird";
import { logConversionEvent } from "@/lib/tinybird/log-conversion-events";
import { ClickEventTB, WebhookPartner, WorkspaceProps } from "@/lib/types";
import { redis } from "@/lib/upstash";
Expand Down Expand Up @@ -214,9 +214,6 @@ export const trackLead = async ({
: leadEventPayload;

await Promise.all([
// Use recordLeadSync which waits for the operation to complete
recordLeadSync(leadEventPayload),

// Cache the latest lead event for 5 minutes because the ingested event is not available immediately on Tinybird
// we're setting two keys because we want to support the use case where the customer has multiple lead events
redis.set(`leadCache:${customer.id}`, cacheLeadEventPayload, {
Expand All @@ -235,9 +232,8 @@ export const trackLead = async ({

waitUntil(
(async () => {
// for async mode, record the lead event in the background
// for deferred mode, defer the lead event creation to a subsequent request
if (mode === "async") {
// for deferred mode, we defer the lead event creation to a subsequent request
if (mode !== "deferred") {
await recordLead(createLeadEventPayload(customer.id));
}

Expand Down
9 changes: 2 additions & 7 deletions apps/web/lib/api/links/bulk-delete-links.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { storage } from "@/lib/storage";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { R2_URL } from "@dub/utils";
import { linkCache } from "./cache";
Expand All @@ -15,12 +15,7 @@ export async function bulkDeleteLinks(links: ExpandedLink[]) {
linkCache.deleteMany(links),

// Record the links deletion in Tinybird
recordLinkTB(
links.map((link) => ({
...transformLinkTB(link),
deleted: true,
})),
),
recordLink(links, { deleted: true }),

// For links that have an image, delete the image from R2
links
Expand Down
7 changes: 2 additions & 5 deletions apps/web/lib/api/links/delete-link.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { storage } from "@/lib/storage";
import { recordLinkTB, transformLinkTB } from "@/lib/tinybird";
import { recordLink } from "@/lib/tinybird";
import { prisma } from "@dub/prisma";
import { R2_URL } from "@dub/utils";
import { waitUntil } from "@vercel/functions";
Expand Down Expand Up @@ -28,10 +28,7 @@ export async function deleteLink(linkId: string) {
linkCache.delete(link),

// Record link in the Tinybird
recordLinkTB({
...transformLinkTB(link),
deleted: true,
}),
recordLink(link, { deleted: true }),

link.projectId &&
prisma.project.update({
Expand Down
6 changes: 6 additions & 0 deletions apps/web/lib/tinybird/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ export const tb = new Tinybird({
token: process.env.TINYBIRD_API_KEY as string,
baseUrl: process.env.TINYBIRD_API_URL as string,
});

// TODO: Remove after Tinybird migration
export const tbNew = new Tinybird({
token: process.env.TINYBIRD_API_KEY_NEW as string,
baseUrl: process.env.TINYBIRD_API_URL as string,
});
16 changes: 14 additions & 2 deletions apps/web/lib/tinybird/log-conversion-events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { waitUntil } from "@vercel/functions";
import { z } from "zod";
import { tb } from "./client";
import { tb, tbNew } from "./client";

const schema = z.object({
workspace_id: z.string(),
Expand All @@ -10,7 +11,18 @@ const schema = z.object({
});

// Log the conversion events for debugging purposes
export const logConversionEvent = tb.buildIngestEndpoint({
export const logConversionEventTB = tb.buildIngestEndpoint({
datasource: "dub_conversion_events_log",
event: schema,
});

// TODO: Remove after Tinybird migration
export const logConversionEventTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_conversion_events_log",
event: schema,
});

export const logConversionEvent = async (payload: any) => {
waitUntil(logConversionEventTBNew(payload));
return await logConversionEventTB(payload);
};
16 changes: 14 additions & 2 deletions apps/web/lib/tinybird/log-import-error.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import { waitUntil } from "@vercel/functions";
import { importErrorLogSchema } from "../zod/schemas/import-error-log";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const logImportError = tb.buildIngestEndpoint({
export const logImportErrorTB = tb.buildIngestEndpoint({
datasource: "dub_import_error_logs",
event: importErrorLogSchema,
});

// TODO: Remove after Tinybird migration
export const logImportErrorTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_import_error_logs",
event: importErrorLogSchema,
});

export const logImportError = async (payload: any) => {
waitUntil(logImportErrorTBNew(payload));
return await logImportErrorTB(payload);
};
17 changes: 15 additions & 2 deletions apps/web/lib/tinybird/record-click-zod.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { waitUntil } from "@vercel/functions";
import { z } from "zod";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const recordClickZodSchema = z.object({
timestamp: z.string().default(""),
Expand Down Expand Up @@ -33,8 +34,20 @@ export const recordClickZodSchema = z.object({
trigger: z.string().default("link"),
});

export const recordClickZod = tb.buildIngestEndpoint({
export const recordClickZodTB = tb.buildIngestEndpoint({
datasource: "dub_click_events",
event: recordClickZodSchema,
wait: true,
});

// TODO: Remove after Tinybird migration
export const recordClickZodTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_click_events",
event: recordClickZodSchema,
wait: true,
});

export const recordClickZod = async (payload: any) => {
waitUntil(recordClickZodTBNew(payload));
return await recordClickZodTB(payload);
};
11 changes: 11 additions & 0 deletions apps/web/lib/tinybird/record-click.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,17 @@ export async function recordClick({
body: JSON.stringify(clickData),
},
).then((res) => res.json()),
// TODO: Remove after Tinybird migration
fetchWithRetry(
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.TINYBIRD_API_KEY_NEW}`,
},
body: JSON.stringify(clickData),
},
).then((res) => res.json()),

// cache the recorded click for the corresponding IP address in Redis for 1 hour
recordClickCache.set({ domain, key, ip, clickId }),
Expand Down
28 changes: 23 additions & 5 deletions apps/web/lib/tinybird/record-lead.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
import { waitUntil } from "@vercel/functions";
import { z } from "zod";
import { leadEventSchemaTB } from "../zod/schemas/leads";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const recordLead = tb.buildIngestEndpoint({
export const recordLeadTB = tb.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB,
});

export const recordLeadSync = tb.buildIngestEndpoint({
// TODO: Remove after Tinybird migration
export const recordLeadTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB,
wait: true,
});

export const recordLeadWithTimestamp = tb.buildIngestEndpoint({
export const recordLead = async (payload: any) => {
waitUntil(recordLeadTBNew(payload));
return await recordLeadTB(payload);
};

export const recordLeadWithTimestampTB = tb.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB.extend({
timestamp: z.string(),
}),
});

export const recordLeadWithTimestampTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_lead_events",
event: leadEventSchemaTB.extend({
timestamp: z.string(),
}),
});

export const recordLeadWithTimestamp = async (payload: any) => {
waitUntil(recordLeadWithTimestampTBNew(payload));
return await recordLeadWithTimestampTB(payload);
};
27 changes: 23 additions & 4 deletions apps/web/lib/tinybird/record-link.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import z from "@/lib/zod";
import { waitUntil } from "@vercel/functions";
import { ExpandedLink } from "../api/links";
import { decodeKeyIfCaseSensitive } from "../api/links/case-sensitivity";
import { prefixWorkspaceId } from "../api/workspaces/workspace-id";
import { tb } from "./client";
import { tb, tbNew } from "./client";

export const dubLinksMetadataSchema = z.object({
link_id: z.string(),
Expand Down Expand Up @@ -49,6 +50,13 @@ export const recordLinkTB = tb.buildIngestEndpoint({
wait: true,
});

// TODO: Remove after Tinybird migration
export const recordLinkTBNew = tbNew.buildIngestEndpoint({
datasource: "dub_links_metadata",
event: dubLinksMetadataSchema,
wait: true,
});

export const transformLinkTB = (link: ExpandedLink) => {
const key = decodeKeyIfCaseSensitive({
domain: link.domain,
Expand All @@ -70,10 +78,21 @@ export const transformLinkTB = (link: ExpandedLink) => {
};
};

export const recordLink = async (payload: ExpandedLink | ExpandedLink[]) => {
export const recordLink = async (
payload: ExpandedLink | ExpandedLink[],
{ deleted }: { deleted?: boolean } = {},
) => {
if (Array.isArray(payload)) {
return await recordLinkTB(payload.map(transformLinkTB));
waitUntil(
recordLinkTBNew(
payload.map(transformLinkTB).map((p) => ({ ...p, deleted })),
),
);
return await recordLinkTB(
payload.map(transformLinkTB).map((p) => ({ ...p, deleted })),
);
} else {
return await recordLinkTB(transformLinkTB(payload));
waitUntil(recordLinkTBNew({ ...transformLinkTB(payload), deleted }));
return await recordLinkTB({ ...transformLinkTB(payload), deleted });
}
};
Loading