/**
 * Intake routes — URL intake pipeline + rules CRUD + job status.
 */
import type { FastifyInstance } from 'fastify';
import { randomUUID } from 'node:crypto';
import { getUserId, getRequestProductId } from '../../lib/request-context.js';
import { BadRequestError, NotFoundError } from '@bytelyst/errors';
import { isFeatureEnabled } from '../../lib/feature-flags.js';
import { trackEvent } from '../../lib/telemetry.js';
import { PRODUCT_ID } from '../../lib/product-config.js';
import { classifyUrl } from './url-classifier.js';
import { extractContent } from './extractors.js';
import { getBuiltinIntakeRules } from './seed-rules.js';
import * as repo from './repository.js';
import * as noteRepo from '../notes/repository.js';
import { executePrompt } from '../note-prompts/runner.js';
import * as promptRepo from '../note-prompts/repository.js';
import { stripHtmlForEmbedding } from '../../lib/embeddings.js';
import {
  IntakeRequestSchema,
  CreateIntakeRuleSchema,
  UpdateIntakeRuleSchema,
  ListIntakeJobsQuerySchema,
} from './types.js';
import type { IntakeRuleDoc, IntakeJobStatus, IntakeContentType } from './types.js';

// ── Rate limiter (simple in-memory) ──────────────────────────────

/** Per-user timestamps (ms since epoch) of accepted intake submissions, held in this process only. */
const rateLimitMap = new Map<string, number[]>();
const RATE_LIMIT_WINDOW_MS = 3600_000;
const RATE_LIMIT_MAX = 20;

/**
 * Records one intake attempt for `userId`, rejecting it when the user already
 * has `RATE_LIMIT_MAX` accepted attempts inside the sliding window.
 *
 * @throws BadRequestError when the limit is reached; the rejected attempt is not recorded.
 */
function checkRateLimit(userId: string): void {
  const now = Date.now();
  const timestamps = rateLimitMap.get(userId) ?? [];
  // Only attempts that are still inside the window count toward the limit.
  const recent = timestamps.filter((t) => now - t < RATE_LIMIT_WINDOW_MS);
  if (recent.length >= RATE_LIMIT_MAX) {
    throw new BadRequestError(`Rate limit exceeded: max ${RATE_LIMIT_MAX} intakes per hour`);
  }
  recent.push(now);
  rateLimitMap.set(userId, recent);
}

// ── Helpers ──────────────────────────────────────────────────────

/**
 * Combines persisted rules with the built-in rules that are not persisted yet.
 *
 * Built-in rules get the current time as `createdAt`/`updatedAt`. A built-in
 * rule is skipped when a rule with the same id is already in the list.
 *
 * @param persisted Rules loaded from the repository; not mutated.
 * @returns A new array sorted by ascending `priority` (lower = higher priority).
 */
function mergeWithBuiltinRules(persisted: readonly IntakeRuleDoc[]): IntakeRuleDoc[] {
  const now = new Date().toISOString();
  const merged = [...persisted];
  const seenIds = new Set(merged.map((rule) => rule.id));

  for (const builtin of getBuiltinIntakeRules()) {
    if (seenIds.has(builtin.id)) continue;
    seenIds.add(builtin.id);
    merged.push({ ...builtin, createdAt: now, updatedAt: now });
  }

  return merged.sort((a, b) => a.priority - b.priority);
}

/**
 * Finds the first enabled rule, in priority order, whose `urlPattern`
 * (compiled as a case-insensitive regular expression) matches `url`.
 *
 * @returns The matching rule, or `null` when no enabled rule matches.
 */
async function matchIntakeRule(
  url: string,
  userId: string,
  productId: string,
): Promise<IntakeRuleDoc | null> {
  const rules = mergeWithBuiltinRules(await repo.listIntakeRules(userId, productId));

  for (const rule of rules) {
    if (!rule.enabled) continue;
    try {
      if (new RegExp(rule.urlPattern, 'i').test(url)) return rule;
    } catch {
      // Invalid regex in rule — skip
    }
  }
  return null;
}

/**
 * Builds the note body for extracted or generated text: every line break in
 * `text` becomes the paragraph separator and the result is wrapped in the
 * same delimiter.
 *
 * NOTE(review): in the file as received, the wrapper and separator were raw
 * blank lines, which cannot legally sit inside a quoted string literal. They
 * are written as `\n\n` escapes here so the emitted characters are the same.
 * If markup was lost from these literals, restore it — confirm against
 * version control.
 */
function formatNoteBody(text: string): string {
  return `\n\n${text.replace(/\n/g, '\n\n')}\n\n`;
}

