Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
"scripts": {
"build": "yarn workspaces run build",
"test": "yarn workspaces run test",
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=single npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=multi npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
"dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
"dev:backend": "yarn workspace @sourcebot/backend dev:watch",
"dev:web": "yarn workspace @sourcebot/web dev",
"dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"

},
"devDependencies": {
"cross-env": "^7.0.3",
"npm-run-all": "^4.1.5"
}
}
6 changes: 4 additions & 2 deletions packages/backend/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { SourcebotConfigurationSchema } from "./schemas/v2.js";
import { AppContext } from "./types.js";
import { getTokenFromConfig, isRemotePath, marshalBool } from "./utils.js";

export const syncConfig = async (configPath: string, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
export const fetchConfigFromPath = async (configPath: string, signal: AbortSignal) => {
const configContent = await (async () => {
if (isRemotePath(configPath)) {
const response = await fetch(configPath, {
Expand All @@ -25,9 +25,11 @@ export const syncConfig = async (configPath: string, db: PrismaClient, signal: A
}
})();

// @todo: we should validate the configuration file's structure here.
const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfigurationSchema;
return config;
}

export const syncConfig = async (config: SourcebotConfigurationSchema, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
for (const repoConfig of config.repos ?? []) {
switch (repoConfig.type) {
case 'github': {
Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ export const DEFAULT_SETTINGS: Settings = {
reindexIntervalMs: 1000 * 60,
resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
indexConcurrencyMultiple: 3,
configSyncConcurrencyMultiple: 3,
}
8 changes: 7 additions & 1 deletion packages/backend/src/environment.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import dotenv from 'dotenv';

export const getEnv = (env: string | undefined, defaultValue?: string) => {
export const getEnv = (env: string | undefined, defaultValue?: string, required?: boolean) => {
if (required && !env && !defaultValue) {
throw new Error(`Missing required environment variable`);
}

return env ?? defaultValue;
}

Expand All @@ -15,6 +19,8 @@ dotenv.config({
path: './.env',
});


export const SOURCEBOT_TENANT_MODE = getEnv(process.env.SOURCEBOT_TENANT_MODE, undefined, true);
export const SOURCEBOT_LOG_LEVEL = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!;
export const SOURCEBOT_TELEMETRY_DISABLED = getEnvBoolean(process.env.SOURCEBOT_TELEMETRY_DISABLED, false)!;
export const SOURCEBOT_INSTALL_ID = getEnv(process.env.SOURCEBOT_INSTALL_ID, 'unknown')!;
Expand Down
7 changes: 4 additions & 3 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { isRemotePath } from "./utils.js";
import { AppContext } from "./types.js";
import { main } from "./main.js"
import { PrismaClient } from "@sourcebot/db";
import { SOURCEBOT_TENANT_MODE } from "./environment.js";


const parser = new ArgumentParser({
Expand All @@ -19,7 +20,7 @@ type Arguments = {

parser.add_argument("--configPath", {
help: "Path to config file",
required: true,
required: false,
});

parser.add_argument("--cacheDir", {
Expand All @@ -28,8 +29,8 @@ parser.add_argument("--cacheDir", {
});
const args = parser.parse_args() as Arguments;

if (!isRemotePath(args.configPath) && !existsSync(args.configPath)) {
console.error(`Config file ${args.configPath} does not exist`);
if (SOURCEBOT_TENANT_MODE === "single" && !isRemotePath(args.configPath) && !existsSync(args.configPath)) {
console.error(`Config file ${args.configPath} does not exist, and is required in single tenant mode`);
process.exit(1);
}

Expand Down
137 changes: 107 additions & 30 deletions packages/backend/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
import { ConfigSyncStatus, PrismaClient, Repo, Config, RepoIndexingStatus, Prisma } from '@sourcebot/db';
import { existsSync, watch } from 'fs';
import { syncConfig } from "./config.js";
import { fetchConfigFromPath, syncConfig } from "./config.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { createLogger } from "./logger.js";
import { captureEvent } from "./posthog.js";
Expand All @@ -11,6 +11,8 @@ import { DEFAULT_SETTINGS } from './constants.js';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import * as os from 'os';
import { SOURCEBOT_TENANT_MODE } from './environment.js';
import { SourcebotConfigurationSchema } from './schemas/v2.js';

const logger = createLogger('main');

Expand Down Expand Up @@ -56,6 +58,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
}
}

async function addConfigsToQueue(db: PrismaClient, queue: Queue, configs: Config[]) {
for (const config of configs) {
await db.$transaction(async (tx) => {
await tx.config.update({
where: { id: config.id },
data: { syncStatus: ConfigSyncStatus.IN_SYNC_QUEUE },
});

// Add the job to the queue
await queue.add('configSyncJob', config);
logger.info(`Added job to queue for config ${config.id}`);
}).catch((err: unknown) => {
logger.error(`Failed to add job to queue for config ${config.id}: ${err}`);
});
}
}

async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
for (const repo of repos) {
await db.$transaction(async (tx) => {
Expand All @@ -67,7 +86,7 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
// Add the job to the queue
await queue.add('indexJob', repo);
logger.info(`Added job to queue for repo ${repo.id}`);
}).catch((err) => {
}).catch((err: unknown) => {
logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
});
}
Expand All @@ -76,66 +95,123 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
export const main = async (db: PrismaClient, context: AppContext) => {
let abortController = new AbortController();
let isSyncing = false;
const _syncConfig = async () => {
const _syncConfig = async (dbConfig?: Prisma.JsonValue | undefined) => {
if (isSyncing) {
abortController.abort();
abortController = new AbortController();
}

let config: SourcebotConfigurationSchema;
switch (SOURCEBOT_TENANT_MODE) {
case 'single':
logger.info(`Syncing configuration file ${context.configPath} ...`);
config = await fetchConfigFromPath(context.configPath, abortController.signal);
break;
case 'multi':
if(!dbConfig) {
throw new Error('config object is required in multi tenant mode');
}
config = dbConfig as SourcebotConfigurationSchema
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}

logger.info(`Syncing configuration file ${context.configPath} ...`);
isSyncing = true;

try {
const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context))
logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`);
const { durationMs } = await measure(() => syncConfig(config, db, abortController.signal, context))
logger.info(`Synced configuration file in ${durationMs / 1000}s`);
isSyncing = false;
} catch (err: any) {
if (err.name === "AbortError") {
// @note: If we're aborting, we don't want to set isSyncing to false
// since it implies another sync is in progress.
} else {
isSyncing = false;
logger.error(`Failed to sync configuration file ${context.configPath} with error:`);
logger.error(`Failed to sync configuration file with error:`);
console.log(err);
}
}
}

// Re-sync on file changes if the config file is local
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}

// Re-sync at a fixed interval
setInterval(() => {
_syncConfig();
}, DEFAULT_SETTINGS.resyncIntervalMs);

// Sync immediately on startup
await _syncConfig();

/////////////////////////////
// Init Redis
/////////////////////////////
const redis = new Redis({
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null
});
redis.ping().then(() => {
logger.info('Connected to redis');
}).catch((err) => {
}).catch((err: unknown) => {
logger.error('Failed to connect to redis');
console.error(err);
process.exit(1);
});

/////////////////////////////
// Setup config sync watchers
/////////////////////////////
switch (SOURCEBOT_TENANT_MODE) {
case 'single':
// Re-sync on file changes if the config file is local
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}

// Re-sync at a fixed interval
setInterval(() => {
_syncConfig();
}, DEFAULT_SETTINGS.resyncIntervalMs);

// Sync immediately on startup
await _syncConfig();
break;
case 'multi':
const configSyncQueue = new Queue('configSyncQueue');
const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.configSyncConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting config sync max concurrency to ${numWorkers}`);
const configSyncWorker = new Worker('configSyncQueue', async (job: Job) => {
const config = job.data as Config;
await _syncConfig(config.data);
}, { connection: redis, concurrency: numWorkers });
configSyncWorker.on('completed', (job: Job) => {
logger.info(`Config sync job ${job.id} completed`);
});
configSyncWorker.on('failed', (job: Job | undefined, err: unknown) => {
logger.info(`Config sync job failed with error: ${err}`);
});

setInterval(async () => {
const configs = await db.config.findMany({
where: {
syncStatus: ConfigSyncStatus.SYNC_NEEDED,
}
});

logger.info(`Found ${configs.length} configs to sync...`);
addConfigsToQueue(db, configSyncQueue, configs);
}, 1000);
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}


/////////////////////////
// Setup repo indexing
/////////////////////////
const indexQueue = new Queue('indexQueue');

const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job) => {
logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job: Job) => {
const repo = job.data as Repo;

let indexDuration_s: number | undefined;
Expand Down Expand Up @@ -166,10 +242,10 @@ export const main = async (db: PrismaClient, context: AppContext) => {
});
}, { connection: redis, concurrency: numWorkers });

worker.on('completed', (job) => {
worker.on('completed', (job: Job) => {
logger.info(`Job ${job.id} completed`);
});
worker.on('failed', async (job: Job | undefined, err) => {
worker.on('failed', async (job: Job | undefined, err: unknown) => {
logger.info(`Job failed with error: ${err}`);
if (job) {
await db.repo.update({
Expand All @@ -183,6 +259,7 @@ export const main = async (db: PrismaClient, context: AppContext) => {
}
});

// Repo indexing loop
while (true) {
const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
const repos = await db.repo.findMany({
Expand Down
4 changes: 4 additions & 0 deletions packages/backend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ export type Settings = {
* The multiple of the number of CPUs to use for indexing.
*/
indexConcurrencyMultiple: number;
/**
* The multiple of the number of CPUs to use for syncing the configuration.
*/
configSyncConcurrencyMultiple: number;
}

// @see : https://stackoverflow.com/a/61132308
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- CreateTable
CREATE TABLE "Config" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"data" JSONB NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL,
"syncedAt" DATETIME,
"syncStatus" TEXT NOT NULL DEFAULT 'SYNC_NEEDED',
"orgId" INTEGER NOT NULL,
CONSTRAINT "Config_orgId_fkey" FOREIGN KEY ("orgId") REFERENCES "Org" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);
23 changes: 23 additions & 0 deletions packages/db/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ enum RepoIndexingStatus {
FAILED
}

enum ConfigSyncStatus {
SYNC_NEEDED
IN_SYNC_QUEUE
SYNCING
SYNCED
FAILED
}

model Repo {
id Int @id @default(autoincrement())
name String
Expand All @@ -42,12 +50,27 @@ model Repo {
@@unique([external_id, external_codeHostUrl])
}

model Config {
id Int @id @default(autoincrement())
data Json
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
syncedAt DateTime?

syncStatus ConfigSyncStatus @default(SYNC_NEEDED)

// The organization that owns this config
org Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
orgId Int
}

model Org {
id Int @id @default(autoincrement())
name String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
members UserToOrg[]
configs Config[]
}

model UserToOrg {
Expand Down
Loading