1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added language model input-modality and document capability resolution, automatically resolved from the models.dev catalog (falls back to text-only for uncatalogued/self-hosted models). [#1372](https://github.com/sourcebot-dev/sourcebot/pull/1372)
- [EE] Added DPoP sender-constrained OAuth tokens for MCP clients. [#1395](https://github.com/sourcebot-dev/sourcebot/pull/1395)
- [EE] Added text file attachments to Ask Sourcebot, letting users attach text/code/config files to a chat message via the paperclip button, drag-and-drop, or paste, with large pastes auto-converted to attachments. [#1374](https://github.com/sourcebot-dev/sourcebot/pull/1374)
- [EE] Added image attachments to Ask Sourcebot, letting users attach images to a chat message when the selected model supports image input. [#1375](https://github.com/sourcebot-dev/sourcebot/pull/1375)

### Fixed
- Send anonymous server-side PostHog events as personless so unauthenticated requests don't inflate person counts. [#1367](https://github.com/sourcebot-dev/sourcebot/pull/1367)
2 changes: 2 additions & 0 deletions docs/docs/configuration/environment-variables.mdx
@@ -40,6 +40,8 @@ The following environment variables allow you to configure your Sourcebot deploy
| `SOURCEBOT_TELEMETRY_DISABLED` | `false` | <p>Enables/disables telemetry collection in Sourcebot. See [this doc](/docs/misc/telemetry) for more info.</p> |
| `DEFAULT_MAX_MATCH_COUNT` | `10000` | <p>The default maximum number of search results to return when using search in the web app.</p> |
| `ALWAYS_INDEX_FILE_PATTERNS` | - | <p>A comma separated list of glob patterns matching file paths that should always be indexed, regardless of size or number of trigrams.</p> |
| `SOURCEBOT_CHAT_ATTACHMENT_MAX_IMAGE_BYTES` | `10485760` (10 MiB) | <p>Maximum size in bytes of a single image attachment uploaded to Ask Sourcebot. Enforced server-side at upload time.</p> |
| `SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS` | `24` | <p>How long in hours an uploaded-but-unsent attachment is retained before being deleted by the orphan sweep. Set to `0` to disable the sweep.</p> |
| `NODE_USE_ENV_PROXY` | `0` | <p>Enables Node.js to automatically use `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables for network requests. Set to `1` to enable or `0` to disable. See [this doc](https://nodejs.org/en/learn/http/enterprise-network-configuration) for more info.</p> |
| `HTTP_PROXY` | - | <p>HTTP proxy URL for routing non-SSL requests through a proxy server (e.g., `http://proxy.company.com:8080`). Requires `NODE_USE_ENV_PROXY=1`.</p> |
| `HTTPS_PROXY` | - | <p>HTTPS proxy URL for routing SSL requests through a proxy server (e.g., `http://proxy.company.com:8080`). Requires `NODE_USE_ENV_PROXY=1`.</p> |
156 changes: 156 additions & 0 deletions packages/backend/src/attachmentPruner.ts
@@ -0,0 +1,156 @@
import { AttachmentStatus, PrismaClient } from "@sourcebot/db";
import { createLogger, env, getStorageBackend } from "@sourcebot/shared";
import { setIntervalAsync } from "./utils.js";

const BATCH_SIZE = 1_000;
const ONE_HOUR_MS = 60 * 60 * 1000;

const logger = createLogger('attachment-pruner');

/**
 * Periodically reclaims orphaned attachment blobs older than the configured TTL,
 * along with their stored bytes, using the `DELETING` tombstone protocol: an
 * orphan is first atomically flipped to `DELETING`, then its bytes are deleted,
 * and only then is the row removed. Because the row (the only durable handle to
 * the bytes) outlives the byte delete, a failed byte delete is always retryable.
 *
 * Each tick condemns two classes of orphan to `DELETING`, then reclaims all
 * tombstones:
 *
 *   1. PENDING (uploaded-but-never-linked): produced when a user selects a file
 *      in the chat box but never sends the message.
 *   2. COMMITTED with zero links: normally a committed blob is reclaimed inline
 *      by the chat-delete sweep in the web app, but if that sweep is interrupted
 *      (process crash / DB error / failed byte delete) the blob is left tombstoned
 *      or unlinked. This is the backstop for that case.
 *
 * @note Byte deletion goes through the shared `StorageBackend`, so the web app
 * and this worker share one on-disk layout.
 */
export class AttachmentPruner {
    private interval?: NodeJS.Timeout;
    private readonly storage = getStorageBackend();

    constructor(private db: PrismaClient) {}

    startScheduler() {
        const ttlHours = env.SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS;
        if (ttlHours <= 0) {
            logger.info('SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS is 0, attachment orphan pruning is disabled.');
            return;
        }

        logger.debug(`Attachment pruner started. Reclaiming orphaned attachments older than ${ttlHours} hours.`);

        // Run immediately on startup, then every hour. The startup call isn't
        // awaited, so catch and log any failure here (this worker exits on
        // unhandledRejection); the recurring schedule will retry.
        this.pruneOrphanedAttachments().catch((error) => {
            logger.warn(`Initial attachment prune failed: ${error}`);
        });
        this.interval = setIntervalAsync(() => this.pruneOrphanedAttachments(), ONE_HOUR_MS);
    }

    async dispose() {
        if (this.interval) {
            clearInterval(this.interval);
            this.interval = undefined;
        }
    }

    private async pruneOrphanedAttachments() {
        const cutoff = new Date(Date.now() - env.SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS * ONE_HOUR_MS);

        // Condemn orphans by flipping them to the DELETING tombstone. Each claim
        // is atomic, so a PENDING blob committed by a concurrent send (its commit
        // matches only PENDING rows) or a zero-link blob re-linked by a concurrent
        // duplicate-chat loses the claim and is left intact.
        //
        // PENDING orphans: uploaded but the message was never sent.
        const pendingClaimed = await this.db.attachment.updateMany({
            where: {
                status: AttachmentStatus.PENDING,
                createdAt: { lt: cutoff },
            },
            data: { status: AttachmentStatus.DELETING },
        });

        // COMMITTED orphans: blobs left with zero links by an interrupted
        // chat-delete sweep in the web app.
        const committedClaimed = await this.db.attachment.updateMany({
            where: {
                status: AttachmentStatus.COMMITTED,
                createdAt: { lt: cutoff },
                chats: { none: {} },
            },
            data: { status: AttachmentStatus.DELETING },
        });

        // Reclaim every tombstone: delete bytes, then the row. This also picks up
        // tombstones left behind by the web app's inline reclaim (or a crashed
        // earlier tick) whose byte delete failed.
        const reclaimed = await this.reclaimTombstonedAttachments();

        if (pendingClaimed.count > 0 || committedClaimed.count > 0 || reclaimed > 0) {
            logger.debug(
                `Attachment prune: condemned ${pendingClaimed.count} PENDING + ` +
                `${committedClaimed.count} COMMITTED orphan(s), reclaimed ${reclaimed} tombstone(s).`,
            );
        }
    }

    /**
     * Deletes the bytes for every `DELETING` tombstone, then removes the row.
     * The row (the only durable handle to the bytes) is removed only after its
     * bytes are confirmed gone, so a failed byte delete leaves the tombstone in
     * place to be retried on the next tick, and bytes can never be orphaned by a
     * transient storage error. Rows whose byte delete fails this run are
     * excluded from subsequent batches so a persistent failure can't spin the
     * loop.
     *
     * @returns the number of tombstones fully reclaimed (bytes + row).
     */
    private async reclaimTombstonedAttachments(): Promise<number> {
        let totalReclaimed = 0;
        const failedIds: string[] = [];

        while (true) {
            const batch = await this.db.attachment.findMany({
                where: { status: AttachmentStatus.DELETING, id: { notIn: failedIds } },
                select: { id: true, storageKey: true },
                take: BATCH_SIZE,
            });

            if (batch.length === 0) {
                break;
            }

            const settled = await Promise.allSettled(
                batch.map((attachment) => this.storage.delete(attachment.storageKey)));

            const reclaimedIds: string[] = [];
            batch.forEach((attachment, index) => {
                const outcome = settled[index];
                if (outcome.status === 'fulfilled') {
                    reclaimedIds.push(attachment.id);
                } else {
                    logger.warn(`Failed to delete bytes for tombstoned attachment ${attachment.id}, will retry next tick: ${outcome.reason}`);
                    failedIds.push(attachment.id);
                }
            });

            if (reclaimedIds.length > 0) {
                const result = await this.db.attachment.deleteMany({
                    where: { id: { in: reclaimedIds }, status: AttachmentStatus.DELETING },
                });
                totalReclaimed += result.count;
            }

            if (batch.length < BATCH_SIZE) {
                break;
            }
        }

        return totalReclaimed;
    }
}
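The tombstone protocol in `attachmentPruner.ts` can be illustrated with a minimal in-memory sketch. Everything here is a hypothetical stand-in rather than the PR's code: `Row`, `FlakyStorage`, and `pruneTick` are invented for illustration, the sketch is synchronous, and it skips batching. It only shows the ordering (condemn, delete bytes, then delete the row) and why a failed byte delete stays retryable.

```typescript
// Hypothetical stand-ins for the Attachment table and the StorageBackend.
type Status = "PENDING" | "COMMITTED" | "DELETING";

interface Row {
    id: string;
    storageKey: string;
    status: Status;
    createdAt: number; // epoch milliseconds
    linkCount: number; // number of ChatAttachment rows pointing at this blob
}

class FlakyStorage {
    blobs: string[] = [];
    failing: string[] = []; // keys whose delete is simulated to fail

    delete(key: string): void {
        if (this.failing.indexOf(key) !== -1) {
            throw new Error(`simulated storage failure for ${key}`);
        }
        this.blobs = this.blobs.filter((blob) => blob !== key);
    }
}

// One prune tick: condemn orphans older than the cutoff, then reclaim tombstones.
function pruneTick(table: Row[], storage: FlakyStorage, cutoff: number): number {
    // Step 1: flip expired orphans to the DELETING tombstone.
    for (const row of table) {
        const isOrphan =
            row.status === "PENDING" ||
            (row.status === "COMMITTED" && row.linkCount === 0);
        if (isOrphan && row.createdAt < cutoff) {
            row.status = "DELETING";
        }
    }

    // Step 2: for every tombstone, delete the bytes first and the row second.
    let reclaimed = 0;
    const tombstones = table.filter((row) => row.status === "DELETING");
    for (const row of tombstones) {
        try {
            storage.delete(row.storageKey);
        } catch (error) {
            // Bytes are still there, so keep the row as the handle for a retry.
            continue;
        }
        table.splice(table.indexOf(row), 1);
        reclaimed += 1;
    }
    return reclaimed;
}

// Demo: "b" fails its byte delete on the first tick and is retried on the second.
const storage = new FlakyStorage();
const table: Row[] = [
    { id: "a", storageKey: "key-a", status: "PENDING", createdAt: 0, linkCount: 0 },
    { id: "b", storageKey: "key-b", status: "COMMITTED", createdAt: 0, linkCount: 0 },
    { id: "c", storageKey: "key-c", status: "COMMITTED", createdAt: 0, linkCount: 1 },
    { id: "d", storageKey: "key-d", status: "PENDING", createdAt: 500, linkCount: 0 },
];
storage.blobs = table.map((row) => row.storageKey);
storage.failing = ["key-b"];

const firstTick = pruneTick(table, storage, 100);
const afterFirst = table.map((row) => `${row.id}:${row.status}`).join(",");

storage.failing = [];
const secondTick = pruneTick(table, storage, 100);
const afterSecond = table.map((row) => row.id).join(",");
```

In this sketch the linked blob `c` and the too-recent upload `d` are never touched, while `b` survives the first tick as a `DELETING` row precisely because its bytes could not be removed.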
4 changes: 4 additions & 0 deletions packages/backend/src/index.ts
@@ -8,6 +8,7 @@ import 'express-async-errors';
import { existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { Api } from "./api.js";
import { AttachmentPruner } from "./attachmentPruner.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
@@ -55,10 +56,12 @@ const accountPermissionSyncer = new AccountPermissionSyncer(prisma, settings, re
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);
const configManager = new ConfigManager(prisma, connectionManager, env.CONFIG_PATH);
const auditLogPruner = new AuditLogPruner(prisma);
const attachmentPruner = new AttachmentPruner(prisma);

connectionManager.startScheduler();
await repoIndexManager.startScheduler();
auditLogPruner.startScheduler();
attachmentPruner.startScheduler();

if (env.PERMISSION_SYNC_ENABLED === 'true' && !await hasEntitlement('permission-syncing')) {
logger.warn('Permission syncing is not supported in current plan. Please contact team@sourcebot.dev for assistance.');
@@ -99,6 +102,7 @@ const listenToShutdownSignals = () => {
await repoPermissionSyncer.dispose()
await accountPermissionSyncer.dispose()
await auditLogPruner.dispose()
await attachmentPruner.dispose()
await configManager.dispose()

await prisma.$disconnect();
@@ -0,0 +1,49 @@
-- CreateEnum
CREATE TYPE "AttachmentStatus" AS ENUM ('PENDING', 'COMMITTED');

-- CreateTable
CREATE TABLE "Attachment" (
    "id" TEXT NOT NULL,
    "orgId" INTEGER NOT NULL,
    "storageKey" TEXT NOT NULL,
    "filename" TEXT NOT NULL,
    "mediaType" TEXT NOT NULL,
    "sizeBytes" INTEGER NOT NULL,
    "checksum" TEXT NOT NULL,
    "uploadedById" TEXT,
    "status" "AttachmentStatus" NOT NULL DEFAULT 'PENDING',
    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT "Attachment_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "ChatAttachment" (
    "id" TEXT NOT NULL,
    "chatId" TEXT NOT NULL,
    "attachmentId" TEXT NOT NULL,
    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT "ChatAttachment_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE INDEX "Attachment_status_createdAt_idx" ON "Attachment"("status", "createdAt");

-- CreateIndex
CREATE INDEX "ChatAttachment_attachmentId_idx" ON "ChatAttachment"("attachmentId");

-- CreateIndex
CREATE UNIQUE INDEX "ChatAttachment_chatId_attachmentId_key" ON "ChatAttachment"("chatId", "attachmentId");

-- AddForeignKey
ALTER TABLE "Attachment" ADD CONSTRAINT "Attachment_orgId_fkey" FOREIGN KEY ("orgId") REFERENCES "Org"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "Attachment" ADD CONSTRAINT "Attachment_uploadedById_fkey" FOREIGN KEY ("uploadedById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "ChatAttachment" ADD CONSTRAINT "ChatAttachment_chatId_fkey" FOREIGN KEY ("chatId") REFERENCES "Chat"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "ChatAttachment" ADD CONSTRAINT "ChatAttachment_attachmentId_fkey" FOREIGN KEY ("attachmentId") REFERENCES "Attachment"("id") ON DELETE CASCADE ON UPDATE CASCADE;
@@ -0,0 +1,2 @@
-- AlterEnum
ALTER TYPE "AttachmentStatus" ADD VALUE 'DELETING';
79 changes: 79 additions & 0 deletions packages/db/prisma/schema.prisma
@@ -24,6 +24,20 @@ enum ChatVisibility {
PUBLIC
}

/// Lifecycle status of an uploaded attachment blob.
/// PENDING: uploaded but not yet linked to a chat (orphan until a message
/// referencing it is sent). COMMITTED: linked to at least one chat.
/// DELETING: condemned tombstone. The row is kept (as the only durable handle
/// to the bytes) until the stored bytes are confirmed deleted, at which point
/// the row is removed. A failed byte delete leaves the row DELETING for the
/// attachment pruner's reclaim sweep to retry, so a transient storage error
/// can never orphan bytes.
enum AttachmentStatus {
  PENDING
  COMMITTED
  DELETING
}
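The status lifecycle documented on this enum relies on conditional status flips: a transition only succeeds if the row is still in the expected state. The following is a minimal sketch of that compare-and-set idea under stated assumptions. `AttachmentRow` and `claim` are hypothetical names, and the in-memory check stands in for a database update whose `where` clause includes the expected status.

```typescript
type AttachmentStatus = "PENDING" | "COMMITTED" | "DELETING";

// Hypothetical in-memory row; in the real schema this is an Attachment record.
interface AttachmentRow {
    id: string;
    status: AttachmentStatus;
}

// Mimics a conditional update: flip the status only if the row is still in
// the expected state. Returns true when this caller won the claim.
function claim(row: AttachmentRow, expected: AttachmentStatus, next: AttachmentStatus): boolean {
    if (row.status !== expected) {
        return false;
    }
    row.status = next;
    return true;
}

// Scenario 1: the pruner condemns the row first, so a later send cannot commit it.
const condemnedFirst: AttachmentRow = { id: "att-1", status: "PENDING" };
const prunerWon = claim(condemnedFirst, "PENDING", "DELETING");
const lateCommit = claim(condemnedFirst, "PENDING", "COMMITTED");

// Scenario 2: the send commits first, so the pruner's PENDING claim misses the row.
const committedFirst: AttachmentRow = { id: "att-2", status: "PENDING" };
const sendWon = claim(committedFirst, "PENDING", "COMMITTED");
const latePrune = claim(committedFirst, "PENDING", "DELETING");
```

Either ordering leaves the row in exactly one terminal path, which is the property the `DELETING` tombstone depends on.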

/// @note: The @map annotation is required to maintain backwards compatibility
/// with the existing database.
/// @note: In the generated client, these mapped values will be in pascalCase.
@@ -272,6 +286,7 @@ model Org {
connections Connection[]
repos Repo[]
apiKeys ApiKey[]
attachments Attachment[]
isOnboarded Boolean @default(false)
imageUrl String?

@@ -456,6 +471,7 @@ model User {
chats Chat[]
sharedChats ChatAccess[]
repoVisits RepoVisit[]
uploadedAttachments Attachment[]

oauthTokens OAuthToken[]
oauthAuthCodes OAuthAuthorizationCode[]
@@ -608,6 +624,69 @@ model Chat {
messages Json // This is a JSON array of `Message` types from @ai-sdk/ui-utils.

sharedWith ChatAccess[]

attachments ChatAttachment[]
}

/// A user-uploaded binary attachment blob (e.g. an image). The bytes live in
/// the configured StorageBackend (keyed by `storageKey`), never in the DB.
/// Attachments are NOT chat-bound: they are uploaded before any chat
/// association exists, and linked to chats via `ChatAttachment`. Permissions
/// are derived entirely from the linked chat(s); there are no independent ACLs.
model Attachment {
  id String @id @default(cuid())

  org   Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
  orgId Int

  /// Opaque key the StorageBackend uses to locate the bytes.
  storageKey String

  /// Original (sanitized) filename supplied by the uploader.
  filename String

  /// Final media type of the stored bytes (validated by decoding at upload).
  mediaType String

  /// Size of the stored bytes.
  sizeBytes Int

  /// Hex SHA-256 of the stored bytes (integrity / debugging; not used for dedup).
  checksum String

  /// The user who uploaded this blob. Uploads require authentication, so this
  /// is set at creation (anonymous users cannot upload binary attachments). It
  /// is nulled if the uploader is later deleted, so committed attachments
  /// survive on the chats they're linked to.
  uploadedBy   User?   @relation(fields: [uploadedById], references: [id], onDelete: SetNull)
  uploadedById String?

  status AttachmentStatus @default(PENDING)

  createdAt DateTime @default(now())

  chats ChatAttachment[]

  @@index([status, createdAt])
}

/// Join table linking an `Attachment` blob to a `Chat`. This is the linker
/// that makes chat duplication metadata-only (no byte copy) and keeps
/// attachment access purely chat-derived. Deleting a chat cascades these rows;
/// a separate sweep deletes `Attachment`s left with zero links (and their bytes).
model ChatAttachment {
  id String @id @default(cuid())

  chat   Chat   @relation(fields: [chatId], references: [id], onDelete: Cascade)
  chatId String

  attachment   Attachment @relation(fields: [attachmentId], references: [id], onDelete: Cascade)
  attachmentId String

  createdAt DateTime @default(now())

  @@unique([chatId, attachmentId])
  @@index([attachmentId])
}
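The `ChatAttachment` comment describes chat duplication as metadata-only. A rough in-memory sketch of what that could look like follows; `linkAttachment`, `duplicateChatLinks`, and `deleteChatLinks` are hypothetical helpers, not functions from this PR, and the array stands in for the join table with its `(chatId, attachmentId)` uniqueness rule.

```typescript
// Hypothetical in-memory model of the ChatAttachment join table.
interface ChatAttachmentLink {
    chatId: string;
    attachmentId: string;
}

// Adds a link unless the (chatId, attachmentId) pair already exists,
// mirroring the unique constraint on the join table.
function linkAttachment(links: ChatAttachmentLink[], chatId: string, attachmentId: string): boolean {
    const exists = links.some((l) => l.chatId === chatId && l.attachmentId === attachmentId);
    if (exists) {
        return false;
    }
    links.push({ chatId, attachmentId });
    return true;
}

// Duplicating a chat copies link rows only; no attachment bytes are touched.
function duplicateChatLinks(links: ChatAttachmentLink[], sourceChatId: string, newChatId: string): number {
    const source = links.filter((l) => l.chatId === sourceChatId);
    let created = 0;
    for (const link of source) {
        if (linkAttachment(links, newChatId, link.attachmentId)) {
            created += 1;
        }
    }
    return created;
}

// Removes a chat's links (the cascade) and reports attachments left with zero links.
function deleteChatLinks(links: ChatAttachmentLink[], chatId: string): string[] {
    const removed = links.filter((l) => l.chatId === chatId);
    const kept = links.filter((l) => l.chatId !== chatId);
    links.length = 0;
    for (const link of kept) {
        links.push(link);
    }
    const orphaned: string[] = [];
    for (const link of removed) {
        const stillLinked = kept.some((k) => k.attachmentId === link.attachmentId);
        if (!stillLinked && orphaned.indexOf(link.attachmentId) === -1) {
            orphaned.push(link.attachmentId);
        }
    }
    return orphaned;
}

const links: ChatAttachmentLink[] = [];
linkAttachment(links, "chat-1", "att-1");
linkAttachment(links, "chat-1", "att-2");

const copied = duplicateChatLinks(links, "chat-1", "chat-2");
const copiedAgain = duplicateChatLinks(links, "chat-1", "chat-2");
const orphansAfterFirstDelete = deleteChatLinks(links, "chat-1");
const orphansAfterSecondDelete = deleteChatLinks(links, "chat-2");
```

In this model an attachment becomes a candidate for reclamation only once the last chat linking to it is deleted, which is the condition the zero-link sweep checks for.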

/// Represents a user's access to a chat that has been shared with them.
17 changes: 17 additions & 0 deletions packages/shared/src/env.server.ts
@@ -321,6 +321,23 @@ const options = {
SOURCEBOT_CHAT_PROMPT_CACHE_BREAK_DETECTION_ENABLED: booleanSchema.default('false'),
SOURCEBOT_MCP_TOOL_CALL_TIMEOUT_MS: numberSchema.int().positive().max(maxTimerDelayMs).default(60000),

/**
 * Maximum size (in bytes) of a single image attachment uploaded to the
 * Ask chat. Enforced server-side at upload time. Distinct from the
 * inline-text cap (which lives as a web-package constant).
 * @default 10 MiB
 */
SOURCEBOT_CHAT_ATTACHMENT_MAX_IMAGE_BYTES: numberSchema.int().positive().default(10 * 1024 * 1024),

/**
 * How long (in hours) an orphaned attachment blob is retained before
 * the orphan sweep deletes it and its bytes. Applies to PENDING
 * (uploaded-but-unlinked) blobs, which covers "select a file then never
 * send" abandonment, and to COMMITTED blobs left with zero chat links.
 * Set to 0 to disable the orphan sweep entirely.
 * @default 24 hours
 */
SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS: numberSchema.int().nonnegative().default(24),

DEBUG_WRITE_CHAT_MESSAGES_TO_FILE: booleanSchema.default('false'),
DEBUG_ENABLE_REACT_SCAN: booleanSchema.default('false'),
DEBUG_ENABLE_REACT_GRAB: booleanSchema.default('false'),
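As a rough illustration of how the two new settings behave, the sketch below parses a non-negative integer with a default and derives the orphan cutoff from the TTL. `parseNonNegativeInt` and `orphanCutoffMs` are hypothetical helpers written for this example; the actual validation is done by the `numberSchema` chain shown in the hunk, which this sketch does not reproduce.

```typescript
const ONE_HOUR_MS = 60 * 60 * 1000;

// Hypothetical helper: parse an env string as a non-negative integer,
// falling back to a default when the variable is unset or empty.
function parseNonNegativeInt(raw: string | undefined, fallback: number): number {
    if (raw === undefined || raw === "") {
        return fallback;
    }
    const value = Number(raw);
    if (!isFinite(value) || Math.floor(value) !== value || value < 0) {
        throw new Error(`expected a non-negative integer, got "${raw}"`);
    }
    return value;
}

// Hypothetical helper: returns the cutoff timestamp, or null when the sweep
// is disabled (TTL of 0).
function orphanCutoffMs(ttlHours: number, nowMs: number): number | null {
    if (ttlHours <= 0) {
        return null;
    }
    return nowMs - ttlHours * ONE_HOUR_MS;
}

const defaultTtl = parseNonNegativeInt(undefined, 24);
const defaultMaxImageBytes = parseNonNegativeInt(undefined, 10 * 1024 * 1024);
const cutoff = orphanCutoffMs(defaultTtl, 100 * ONE_HOUR_MS);
const disabledCutoff = orphanCutoffMs(parseNonNegativeInt("0", 24), 100 * ONE_HOUR_MS);
```

With the defaults, anything created more than 24 hours before "now" falls behind the cutoff, and a TTL of `0` yields no cutoff at all.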