/**
 * Background half of the intake pipeline: extracts the URL content, runs the
 * prompt template over it, writes the result into the draft note and advances
 * the job through `extracting` → `processing` → `complete`.
 *
 * Never rejects: any error is recorded on the job as `failed` and reported
 * through telemetry, because the caller starts this without awaiting it.
 */
async function runIntakeBackground(
  jobId: string,
  userId: string,
  url: string,
  contentType: string,
  templateSlug: string,
  noteId: string,
  workspaceId: string,
): Promise<void> {
  try {
    // Phase 1: Extract content
    await repo.updateIntakeJob(jobId, userId, { status: 'extracting' });
    const extracted = await extractContent(url, contentType as IntakeContentType);
    await repo.updateIntakeJob(jobId, userId, {
      status: 'processing',
      // Only the first 10,000 characters are stored on the job record.
      extractedText: extracted.text.slice(0, 10_000),
    });

    // Phase 2: Run prompt template
    const template = await promptRepo.getPromptTemplate(templateSlug, userId);
    if (!template) {
      // Template not found — save raw extracted content to note
      await noteRepo.updateNote(noteId, workspaceId, {
        title: extracted.title || url,
        body: formatNoteBody(extracted.text),
        status: 'active',
      });
      await repo.updateIntakeJob(jobId, userId, {
        status: 'complete',
        completedAt: new Date().toISOString(),
      });
      return;
    }

    const result = await executePrompt(
      template,
      { templateId: templateSlug, noteId, workspaceId },
      extracted.text,
    );

    // Phase 3: Update note with LLM result
    await noteRepo.updateNote(noteId, workspaceId, {
      title: extracted.title || url,
      body: formatNoteBody(result.content),
      status: 'active',
    });
    await repo.updateIntakeJob(jobId, userId, {
      status: 'complete',
      completedAt: new Date().toISOString(),
    });

    trackEvent('intake_job_completed', userId, {
      contentType,
      templateSlug,
      url: new URL(url).hostname,
    });
  } catch (err) {
    const errorMsg = err instanceof Error ? err.message : 'Unknown error';
    // Best effort: if recording the failure itself fails there is nothing
    // further to do, and this function must not reject.
    await repo
      .updateIntakeJob(jobId, userId, {
        status: 'failed',
        error: errorMsg,
        completedAt: new Date().toISOString(),
      })
      .catch(() => {});
    trackEvent('intake_job_failed', userId, { contentType, error: errorMsg });
  }
}

// ── Route Plugin ─────────────────────────────────────────────────

/**
 * Registers the intake endpoints on `app`:
 * `POST /intake`, `GET /intake/jobs`, `GET /intake/jobs/:id` and CRUD under
 * `/intake-rules`.
 */
export async function intakeRoutes(app: FastifyInstance): Promise<void> {
  // ── POST /intake — main intake endpoint ────────────────────────
  app.post('/intake', async (req, reply) => {
    const userId = getUserId(req);
    const productId = getRequestProductId(req);

    if (!isFeatureEnabled('notelett_intake_enabled')) {
      throw new BadRequestError('Intake feature is not enabled');
    }

    checkRateLimit(userId);

    const parsed = IntakeRequestSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(
        parsed.error.issues.map((i: { message: string }) => i.message).join('; '),
      );
    }

    const input = parsed.data;
    const workspaceId = input.workspaceId || 'default';

    // Classify URL
    const classification = classifyUrl(input.url);

    // Match intake rule
    const rule = await matchIntakeRule(input.url, userId, productId);
    const templateSlug = input.templateOverride || rule?.templateId || 'article-summary';

    // Parsed once; used for the draft title and for telemetry below.
    const hostname = new URL(input.url).hostname;

    // Create draft note
    const noteId = `note_intake_${randomUUID().replace(/-/g, '').slice(0, 12)}`;
    const now = new Date().toISOString();

    await noteRepo.createNote({
      id: noteId,
      productId: PRODUCT_ID,
      workspaceId,
      userId,
      title: `Processing: ${hostname}`,
      // NOTE(review): this literal spanned raw blank lines in the file as
      // received (invalid in a quoted string); `\n\n` keeps the same
      // characters. Restore any markup that was lost — confirm against
      // version control.
      body: '\n\nProcessing URL content...\n\n',
      status: 'draft',
      tags: ['intake', classification.contentType],
      links: [],
      sourceType: 'intake',
      sourceUri: input.url,
      createdAt: now,
      updatedAt: now,
      createdBy: userId,
      updatedBy: userId,
      agentId: 'intake-pipeline',
    });

    // Create intake job
    const jobId = `job_${randomUUID().replace(/-/g, '').slice(0, 12)}`;
    await repo.createIntakeJob({
      id: jobId,
      productId: PRODUCT_ID,
      userId,
      workspaceId,
      noteId,
      ruleId: rule?.id || '__auto__',
      url: input.url,
      contentType: classification.contentType,
      templateSlug,
      status: 'queued',
      startedAt: now,
    });

    // Fire background processing (no await)
    setImmediate(() => {
      void runIntakeBackground(
        jobId,
        userId,
        input.url,
        classification.contentType,
        templateSlug,
        noteId,
        workspaceId,
      );
    });

    trackEvent('intake_submitted', userId, {
      contentType: classification.contentType,
      templateSlug,
      domain: hostname,
    });

    // 202: the job is queued; the result is fetched through the job endpoints.
    reply.code(202);
    return {
      jobId,
      noteId,
      contentType: classification.contentType,
      ruleMatched: rule?.name || null,
      templateSlug,
      status: 'queued',
    };
  });

  // ── GET /intake/jobs — list intake jobs ────────────────────────
  app.get('/intake/jobs', async (req) => {
    const userId = getUserId(req);
    const productId = getRequestProductId(req);
    const query = ListIntakeJobsQuerySchema.parse(req.query);

    // Support comma-separated statuses (e.g. "queued,extracting,processing")
    const statuses = query.status
      ? (query.status
          .split(',')
          .map((s) => s.trim())
          .filter(Boolean) as IntakeJobStatus[])
      : undefined;

    const jobs = await repo.listIntakeJobs(userId, productId, {
      statuses,
      since: query.since,
      limit: query.limit,
      offset: query.offset,
    });

    // NOTE(review): `total` is the number of items returned for this call,
    // not necessarily the number of matching jobs — confirm what clients expect.
    return { items: jobs, total: jobs.length };
  });

  // ── GET /intake/jobs/:id — single job status ──────────────────
  app.get('/intake/jobs/:id', async (req) => {
    const userId = getUserId(req);
    const { id } = req.params as { id: string };

    const job = await repo.getIntakeJob(id, userId);
    if (!job) throw new NotFoundError('Intake job not found');
    return job;
  });

  // ── Intake Rules CRUD ─────────────────────────────────────────
  app.get('/intake-rules', async (req) => {
    const userId = getUserId(req);
    const productId = getRequestProductId(req);

    // Merge in built-in rules that aren't persisted
    const all = mergeWithBuiltinRules(await repo.listIntakeRules(userId, productId));
    return { items: all, total: all.length };
  });

  app.post('/intake-rules', async (req, reply) => {
    const userId = getUserId(req);
    const productId = getRequestProductId(req);
    const input = CreateIntakeRuleSchema.parse(req.body);
    const now = new Date().toISOString();

    // Validate regex
    try {
      new RegExp(input.urlPattern);
    } catch {
      throw new BadRequestError('Invalid URL pattern (must be valid regex)');
    }

    const doc: IntakeRuleDoc = {
      id: `rule_${randomUUID().replace(/-/g, '').slice(0, 12)}`,
      productId,
      userId,
      ...input,
      createdAt: now,
      updatedAt: now,
    };

    const created = await repo.createIntakeRule(doc);
    reply.code(201);
    return created;
  });

  app.patch('/intake-rules/:id', async (req) => {
    const userId = getUserId(req);
    const { id } = req.params as { id: string };

    const existing = await repo.getIntakeRule(id, userId);
    if (!existing) throw new NotFoundError('Intake rule not found');
    if (existing.userId === '__builtin__') {
      throw new BadRequestError('Cannot modify built-in intake rules');
    }

    const updates = UpdateIntakeRuleSchema.parse(req.body);

    // Only re-validate the pattern when the patch actually changes it.
    if (updates.urlPattern) {
      try {
        new RegExp(updates.urlPattern);
      } catch {
        throw new BadRequestError('Invalid URL pattern');
      }
    }

    return repo.updateIntakeRule(id, userId, {
      ...updates,
      updatedAt: new Date().toISOString(),
    });
  });

  app.delete('/intake-rules/:id', async (req, reply) => {
    const userId = getUserId(req);
    const { id } = req.params as { id: string };

    const existing = await repo.getIntakeRule(id, userId);
    if (!existing) throw new NotFoundError('Intake rule not found');
    if (existing.userId === '__builtin__') {
      throw new BadRequestError('Cannot delete built-in intake rules');
    }

    await repo.deleteIntakeRule(id, userId);
    reply.code(204);
  });
}