feat(intake): add URL intake pipeline — classifier, extractors, rules engine, routes, 6 new prompt templates (27 total), 26 new tests

This commit is contained in:
saravanakumardb1 2026-04-06 20:28:36 -07:00
parent 6f262a5218
commit 0e16714da1
14 changed files with 1228 additions and 3 deletions

View File

@ -15,6 +15,8 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
note_prompt_webhooks: { partitionKeyPath: '/userId' },
note_shares: { partitionKeyPath: '/workspaceId' },
note_versions: { partitionKeyPath: '/workspaceId' },
note_intake_rules: { partitionKeyPath: '/userId' },
note_intake_jobs: { partitionKeyPath: '/userId' },
};
export async function initCosmosIfNeeded(): Promise<void> {

View File

@ -24,6 +24,10 @@ const registry = createFlagRegistry({
'notelett_voice_capture_enabled': false,
'notelett_scheduled_actions_enabled': false,
'notelett_webhooks_enabled': false,
// Intake pipeline feature flags
'notelett_intake_enabled': true,
'notelett_collaborative_sharing_enabled': false,
'notelett_push_notifications_enabled': false,
},
enabled: config.FEATURE_FLAGS_ENABLED,
});

View File

@ -0,0 +1,194 @@
/**
 * Content extractors — platform-specific strategies for fetching text from URLs.
*/
import type { IntakeContentType } from './types.js';
/** Normalized result returned by every extractor in this module. */
export interface ExtractionResult {
// Title for the content; extractors fall back to the URL or a fixed label when none is found.
title: string;
// Extracted text payload (plain text or lightly formatted, depending on the extractor).
text: string;
// Optional extractor-specific string metadata, e.g. videoId, author, filename.
metadata?: Record<string, string>;
}
// Timeout in milliseconds handed to AbortSignal.timeout() for every outbound fetch in this module.
const FETCH_TIMEOUT = 15_000;
// Maximum number of characters of stripped page text kept by fetchAndStripHtml.
const MAX_TEXT_LENGTH = 10_000;
/**
 * Route a URL to the extractor that handles its classified content type.
 *
 * @param url - The URL to extract from.
 * @param contentType - Classification of the URL; `youtube`, `tweet` and `pdf`
 *   have dedicated extractors, every other value uses the generic article path.
 * @returns The title/text (and optional metadata) produced by the chosen extractor.
 */
export async function extractContent(url: string, contentType: IntakeContentType): Promise<ExtractionResult> {
  if (contentType === 'youtube') return extractYouTube(url);
  if (contentType === 'tweet') return extractTweet(url);
  if (contentType === 'pdf') return extractPdf(url);
  // No dedicated extractor: treat the target as a generic HTML article.
  return extractArticle(url);
}
/**
 * YouTube: combine oEmbed metadata (title, author) with the watch page's meta description.
 * Full transcript extraction requires yt-dlp or YouTube Data API (future upgrade).
 *
 * Both lookups are best-effort and each is attempted exactly once:
 * - oEmbed succeeded: title/author come from oEmbed; the description comes from
 *   the page when it could be fetched, otherwise it is left empty.
 * - oEmbed failed: title and description come from the page's meta tags.
 *
 * @param url - The YouTube URL to extract.
 * @returns Title, markdown-ish text and `videoId` (plus `author` when known) metadata.
 * @throws The page-fetch error when neither oEmbed nor the page could be retrieved.
 */
async function extractYouTube(url: string): Promise<ExtractionResult> {
  const videoId = extractYouTubeId(url);

  // Structured metadata via oEmbed; any failure simply leaves `oembed` null.
  let oembed: { title?: string; author_name?: string } | null = null;
  try {
    const oembedUrl = `https://www.youtube.com/oembed?url=${encodeURIComponent(url)}&format=json`;
    const res = await fetch(oembedUrl, { signal: AbortSignal.timeout(FETCH_TIMEOUT) });
    if (res.ok) {
      oembed = await res.json() as { title?: string; author_name?: string };
    }
  } catch {
    // oEmbed is optional — continue with page extraction only.
  }

  // Fetch the page once. A failure is fatal only when oEmbed gave us nothing to return.
  let page: { text: string; rawHtml: string } | null = null;
  try {
    page = await fetchAndStripHtml(url);
  } catch (err) {
    if (!oembed) throw err;
  }

  const description = page
    ? extractMetaDescription(page.rawHtml) || page.text.slice(0, 2000)
    : '';

  if (oembed) {
    const title = oembed.title || url;
    const author = oembed.author_name || 'Unknown';
    return {
      title,
      text: `# ${title}\n\nBy: ${author}\nVideo ID: ${videoId || 'unknown'}\n\n${description}`,
      metadata: { videoId: videoId || '', author },
    };
  }

  // No oEmbed data: rely on the page's own meta tags.
  const title = (page ? extractMetaTitle(page.rawHtml) : null) || url;
  return {
    title,
    text: `# ${title}\n\n${description}`,
    metadata: { videoId: videoId || '' },
  };
}
/**
 * Tweet: ask the Twitter/X oEmbed endpoint for the embed HTML and flatten it to text.
 *
 * When oEmbed is unavailable (non-OK response or any thrown error) the tweet
 * page itself is fetched and its stripped text is returned instead.
 *
 * @param url - The tweet URL.
 * @returns Title/text for the tweet; `metadata.author` is set only on the oEmbed path.
 */
async function extractTweet(url: string): Promise<ExtractionResult> {
  try {
    const endpoint = `https://publish.twitter.com/oembed?url=${encodeURIComponent(url)}`;
    const response = await fetch(endpoint, { signal: AbortSignal.timeout(FETCH_TIMEOUT) });
    if (response.ok) {
      const payload = await response.json() as { html?: string; author_name?: string };
      const author = payload.author_name || 'Unknown';
      // Drop every tag from the embed markup, then collapse whitespace runs.
      const withoutTags = (payload.html || '').replace(/<[^>]*>/g, ' ');
      const text = withoutTags.replace(/\s+/g, ' ').trim();
      return {
        title: `Tweet by @${author}`,
        text: `Tweet by @${author}:\n\n${text}`,
        metadata: { author },
      };
    }
  } catch {
    // Fall through to the page-HTML fallback below.
  }

  // Fallback: fetch page HTML
  const { rawHtml, text } = await fetchAndStripHtml(url);
  return {
    title: extractMetaTitle(rawHtml) || 'Tweet',
    text,
  };
}
/**
 * PDF: confirm via a HEAD request that the URL looks like a PDF and return a
 * metadata-only placeholder. Full text extraction is delegated to the
 * extraction-service and is not performed here.
 *
 * The target counts as a PDF when the response content type mentions `pdf`
 * or the last path segment ends in `.pdf` (case-insensitive). The filename is
 * taken from the URL *path*, so query strings and fragments never leak into it,
 * and the `document.pdf` default is applied only after the extension check so
 * it cannot make an arbitrary URL look like a PDF.
 *
 * @param url - The candidate PDF URL.
 * @returns A placeholder result; a generic "could not verify" result when the
 *   HEAD request fails or the target does not look like a PDF.
 */
async function extractPdf(url: string): Promise<ExtractionResult> {
  try {
    const res = await fetch(url, {
      method: 'HEAD',
      signal: AbortSignal.timeout(FETCH_TIMEOUT),
    });
    const ct = res.headers.get('content-type') || '';
    // Last path segment only — `pathname` excludes `?query` and `#fragment`.
    const lastSegment = new URL(url).pathname.split('/').pop() || '';
    const hasPdfExtension = lastSegment.toLowerCase().endsWith('.pdf');
    if (ct.toLowerCase().includes('pdf') || hasPdfExtension) {
      const filename = lastSegment || 'document.pdf';
      return {
        title: filename,
        text: `PDF document: ${filename}\n\nURL: ${url}\n\nNote: Full PDF text extraction requires the extraction-service (port 4005). This note contains the document metadata only.`,
        metadata: { filename, contentType: ct },
      };
    }
  } catch {
    // Network/URL errors are treated as "could not verify" below.
  }
  return {
    title: 'PDF Document',
    text: `PDF URL: ${url}\n\nCould not verify PDF content. The URL may require authentication or may not be accessible.`,
  };
}
/**
 * Generic article extraction: download the page, strip its markup and use the
 * page's meta/`<title>` tag as the title (falling back to the URL itself).
 *
 * @param url - The article URL.
 * @returns The page title and its stripped text.
 */
async function extractArticle(url: string): Promise<ExtractionResult> {
  const { rawHtml, text } = await fetchAndStripHtml(url);
  return {
    title: extractMetaTitle(rawHtml) || url,
    text,
  };
}
// ── Shared Helpers ───────────────────────────────────────────────
/**
 * Download a page and reduce it to plain text.
 *
 * Script, style, nav, footer and header elements are removed together with
 * their contents; every remaining tag is replaced by a space, whitespace is
 * collapsed, and the result is capped at MAX_TEXT_LENGTH characters.
 *
 * NOTE(review): the URL is fetched as given — nothing in this function
 * restricts the target host; confirm callers validate it.
 *
 * @param url - The page to download.
 * @returns `text` (stripped, truncated) and `rawHtml` (first 50,000 characters of the response body).
 * @throws Error `HTTP <status>` when the response status is not OK.
 */
async function fetchAndStripHtml(url: string): Promise<{ text: string; rawHtml: string }> {
  const response = await fetch(url, {
    headers: { 'User-Agent': 'NoteLett/1.0 (URL-to-note extraction)' },
    signal: AbortSignal.timeout(FETCH_TIMEOUT),
  });
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }

  const body = await response.text();

  // Elements whose entire content is noise for note-taking purposes.
  const discardedElements = [
    /<script[^>]*>[\s\S]*?<\/script>/gi,
    /<style[^>]*>[\s\S]*?<\/style>/gi,
    /<nav[^>]*>[\s\S]*?<\/nav>/gi,
    /<footer[^>]*>[\s\S]*?<\/footer>/gi,
    /<header[^>]*>[\s\S]*?<\/header>/gi,
  ];
  let remaining = body;
  for (const pattern of discardedElements) {
    remaining = remaining.replace(pattern, '');
  }

  const withoutTags = remaining.replace(/<[^>]*>/g, ' ');
  const collapsed = withoutTags.replace(/\s+/g, ' ').trim();

  return {
    text: collapsed.slice(0, MAX_TEXT_LENGTH),
    rawHtml: body.slice(0, 50_000),
  };
}
/**
 * Find a page title in raw HTML.
 *
 * Prefers the Open Graph title (`<meta property="og:title" content="…">`) and
 * falls back to the `<title>` element. Each `<meta>` tag is inspected on its
 * own so the attribute order does not matter, and the content value is read up
 * to its *matching* quote, so an apostrophe inside a double-quoted value (or
 * vice versa) no longer truncates the title.
 *
 * @param html - Raw HTML to search.
 * @returns The title, or `null` when neither source is present.
 */
function extractMetaTitle(html: string): string | null {
  for (const tag of html.match(/<meta\b[^>]*>/gi) ?? []) {
    if (!/(?<![\w-])property\s*=\s*["']og:title["']/i.test(tag)) continue;
    const content = tag.match(/(?<![\w-])content\s*=\s*(?:"([^"]*)"|'([^']*)')/i);
    const value = content?.[1] ?? content?.[2] ?? '';
    // An empty og:title is treated as absent so the <title> fallback still applies.
    if (value) return value;
  }
  const titleMatch = html.match(/<title[^>]*>([^<]+)<\/title>/i);
  if (titleMatch) return titleMatch[1]?.trim() ?? null;
  return null;
}
/**
 * Find a page description in raw HTML.
 *
 * Prefers `<meta property="og:description">` (wherever it appears) over
 * `<meta name="description">`. Each `<meta>` tag is inspected on its own so
 * attribute order does not matter, and the content value is read up to its
 * *matching* quote, so apostrophes inside a double-quoted value are preserved.
 *
 * @param html - Raw HTML to search.
 * @returns The description, or `null` when no non-empty description tag exists.
 */
function extractMetaDescription(html: string): string | null {
  let plainDescription: string | null = null;
  for (const tag of html.match(/<meta\b[^>]*>/gi) ?? []) {
    const content = tag.match(/(?<![\w-])content\s*=\s*(?:"([^"]*)"|'([^']*)')/i);
    const value = content?.[1] ?? content?.[2] ?? '';
    if (!value) continue;
    // Open Graph wins outright, regardless of where it sits in the document.
    if (/(?<![\w-])property\s*=\s*["']og:description["']/i.test(tag)) return value;
    if (plainDescription === null && /(?<![\w-])name\s*=\s*["']description["']/i.test(tag)) {
      plainDescription = value;
    }
  }
  return plainDescription;
}
/**
 * Pull the 11-character video id out of a YouTube URL.
 *
 * Recognizes `?v=<id>`, `youtu.be/<id>`, `/shorts/<id>`, `/live/<id>` and
 * `/embed/<id>` forms.
 *
 * @param url - The YouTube URL.
 * @returns The video id, or `null` when the URL contains none of the known forms.
 */
function extractYouTubeId(url: string): string | null {
  const match = url.match(/(?:v=|youtu\.be\/|\/(?:shorts|live|embed)\/)([a-zA-Z0-9_-]{11})/);
  return match?.[1] ?? null;
}

View File

@ -0,0 +1,77 @@
import { getCollection } from '../../lib/datastore.js';
import type { FilterMap } from '@bytelyst/datastore';
import type { IntakeRuleDoc, IntakeJobDoc, IntakeJobStatus } from './types.js';
// ── Intake Rules ─────────────────────────────────────────────────
// Typed handle for the 'note_intake_rules' collection.
// The second argument is '/userId' — presumably the partition key path; confirm against getCollection.
function rulesCollection() {
return getCollection<IntakeRuleDoc>('note_intake_rules', '/userId');
}
/** Persist a new intake rule document and return the collection's create() result. */
export async function createIntakeRule(doc: IntakeRuleDoc): Promise<IntakeRuleDoc> {
return rulesCollection().create(doc);
}
/**
 * Look up one intake rule by id, scoped to `userId`.
 * Resolves to `null` when findById yields no document (per the declared return type).
 */
export async function getIntakeRule(id: string, userId: string): Promise<IntakeRuleDoc | null> {
return rulesCollection().findById(id, userId);
}
/**
 * List the intake rules visible to a user for one product: the user's own
 * rules followed by the persisted built-in rules (owner id `__builtin__`).
 *
 * The two queries are independent, so they run concurrently. Each is sorted by
 * ascending `priority` and capped at 100 rows; the combined list is NOT
 * re-sorted here.
 *
 * @param userId - Owner of the user-defined rules.
 * @param productId - Product the rules belong to.
 * @returns User rules first, then built-in rules.
 */
export async function listIntakeRules(
  userId: string,
  productId: string,
): Promise<IntakeRuleDoc[]> {
  const filter: FilterMap = { productId };
  const findOwnedBy = (ownerId: string) =>
    rulesCollection().findMany({
      filter: { ...filter, userId: ownerId },
      sort: { priority: 1 },
      limit: 100,
      offset: 0,
    });

  const [userRules, builtinRules] = await Promise.all([
    findOwnedBy(userId),
    findOwnedBy('__builtin__'),
  ]);
  return [...userRules, ...builtinRules];
}
/**
 * Apply a partial update to an intake rule identified by `id` within `userId`'s scope.
 * Returns whatever the collection's update() resolves to (typed as the rule document).
 */
export async function updateIntakeRule(
id: string,
userId: string,
updates: Partial<IntakeRuleDoc>,
): Promise<IntakeRuleDoc> {
return rulesCollection().update(id, userId, updates);
}
/** Delete the intake rule identified by `id` within `userId`'s scope; the delete() result is discarded. */
export async function deleteIntakeRule(id: string, userId: string): Promise<void> {
await rulesCollection().delete(id, userId);
}
// ── Intake Jobs ──────────────────────────────────────────────────
// Typed handle for the 'note_intake_jobs' collection.
// The second argument is '/userId' — presumably the partition key path; confirm against getCollection.
function jobsCollection() {
return getCollection<IntakeJobDoc>('note_intake_jobs', '/userId');
}
/** Persist a new intake job document and return the collection's create() result. */
export async function createIntakeJob(doc: IntakeJobDoc): Promise<IntakeJobDoc> {
return jobsCollection().create(doc);
}
/**
 * Look up one intake job by id, scoped to `userId`.
 * Resolves to `null` when findById yields no document (per the declared return type).
 */
export async function getIntakeJob(id: string, userId: string): Promise<IntakeJobDoc | null> {
return jobsCollection().findById(id, userId);
}
/**
 * List a user's intake jobs for one product, newest first.
 *
 * @param userId - Owner of the jobs.
 * @param productId - Product the jobs belong to.
 * @param options - Optional filters and paging:
 *   - `status`: only jobs in this status;
 *   - `since`: only jobs whose `startedAt` is at or after this timestamp
 *     (anything `Date.parse` accepts; an unparseable value is ignored);
 *   - `limit` (default 20) and `offset` (default 0).
 * @returns The requested page of jobs.
 */
export async function listIntakeJobs(
  userId: string,
  productId: string,
  options?: { status?: IntakeJobStatus; since?: string; limit?: number; offset?: number },
): Promise<IntakeJobDoc[]> {
  const filter: FilterMap = { userId, productId };
  if (options?.status) filter.status = options.status;

  const page = await jobsCollection().findMany({
    filter,
    sort: { startedAt: -1 },
    limit: options?.limit ?? 20,
    offset: options?.offset ?? 0,
  });

  const sinceMs = options?.since ? Date.parse(options.since) : Number.NaN;
  if (Number.isNaN(sinceMs)) return page;

  // Results are requested newest-first, so the jobs at/after `since` form a
  // prefix of the full ordering; trimming the fetched page therefore yields the
  // same rows as filtering before paging would (assumes findMany honors `sort`).
  return page.filter((job) => Date.parse(job.startedAt) >= sinceMs);
}
/**
 * Apply a partial update to an intake job identified by `id` within `userId`'s scope.
 * Returns whatever the collection's update() resolves to (typed as the job document).
 */
export async function updateIntakeJob(
id: string,
userId: string,
updates: Partial<IntakeJobDoc>,
): Promise<IntakeJobDoc> {
return jobsCollection().update(id, userId, updates);
}

View File

@ -0,0 +1,264 @@
import { describe, expect, it, vi, beforeEach } from 'vitest';
// ── Module mocks ─────────────────────────────────────────────────
// Everything the intake routes import is replaced so the tests exercise only
// the route logic; these are declared ahead of the routes.js import below.

// Every request is treated as user 'user_1' of product 'notelett'.
vi.mock('../../lib/request-context.js', () => ({
getUserId: vi.fn(() => 'user_1'),
getRequestProductId: vi.fn(() => 'notelett'),
}));
// All feature flags report enabled.
vi.mock('../../lib/feature-flags.js', () => ({
isFeatureEnabled: vi.fn(() => true),
}));
// Telemetry is a no-op spy.
vi.mock('../../lib/telemetry.js', () => ({
trackEvent: vi.fn(),
}));
vi.mock('../../lib/product-config.js', () => ({
PRODUCT_ID: 'notelett',
}));
// Identity implementation: returns its input unchanged.
vi.mock('../../lib/embeddings.js', () => ({
stripHtmlForEmbedding: vi.fn((s: string) => s),
}));
// LLM client stub that always reports configured and returns a canned completion.
vi.mock('../../lib/llm.js', () => ({
llm: vi.fn(() => ({
isConfigured: () => true,
chatCompletion: vi.fn(async () => ({
content: 'Summary of the page',
model: 'test-model',
usage: { promptTokens: 10, completionTokens: 20, totalTokens: 30 },
})),
})),
}));
// Notes repository: create echoes the document, update echoes the updates, getNote finds nothing.
// The spies live at module level so individual tests can inspect their calls.
const createNoteMock = vi.fn(async (doc: Record<string, unknown>) => doc);
const updateNoteMock = vi.fn(async (_id: string, _ws: string, updates: Record<string, unknown>) => updates);
vi.mock('../notes/repository.js', () => ({
createNote: (...args: unknown[]) => createNoteMock(...args as [Record<string, unknown>]),
updateNote: (...args: unknown[]) => updateNoteMock(...args as [string, string, Record<string, unknown>]),
getNote: vi.fn(async () => null),
}));
// Intake repository spies. Defaults: creates/updates echo their input, lookups
// return null and lists return []; tests override per case with mockResolvedValueOnce.
const createIntakeRuleMock = vi.fn(async (doc: Record<string, unknown>) => doc);
const getIntakeRuleMock = vi.fn(async () => null);
const listIntakeRulesMock = vi.fn(async () => []);
const updateIntakeRuleMock = vi.fn(async (_id: string, _uid: string, updates: Record<string, unknown>) => updates);
const deleteIntakeRuleMock = vi.fn(async () => undefined);
const createIntakeJobMock = vi.fn(async (doc: Record<string, unknown>) => doc);
const getIntakeJobMock = vi.fn(async () => null);
const listIntakeJobsMock = vi.fn(async () => []);
const updateIntakeJobMock = vi.fn(async (_id: string, _uid: string, updates: Record<string, unknown>) => updates);
// Each export forwards to the matching spy above through a wrapper function.
vi.mock('./repository.js', () => ({
createIntakeRule: (...args: unknown[]) => createIntakeRuleMock(...args as [Record<string, unknown>]),
getIntakeRule: (...args: unknown[]) => getIntakeRuleMock(...args as [string, string]),
listIntakeRules: (...args: unknown[]) => listIntakeRulesMock(...args as [string, string]),
updateIntakeRule: (...args: unknown[]) => updateIntakeRuleMock(...args as [string, string, Record<string, unknown>]),
deleteIntakeRule: (...args: unknown[]) => deleteIntakeRuleMock(...args as [string, string]),
createIntakeJob: (...args: unknown[]) => createIntakeJobMock(...args as [Record<string, unknown>]),
getIntakeJob: (...args: unknown[]) => getIntakeJobMock(...args as [string, string]),
listIntakeJobs: (...args: unknown[]) => listIntakeJobsMock(...args as [string, string]),
updateIntakeJob: (...args: unknown[]) => updateIntakeJobMock(...args as [string, string, Record<string, unknown>]),
}));
// Prompt runner returns a canned 'article-summary' result.
vi.mock('../note-prompts/runner.js', () => ({
executePrompt: vi.fn(async () => ({
content: 'AI summary',
model: 'test-model',
usage: { promptTokens: 10, completionTokens: 20, totalTokens: 30 },
templateSlug: 'article-summary',
outputType: 'new_note',
approvalState: 'applied',
})),
}));
// No prompt template is ever found.
vi.mock('../note-prompts/repository.js', () => ({
getPromptTemplate: vi.fn(async () => null),
}));
// Extraction never touches the network: a fixed title/text is returned for any URL.
vi.mock('./extractors.js', () => ({
extractContent: vi.fn(async () => ({
title: 'Test Article',
text: 'Extracted content from the page.',
})),
}));
import { buildTestApp } from '../../test-helpers.js';
import { intakeRoutes } from './routes.js';
// Builds a test app with only the intake routes registered, via the shared buildTestApp helper.
// The tests below address the routes under the '/api' prefix.
async function buildApp() {
return buildTestApp(intakeRoutes);
}
// Route-level tests for the intake plugin. All collaborators are mocked at
// module level, so assertions focus on status codes, response bodies and
// which repository spies were invoked.
describe('intake routes', () => {
beforeEach(() => {
// Clear recorded calls, then re-establish the default behaviors the tests rely on.
vi.clearAllMocks();
createNoteMock.mockImplementation(async (doc: Record<string, unknown>) => doc);
createIntakeJobMock.mockImplementation(async (doc: Record<string, unknown>) => doc);
listIntakeRulesMock.mockResolvedValue([]);
listIntakeJobsMock.mockResolvedValue([]);
});
describe('POST /intake', () => {
// Happy path: responds 202 with a queued job after creating one draft note and one job.
it('creates a draft note and job for a valid URL', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake',
payload: { url: 'https://blog.example.com/my-post' },
});
expect(res.statusCode).toBe(202);
const body = res.json();
expect(body.status).toBe('queued');
expect(body.contentType).toBe('article');
expect(body.noteId).toBeDefined();
expect(body.jobId).toBeDefined();
expect(createNoteMock).toHaveBeenCalledOnce();
expect(createIntakeJobMock).toHaveBeenCalledOnce();
});
it('classifies YouTube URLs', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake',
payload: { url: 'https://www.youtube.com/watch?v=abc123' },
});
expect(res.statusCode).toBe(202);
expect(res.json().contentType).toBe('youtube');
});
// Schema validation failure surfaces as 400.
it('rejects invalid URLs', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake',
payload: { url: 'not-a-url' },
});
expect(res.statusCode).toBe(400);
});
// The supplied workspaceId must be passed through to the created note.
it('accepts optional workspaceId', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake',
payload: { url: 'https://example.com/article', workspaceId: 'ws-1' },
});
expect(res.statusCode).toBe(202);
const noteArg = createNoteMock.mock.calls[0]?.[0] as Record<string, unknown> | undefined;
expect(noteArg?.workspaceId).toBe('ws-1');
});
// templateOverride takes precedence over any matched rule's template.
it('accepts template override', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake',
payload: { url: 'https://example.com', templateOverride: 'summarize' },
});
expect(res.statusCode).toBe(202);
expect(res.json().templateSlug).toBe('summarize');
});
});
describe('GET /intake/jobs', () => {
it('returns job list', async () => {
listIntakeJobsMock.mockResolvedValueOnce([
{ id: 'job_1', status: 'complete', url: 'https://example.com' },
]);
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/intake/jobs' });
expect(res.statusCode).toBe(200);
expect(res.json().items).toHaveLength(1);
});
});
describe('GET /intake/jobs/:id', () => {
it('returns 404 for missing job', async () => {
getIntakeJobMock.mockResolvedValueOnce(null);
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/intake/jobs/missing' });
expect(res.statusCode).toBe(404);
});
it('returns job when found', async () => {
getIntakeJobMock.mockResolvedValueOnce({ id: 'job_1', status: 'complete' });
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/intake/jobs/job_1' });
expect(res.statusCode).toBe(200);
expect(res.json().id).toBe('job_1');
});
});
describe('GET /intake-rules', () => {
it('returns rules including built-ins', async () => {
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/intake-rules' });
expect(res.statusCode).toBe(200);
// Should include built-in rules even with empty DB
expect(res.json().items.length).toBeGreaterThan(0);
});
});
describe('POST /intake-rules', () => {
it('creates a custom rule', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake-rules',
payload: {
workspaceId: 'ws-1',
name: 'My Rule',
urlPattern: 'mysite\\.com',
contentType: 'article',
templateId: 'summarize',
},
});
expect(res.statusCode).toBe(201);
expect(createIntakeRuleMock).toHaveBeenCalledOnce();
});
// '[invalid' is an unterminated character class, so RegExp construction throws.
it('rejects invalid regex patterns', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/intake-rules',
payload: {
workspaceId: 'ws-1',
name: 'Bad Rule',
urlPattern: '[invalid',
contentType: 'article',
templateId: 'summarize',
},
});
expect(res.statusCode).toBe(400);
});
});
describe('DELETE /intake-rules/:id', () => {
// A rule owned by the '__builtin__' sentinel user must be refused without touching the repository.
it('prevents deleting built-in rules', async () => {
getIntakeRuleMock.mockResolvedValueOnce({ id: 'builtin-intake-youtube', userId: '__builtin__' });
const app = await buildApp();
const res = await app.inject({ method: 'DELETE', url: '/api/intake-rules/builtin-intake-youtube' });
expect(res.statusCode).toBe(400);
expect(deleteIntakeRuleMock).not.toHaveBeenCalled();
});
it('deletes user rules', async () => {
getIntakeRuleMock.mockResolvedValueOnce({ id: 'rule_abc', userId: 'user_1' });
const app = await buildApp();
const res = await app.inject({ method: 'DELETE', url: '/api/intake-rules/rule_abc' });
expect(res.statusCode).toBe(204);
expect(deleteIntakeRuleMock).toHaveBeenCalledOnce();
});
});
});

View File

@ -0,0 +1,353 @@
/**
 * Intake routes — URL intake pipeline + rules CRUD + job status.
*/
import type { FastifyInstance } from 'fastify';
import { randomUUID } from 'node:crypto';
import { getUserId, getRequestProductId } from '../../lib/request-context.js';
import { BadRequestError, NotFoundError } from '@bytelyst/errors';
import { isFeatureEnabled } from '../../lib/feature-flags.js';
import { trackEvent } from '../../lib/telemetry.js';
import { PRODUCT_ID } from '../../lib/product-config.js';
import { classifyUrl } from './url-classifier.js';
import { extractContent } from './extractors.js';
import { getBuiltinIntakeRules } from './seed-rules.js';
import * as repo from './repository.js';
import * as noteRepo from '../notes/repository.js';
import { executePrompt } from '../note-prompts/runner.js';
import * as promptRepo from '../note-prompts/repository.js';
import { stripHtmlForEmbedding } from '../../lib/embeddings.js';
import {
IntakeRequestSchema,
CreateIntakeRuleSchema,
UpdateIntakeRuleSchema,
ListIntakeJobsQuerySchema,
} from './types.js';
import type { IntakeRuleDoc } from './types.js';
// ── Rate limiter (simple in-memory) ──────────────────────────────
// Per-user intake timestamps (ms since epoch, oldest first), kept in process memory only.
const rateLimitMap = new Map<string, number[]>();
// Length of the sliding window: one hour.
const RATE_LIMIT_WINDOW_MS = 3600_000;
// Maximum intakes a single user may submit within one window.
const RATE_LIMIT_MAX = 20;
// Once the map tracks this many users, idle entries are swept on the next call.
const RATE_LIMIT_SWEEP_SIZE = 10_000;

/**
 * Record one intake for `userId`, enforcing a sliding-window limit of
 * RATE_LIMIT_MAX intakes per RATE_LIMIT_WINDOW_MS.
 *
 * Entries for users with no activity inside the window are evicted
 * opportunistically once the map grows past RATE_LIMIT_SWEEP_SIZE, so the map
 * cannot grow without bound as distinct users come and go.
 *
 * @param userId - The user submitting an intake.
 * @throws BadRequestError when the user already has RATE_LIMIT_MAX intakes in the window.
 */
function checkRateLimit(userId: string): void {
  const now = Date.now();
  const cutoff = now - RATE_LIMIT_WINDOW_MS;

  if (rateLimitMap.size >= RATE_LIMIT_SWEEP_SIZE) {
    for (const [trackedUser, stamps] of rateLimitMap) {
      // Timestamps are appended in order, so the last one is the newest.
      const newest = stamps[stamps.length - 1];
      if (newest === undefined || newest <= cutoff) rateLimitMap.delete(trackedUser);
    }
  }

  const timestamps = rateLimitMap.get(userId) ?? [];
  const recent = timestamps.filter((t) => t > cutoff);
  if (recent.length >= RATE_LIMIT_MAX) {
    throw new BadRequestError(`Rate limit exceeded: max ${RATE_LIMIT_MAX} intakes per hour`);
  }
  recent.push(now);
  rateLimitMap.set(userId, recent);
}
// ── Helpers ──────────────────────────────────────────────────────
/**
 * Pick the intake rule that applies to a URL.
 *
 * Candidates are the rules returned by the repository plus any built-in rule
 * whose id is not already among them. They are ordered by ascending
 * `priority` (lower number wins) and the first enabled rule whose
 * `urlPattern` — compiled as a case-insensitive regex — matches the URL is
 * returned. Rules with an uncompilable pattern are skipped.
 *
 * @param url - The URL being ingested.
 * @param userId - Owner whose rules are considered.
 * @param productId - Product scope for the rule lookup.
 * @returns The winning rule, or `null` when nothing matches.
 */
async function matchIntakeRule(
  url: string,
  userId: string,
  productId: string,
): Promise<IntakeRuleDoc | null> {
  const candidates = [...(await repo.listIntakeRules(userId, productId))];
  const seenIds = new Set(candidates.map((rule) => rule.id));

  // Built-ins may not be persisted yet; add the ones the repository did not return.
  for (const seed of getBuiltinIntakeRules()) {
    if (seenIds.has(seed.id)) continue;
    seenIds.add(seed.id);
    candidates.push({
      ...seed,
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString(),
    });
  }

  candidates.sort((left, right) => left.priority - right.priority);

  const winner = candidates.find((rule) => {
    if (!rule.enabled) return false;
    try {
      return new RegExp(rule.urlPattern, 'i').test(url);
    } catch {
      // Invalid regex in rule — treat as non-matching.
      return false;
    }
  });
  return winner ?? null;
}
/**
 * Neutralize markup in untrusted text before it is placed in a note's HTML body.
 * Only `<` and `>` are escaped: that is enough to stop tag injection in a text
 * context, while character references already present in the text (extracted
 * page text is tag-stripped but not entity-decoded) keep rendering as before.
 */
function escapeIntakeMarkup(raw: string): string {
  return raw.replace(/</g, '&lt;').replace(/>/g, '&gt;');
}

/** Render plain text as `<p>` paragraphs, one per line, with markup neutralized. */
function intakeTextToHtml(text: string): string {
  return `<p>${escapeIntakeMarkup(text).replace(/\n/g, '</p><p>')}</p>`;
}

/**
 * Background half of the intake pipeline: extract the URL's content, run the
 * selected prompt template over it and write the result into the draft note.
 *
 * Job status moves `extracting` → `processing` → `complete`; any thrown error
 * marks the job `failed` with the error message. When the template cannot be
 * found, the raw extracted text is stored in the note instead of an LLM result.
 * Text from the fetched page and from the LLM is untrusted, so it is rendered
 * through intakeTextToHtml rather than interpolated into HTML directly.
 *
 * This function never rejects: failures are recorded on the job and in telemetry.
 *
 * @param jobId - Intake job to update as work progresses.
 * @param userId - Owner of the job (also used for the template lookup and telemetry).
 * @param url - The URL being ingested.
 * @param contentType - Classified content type, forwarded to the extractor.
 * @param templateSlug - Prompt template to run over the extracted text.
 * @param noteId - Draft note that receives the final content.
 * @param workspaceId - Workspace the note lives in.
 */
async function runIntakeBackground(
  jobId: string,
  userId: string,
  url: string,
  contentType: string,
  templateSlug: string,
  noteId: string,
  workspaceId: string,
): Promise<void> {
  try {
    // Phase 1: Extract content
    await repo.updateIntakeJob(jobId, userId, { status: 'extracting' });
    const extracted = await extractContent(url, contentType as import('./types.js').IntakeContentType);
    await repo.updateIntakeJob(jobId, userId, {
      status: 'processing',
      extractedText: extracted.text.slice(0, 10_000),
    });

    // Phase 2: Run prompt template
    const template = await promptRepo.getPromptTemplate(templateSlug, userId);
    if (!template) {
      // Template not found — save raw extracted content to note
      await noteRepo.updateNote(noteId, workspaceId, {
        title: extracted.title || url,
        body: intakeTextToHtml(extracted.text),
        status: 'active',
      });
      await repo.updateIntakeJob(jobId, userId, {
        status: 'complete',
        completedAt: new Date().toISOString(),
      });
      return;
    }

    const result = await executePrompt(template, {
      templateId: templateSlug,
      noteId,
      workspaceId,
    }, extracted.text);

    // Phase 3: Update note with LLM result
    await noteRepo.updateNote(noteId, workspaceId, {
      title: extracted.title || url,
      body: intakeTextToHtml(result.content),
      status: 'active',
    });
    await repo.updateIntakeJob(jobId, userId, {
      status: 'complete',
      completedAt: new Date().toISOString(),
    });
    trackEvent('intake_job_completed', userId, {
      contentType,
      templateSlug,
      url: new URL(url).hostname,
    });
  } catch (err) {
    const errorMsg = err instanceof Error ? err.message : 'Unknown error';
    // Best effort: if even the failure update fails there is nothing left to do with the error.
    await repo.updateIntakeJob(jobId, userId, {
      status: 'failed',
      error: errorMsg,
      completedAt: new Date().toISOString(),
    }).catch(() => {});
    trackEvent('intake_job_failed', userId, { contentType, error: errorMsg });
  }
}
// ── Route Plugin ─────────────────────────────────────────────────
/**
 * Fastify plugin registering the intake endpoints:
 * - POST   /intake            submit a URL; responds 202 with the queued job
 * - GET    /intake/jobs       list the caller's intake jobs
 * - GET    /intake/jobs/:id   fetch one job
 * - GET    /intake-rules      list rules (persisted + built-in)
 * - POST   /intake-rules      create a user rule
 * - PATCH  /intake-rules/:id  update a user rule
 * - DELETE /intake-rules/:id  delete a user rule
 */
export async function intakeRoutes(app: FastifyInstance): Promise<void> {
// ── POST /intake — main intake endpoint ────────────────────────
app.post('/intake', async (req, reply) => {
const userId = getUserId(req);
const productId = getRequestProductId(req);
if (!isFeatureEnabled('notelett_intake_enabled')) {
throw new BadRequestError('Intake feature is not enabled');
}
// NOTE: the rate limit is charged before the body is validated, so rejected requests count too.
checkRateLimit(userId);
const parsed = IntakeRequestSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map((i: { message: string }) => i.message).join('; '));
}
const input = parsed.data;
const workspaceId = input.workspaceId || 'default';
// Classify URL
const classification = classifyUrl(input.url);
// Match intake rule
const rule = await matchIntakeRule(input.url, userId, productId);
// Template precedence: explicit override, then the matched rule, then the generic default.
const templateSlug = input.templateOverride || rule?.templateId || 'article-summary';
// Create draft note
const noteId = `note_intake_${randomUUID().replace(/-/g, '').slice(0, 12)}`;
const now = new Date().toISOString();
// Placeholder note; the background job replaces title/body and flips status to 'active'.
await noteRepo.createNote({
id: noteId,
productId: PRODUCT_ID,
workspaceId,
userId,
title: `Processing: ${new URL(input.url).hostname}`,
body: '<p>Processing URL content...</p>',
status: 'draft',
tags: ['intake', classification.contentType],
links: [],
sourceType: 'intake',
sourceUri: input.url,
createdAt: now,
updatedAt: now,
createdBy: userId,
updatedBy: userId,
agentId: 'intake-pipeline',
});
// Create intake job
const jobId = `job_${randomUUID().replace(/-/g, '').slice(0, 12)}`;
await repo.createIntakeJob({
id: jobId,
productId: PRODUCT_ID,
userId,
workspaceId,
noteId,
// '__auto__' marks a job for which no rule matched.
ruleId: rule?.id || '__auto__',
url: input.url,
contentType: classification.contentType,
templateSlug,
status: 'queued',
startedAt: now,
});
// Fire background processing (no await)
// runIntakeBackground catches its own errors, so the discarded promise does not reject.
setImmediate(() => {
void runIntakeBackground(
jobId, userId, input.url,
classification.contentType, templateSlug,
noteId, workspaceId,
);
});
// Only the hostname is sent to telemetry, not the full URL.
trackEvent('intake_submitted', userId, {
contentType: classification.contentType,
templateSlug,
domain: new URL(input.url).hostname,
});
// 202 Accepted: the work continues after this response is sent.
reply.code(202);
return {
jobId,
noteId,
contentType: classification.contentType,
ruleMatched: rule?.name || null,
templateSlug,
status: 'queued',
};
});
// ── GET /intake/jobs — list intake jobs ────────────────────────
app.get('/intake/jobs', async (req) => {
const userId = getUserId(req);
const productId = getRequestProductId(req);
// parse() throws on an invalid query; handling is left to the app's error handler.
const query = ListIntakeJobsQuerySchema.parse(req.query);
const jobs = await repo.listIntakeJobs(userId, productId, {
status: query.status,
since: query.since,
limit: query.limit,
offset: query.offset,
});
// `total` is the size of the returned page, not a count of all matching jobs.
return { items: jobs, total: jobs.length };
});
// ── GET /intake/jobs/:id — single job status ──────────────────
app.get('/intake/jobs/:id', async (req) => {
const userId = getUserId(req);
const { id } = req.params as { id: string };
// Lookup is scoped to the caller's userId.
const job = await repo.getIntakeJob(id, userId);
if (!job) throw new NotFoundError('Intake job not found');
return job;
});
// ── Intake Rules CRUD ─────────────────────────────────────────
app.get('/intake-rules', async (req) => {
const userId = getUserId(req);
const productId = getRequestProductId(req);
const rules = await repo.listIntakeRules(userId, productId);
// Merge in built-in rules that aren't persisted
// Their createdAt/updatedAt are synthesized at request time.
const builtinRules = getBuiltinIntakeRules().map((r) => ({
...r,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
}));
const all = [...rules];
for (const br of builtinRules) {
if (!all.some((r) => r.id === br.id)) {
all.push(br);
}
}
// Lower priority number sorts first.
all.sort((a, b) => a.priority - b.priority);
return { items: all, total: all.length };
});
app.post('/intake-rules', async (req, reply) => {
const userId = getUserId(req);
const productId = getRequestProductId(req);
const input = CreateIntakeRuleSchema.parse(req.body);
const now = new Date().toISOString();
// Validate regex
try {
new RegExp(input.urlPattern);
} catch {
throw new BadRequestError('Invalid URL pattern (must be valid regex)');
}
const doc: IntakeRuleDoc = {
id: `rule_${randomUUID().replace(/-/g, '').slice(0, 12)}`,
productId,
userId,
...input,
createdAt: now,
updatedAt: now,
};
const created = await repo.createIntakeRule(doc);
reply.code(201);
return created;
});
app.patch('/intake-rules/:id', async (req) => {
const userId = getUserId(req);
const { id } = req.params as { id: string };
const existing = await repo.getIntakeRule(id, userId);
if (!existing) throw new NotFoundError('Intake rule not found');
// Built-in rules are owned by the '__builtin__' sentinel user and are read-only.
if (existing.userId === '__builtin__') {
throw new BadRequestError('Cannot modify built-in intake rules');
}
const updates = UpdateIntakeRuleSchema.parse(req.body);
// Re-validate the pattern only when the update actually supplies one.
if (updates.urlPattern) {
try { new RegExp(updates.urlPattern); } catch { throw new BadRequestError('Invalid URL pattern'); }
}
return repo.updateIntakeRule(id, userId, {
...updates,
updatedAt: new Date().toISOString(),
});
});
app.delete('/intake-rules/:id', async (req, reply) => {
const userId = getUserId(req);
const { id } = req.params as { id: string };
const existing = await repo.getIntakeRule(id, userId);
if (!existing) throw new NotFoundError('Intake rule not found');
// Built-in rules are owned by the '__builtin__' sentinel user and cannot be deleted.
if (existing.userId === '__builtin__') {
throw new BadRequestError('Cannot delete built-in intake rules');
}
await repo.deleteIntakeRule(id, userId);
// 204 No Content: nothing is returned on success.
reply.code(204);
});
}

View File

@ -0,0 +1,73 @@
/**
* Built-in intake rules seeded on startup.
* userId = '__builtin__' is a sentinel for system-owned rules.
*/
import { PRODUCT_ID } from '../../lib/product-config.js';
import type { IntakeRuleDoc } from './types.js';
// Sentinel owner id stamped on every built-in (system-owned) rule.
const BUILTIN_USER = '__builtin__';
// Workspace id stamped on every built-in rule — presumably meaning "applies to all workspaces"; confirm with consumers.
const BUILTIN_WORKSPACE = '__all__';
// Shape of a rule as authored in RULES: an IntakeRuleDoc minus the identity,
// ownership and bookkeeping fields, which getBuiltinIntakeRules() or the caller fill in.
type SeedRule = Omit<IntakeRuleDoc, 'id' | 'productId' | 'userId' | 'workspaceId' | 'createdAt' | 'updatedAt' | '_ts' | '_etag'>;
// Built-in rules, in seed order. `urlPattern` is a regex source string that is
// tested against the full URL; a lower `priority` number is evaluated first.
// NOTE: array positions feed into the generated rule ids, so append rather than reorder.
const RULES: SeedRule[] = [
  {
    name: 'YouTube Video',
    urlPattern: 'youtube\\.com/(?:watch|shorts|live)|youtu\\.be/',
    contentType: 'youtube',
    templateId: 'youtube-summary',
    enabled: true,
    priority: 10,
  },
  {
    name: 'Tweet / X Post',
    // The host must start at a URL boundary (string start, '/' or '.'); a bare
    // `x\.com` would also match unrelated hosts such as netflix.com or dropbox.com.
    urlPattern: '(?:^|[/.])(?:twitter|x)\\.com/.*/status/',
    contentType: 'tweet',
    templateId: 'tweet-thread',
    enabled: true,
    priority: 10,
  },
  {
    name: 'GitHub Repository',
    // Exactly owner/repo (optional trailing slash) — deeper paths are not repository roots.
    urlPattern: 'github\\.com/[^/]+/[^/]+/?$',
    contentType: 'github',
    templateId: 'repo-summary',
    enabled: true,
    priority: 10,
  },
  {
    name: 'PDF Document',
    urlPattern: '\\.pdf(\\?.*)?$',
    contentType: 'pdf',
    templateId: 'pdf-summary',
    enabled: true,
    priority: 10,
  },
  {
    name: 'Reddit Post',
    urlPattern: 'reddit\\.com/r/',
    contentType: 'reddit',
    templateId: 'article-summary',
    enabled: true,
    priority: 20,
  },
  {
    // Catch-all: matches every URL, so it carries the weakest built-in priority.
    name: 'Generic Article',
    urlPattern: '.*',
    contentType: 'generic',
    templateId: 'article-summary',
    enabled: true,
    priority: 99,
  },
];
/**
 * Materialize the built-in rules with their identity and ownership fields.
 *
 * Ids have the form `builtin-intake-<contentType>`; rules at array position 5
 * and beyond additionally get a `-<position>` suffix. Timestamps are left for
 * the caller to supply.
 *
 * @returns One rule object per RULES entry, in seed order.
 */
export function getBuiltinIntakeRules(): Omit<IntakeRuleDoc, 'createdAt' | 'updatedAt' | '_ts' | '_etag'>[] {
  return RULES.map((seed, position) => {
    const positionSuffix = position > 4 ? `-${position}` : '';
    return {
      ...seed,
      id: `builtin-intake-${seed.contentType}${positionSuffix}`,
      productId: PRODUCT_ID,
      userId: BUILTIN_USER,
      workspaceId: BUILTIN_WORKSPACE,
    };
  });
}

View File

@ -0,0 +1,95 @@
import { z } from 'zod';
// ── Content Types ────────────────────────────────────────────────
// Every content type a URL can be classified as. Declared `as const` so the
// union type below is derived from this single list.
export const INTAKE_CONTENT_TYPES = [
'youtube', 'article', 'pdf', 'tweet', 'reddit', 'github', 'generic',
] as const;
// Union of the literal values in INTAKE_CONTENT_TYPES.
export type IntakeContentType = (typeof INTAKE_CONTENT_TYPES)[number];
// Every status an intake job can be in; 'complete' and 'failed' are the two
// statuses written together with a completedAt timestamp.
export const INTAKE_JOB_STATUSES = [
'queued', 'extracting', 'processing', 'complete', 'failed',
] as const;
// Union of the literal values in INTAKE_JOB_STATUSES.
export type IntakeJobStatus = (typeof INTAKE_JOB_STATUSES)[number];
// ── Intake Rule ──────────────────────────────────────────────────
/** Stored shape of an intake rule: maps URLs matching a pattern to a prompt template. */
export interface IntakeRuleDoc {
id: string;
productId: string;
// Owner of the rule; built-in rules use the sentinel '__builtin__'.
userId: string;
workspaceId: string;
// Human-readable rule name.
name: string;
// Regular-expression source string tested against the submitted URL.
urlPattern: string;
contentType: IntakeContentType;
// Identifier of the prompt template to run when this rule matches.
templateId: string;
// Disabled rules are skipped during matching.
enabled: boolean;
// Lower number = evaluated earlier.
priority: number;
// ISO-8601 timestamps.
createdAt: string;
updatedAt: string;
// Presumably datastore-managed metadata (timestamp / concurrency tag) — confirm against the datastore.
_ts?: number;
_etag?: string;
}
// Request-body schema for creating an intake rule.
// NOTE: urlPattern is only length-checked here; whether it compiles as a regex is not validated by this schema.
export const CreateIntakeRuleSchema = z.object({
workspaceId: z.string().min(1).max(128),
name: z.string().min(1).max(200),
urlPattern: z.string().min(1).max(1000),
contentType: z.enum(INTAKE_CONTENT_TYPES),
templateId: z.string().min(1).max(128),
// New rules are enabled unless stated otherwise.
enabled: z.boolean().default(true),
// Integer in 1..999, defaulting to 50.
priority: z.number().int().min(1).max(999).default(50),
});
// Parsed (post-default) shape of a create-rule request.
export type CreateIntakeRuleInput = z.infer<typeof CreateIntakeRuleSchema>;
// Request-body schema for patching an intake rule: same field constraints as
// creation, but every field is optional and workspaceId is not accepted.
export const UpdateIntakeRuleSchema = z.object({
name: z.string().min(1).max(200).optional(),
urlPattern: z.string().min(1).max(1000).optional(),
contentType: z.enum(INTAKE_CONTENT_TYPES).optional(),
templateId: z.string().min(1).max(128).optional(),
enabled: z.boolean().optional(),
priority: z.number().int().min(1).max(999).optional(),
});
// Parsed shape of an update-rule request.
export type UpdateIntakeRuleInput = z.infer<typeof UpdateIntakeRuleSchema>;
// ── Intake Job ───────────────────────────────────────────────────
/** Stored shape of one intake job: the processing record for a single submitted URL. */
export interface IntakeJobDoc {
id: string;
productId: string;
userId: string;
workspaceId: string;
// Note that receives the extracted/processed content.
noteId: string;
// Id of the intake rule associated with this job.
ruleId: string;
// The submitted URL.
url: string;
contentType: IntakeContentType;
// Prompt template chosen for this job.
templateSlug: string;
status: IntakeJobStatus;
// Extracted text, when extraction has produced any.
extractedText?: string;
// Error message, when the job failed.
error?: string;
// ISO-8601 timestamps.
startedAt: string;
completedAt?: string;
// Presumably datastore-managed metadata (timestamp / concurrency tag) — confirm against the datastore.
_ts?: number;
_etag?: string;
}
// ── Intake Request ───────────────────────────────────────────────
/**
 * Request-body schema for submitting a URL to the intake pipeline.
 *
 * `url` must be a syntactically valid URL of at most 4096 characters AND use
 * the http or https scheme. `z.string().url()` on its own also accepts schemes
 * such as `javascript:`, `file:` or `ftp:`, which must never be stored as a
 * note's source link or handed to a server-side fetch.
 */
export const IntakeRequestSchema = z.object({
  url: z
    .string()
    .url()
    .max(4096)
    .refine((value) => /^https?:\/\//i.test(value), {
      message: 'URL must use http or https',
    }),
  // Target workspace; optional.
  workspaceId: z.string().min(1).max(128).optional(),
  // Explicit prompt template to use for this intake; optional.
  templateOverride: z.string().min(1).max(128).optional(),
});
// Parsed shape of an intake request.
export type IntakeRequest = z.infer<typeof IntakeRequestSchema>;
// Query-string schema for listing intake jobs.
export const ListIntakeJobsQuerySchema = z.object({
status: z.enum(INTAKE_JOB_STATUSES).optional(),
// NOTE(review): only length-checked here; no date format is enforced.
since: z.string().max(64).optional(),
// Query values arrive as strings, hence z.coerce. Page size 1..100 (default 20).
limit: z.coerce.number().int().min(1).max(100).default(20),
offset: z.coerce.number().int().min(0).default(0),
});
// Parsed (post-coercion, post-default) shape of the list-jobs query.
export type ListIntakeJobsQuery = z.infer<typeof ListIntakeJobsQuerySchema>;

View File

@ -0,0 +1,56 @@
import { describe, expect, it } from 'vitest';
import { classifyUrl } from './url-classifier.js';
/**
 * Table-driven coverage for classifyUrl: each row registers one test that
 * asserts the exact classification returned for a single URL.
 */
describe('classifyUrl', () => {
  const cases: ReadonlyArray<{ title: string; url: string; contentType: string; confidence: string }> = [
    { title: 'classifies YouTube watch URLs', url: 'https://www.youtube.com/watch?v=dQw4w9WgXcQ', contentType: 'youtube', confidence: 'high' },
    { title: 'classifies YouTube short URLs', url: 'https://youtu.be/dQw4w9WgXcQ', contentType: 'youtube', confidence: 'high' },
    { title: 'classifies YouTube shorts', url: 'https://youtube.com/shorts/abc123', contentType: 'youtube', confidence: 'high' },
    { title: 'classifies YouTube live URLs', url: 'https://youtube.com/live/xyz789', contentType: 'youtube', confidence: 'high' },
    { title: 'classifies Twitter status URLs', url: 'https://twitter.com/user/status/12345', contentType: 'tweet', confidence: 'high' },
    { title: 'classifies X.com status URLs', url: 'https://x.com/user/status/12345', contentType: 'tweet', confidence: 'high' },
    { title: 'classifies GitHub repo URLs', url: 'https://github.com/owner/repo', contentType: 'github', confidence: 'high' },
    { title: 'classifies GitHub repo URLs with trailing slash', url: 'https://github.com/owner/repo/', contentType: 'github', confidence: 'high' },
    { title: 'classifies Reddit URLs', url: 'https://www.reddit.com/r/typescript/comments/abc', contentType: 'reddit', confidence: 'high' },
    { title: 'classifies PDF URLs', url: 'https://example.com/document.pdf', contentType: 'pdf', confidence: 'high' },
    { title: 'classifies PDF URLs with query params', url: 'https://example.com/doc.pdf?token=abc', contentType: 'pdf', confidence: 'high' },
    { title: 'classifies generic article URLs', url: 'https://blog.example.com/my-post', contentType: 'article', confidence: 'low' },
    { title: 'classifies unknown URLs as article', url: 'https://example.com', contentType: 'article', confidence: 'low' },
  ];

  // Register the tests in table order so reporting order matches the rows above.
  for (const { title, url, contentType, confidence } of cases) {
    it(title, () => {
      expect(classifyUrl(url)).toEqual({ contentType, confidence });
    });
  }
});

View File

@ -0,0 +1,30 @@
/**
* URL classifier: a pure function that determines the content type of a URL.
*/
import type { IntakeContentType } from './types.js';
/**
 * Outcome of classifying a URL.
 */
export interface ClassificationResult {
// The content type the URL was recognised as.
contentType: IntakeContentType;
// How certain the classification is; the fallback 'article' result uses 'low'.
confidence: 'high' | 'medium' | 'low';
}
/**
 * Host-scoped classification rules, evaluated in order (first match wins).
 *
 * - `domains`: the rule applies when the URL's hostname equals one of these
 *   domains or is a subdomain of one (e.g. `www.`, `m.`, `old.`).
 * - `regex`: tested against the parsed pathname only, never the query string
 *   or fragment, so a domain name embedded in another site's query cannot match.
 */
const PATTERNS: ReadonlyArray<{
  contentType: IntakeContentType;
  domains: readonly string[];
  regex: RegExp;
  confidence: 'high' | 'medium';
}> = [
  { contentType: 'youtube', domains: ['youtube.com'], regex: /^\/(?:watch|shorts|live)/i, confidence: 'high' },
  { contentType: 'youtube', domains: ['youtu.be'], regex: /^\//, confidence: 'high' },
  { contentType: 'tweet', domains: ['twitter.com', 'x.com'], regex: /^\/[^/]+\/status\//i, confidence: 'high' },
  { contentType: 'github', domains: ['github.com'], regex: /^\/[^/]+\/[^/]+\/?$/, confidence: 'high' },
  { contentType: 'reddit', domains: ['reddit.com'], regex: /^\/r\//i, confidence: 'high' },
];

/** Matches a `.pdf` extension at the end of the URL, optionally followed by a query string. */
const PDF_REGEX = /\.pdf(\?.*)?$/i;

/**
 * Parse `raw` as a URL, retrying with an `https://` prefix so scheme-less
 * input such as `youtu.be/abc` is still understood.
 *
 * @returns the first candidate that parses and has a hostname, or `undefined`.
 */
function parseUrlLenient(raw: string): URL | undefined {
  for (const candidate of [raw, `https://${raw}`]) {
    try {
      const parsed = new URL(candidate);
      // `host:port/path` parses as a custom scheme with an empty hostname;
      // fall through to the prefixed candidate in that case.
      if (parsed.hostname !== '') {
        return parsed;
      }
    } catch {
      // Not parseable in this form; try the next candidate.
    }
  }
  return undefined;
}

/**
 * Classify a URL into a content type.
 *
 * Platform rules are matched against the parsed hostname and pathname, so
 * look-alike hosts (e.g. `dropbox.com` vs. `x.com`) and platform URLs that
 * merely appear inside another site's query string are not misclassified.
 * PDF detection ignores a trailing `#fragment`. Anything unrecognised is
 * reported as a low-confidence `article`.
 *
 * @param url - The URL to classify; a missing scheme is tolerated.
 * @returns The detected content type and the confidence of that detection.
 */
export function classifyUrl(url: string): ClassificationResult {
  const parsed = parseUrlLenient(url);
  if (parsed) {
    // Drop a trailing root dot ("youtube.com.") so it compares equal to the bare domain.
    const host = parsed.hostname.toLowerCase().replace(/\.$/, '');
    for (const { contentType, domains, regex, confidence } of PATTERNS) {
      const hostMatches = domains.some((domain) => host === domain || host.endsWith(`.${domain}`));
      if (hostMatches && regex.test(parsed.pathname)) {
        return { contentType, confidence };
      }
    }
  }

  // The fragment never reaches the server, so it must not hide the extension.
  const withoutFragment = url.replace(/#[\s\S]*$/, '');
  if (PDF_REGEX.test(withoutFragment)) {
    return { contentType: 'pdf', confidence: 'high' };
  }

  return { contentType: 'article', confidence: 'low' };
}

View File

@ -272,9 +272,9 @@ describe('reading-time', () => {
});
describe('seed', () => {
it('getBuiltinTemplates returns 21 templates', () => {
it('getBuiltinTemplates returns 27 templates', () => {
const templates = getBuiltinTemplates();
expect(templates.length).toBe(21);
expect(templates.length).toBe(27);
expect(templates.every((t) => t.isBuiltin)).toBe(true);
expect(templates.every((t) => t.id.startsWith('builtin-'))).toBe(true);
});

View File

@ -230,6 +230,80 @@ const TEMPLATES: SeedTemplate[] = [
userPromptTemplate: 'Convert this note into a social media post:\n\n{{noteBody}}',
maxTokens: 512,
},
// ── Intake / URL-specific ──────────────────────────
{
slug: 'youtube-summary',
name: 'YouTube Video Summary',
description: 'Summarize a YouTube video from its title, description, and transcript',
category: 'extract',
inputType: 'text',
outputType: 'new_note',
systemPrompt: 'You summarize YouTube video content. Create a structured summary with: Video title, Key points (bullets), Main takeaways, and a one-paragraph overview.',
userPromptTemplate: 'Summarize this YouTube video content:\n\n{{noteBody}}',
temperature: 0.3,
maxTokens: 2048,
},
{
slug: 'youtube-takeaways',
name: 'Video Takeaways',
description: 'Extract the top actionable takeaways from a video',
category: 'extract',
inputType: 'text',
outputType: 'new_note',
systemPrompt: 'Extract the top 5-10 actionable takeaways from a video transcript or description. Return as a numbered list.',
userPromptTemplate: 'Extract key takeaways from this video:\n\n{{noteBody}}',
temperature: 0.3,
maxTokens: 1024,
},
{
slug: 'article-summary',
name: 'Article Summary',
description: 'Summarize a web article into a structured note',
category: 'extract',
inputType: 'text',
outputType: 'new_note',
systemPrompt: 'Summarize a web article into a structured note. Include: Title, TL;DR (1-2 sentences), Key points, and Notable quotes if any.',
userPromptTemplate: 'Summarize this article:\n\n{{noteBody}}',
temperature: 0.3,
maxTokens: 2048,
},
{
slug: 'tweet-thread',
name: 'Tweet Thread Summary',
description: 'Summarize a Twitter/X thread or post',
category: 'extract',
inputType: 'text',
outputType: 'new_note',
systemPrompt: "Summarize a Twitter/X thread or post. Include: Author's main argument, key claims, and your brief analysis of the discourse.",
userPromptTemplate: 'Summarize this tweet/thread:\n\n{{noteBody}}',
temperature: 0.3,
maxTokens: 1024,
},
{
slug: 'pdf-summary',
name: 'PDF Summary',
description: 'Summarize extracted PDF content',
category: 'extract',
inputType: 'text',
outputType: 'new_note',
systemPrompt: 'Summarize extracted PDF content. Include: Document type, Key sections, Main findings or content, and Action items if applicable.',
userPromptTemplate: 'Summarize this document:\n\n{{noteBody}}',
temperature: 0.3,
maxTokens: 2048,
},
{
slug: 'repo-summary',
name: 'Repo Overview',
description: 'Summarize a GitHub repository',
category: 'extract',
inputType: 'text',
outputType: 'new_note',
systemPrompt: 'Summarize a GitHub repository. Include: What it does, Tech stack, Key features, and Why it\'s notable.',
userPromptTemplate: 'Summarize this GitHub repo:\n\n{{noteBody}}',
temperature: 0.3,
maxTokens: 1024,
},
// ── Scheduled / System ───────────────────────────
{
slug: 'weekly-digest',

View File

@ -38,6 +38,7 @@ vi.mock('./modules/note-prompts/scheduler.js', () => ({
startSchedulerLoop: vi.fn(),
stopSchedulerLoop: vi.fn(),
}));
vi.mock('./modules/intake/routes.js', () => ({ intakeRoutes: vi.fn() }));
vi.mock('./lib/cosmos-init.js', () => ({ initCosmosIfNeeded: initCosmosIfNeededMock }));
vi.mock('./lib/datastore.js', () => ({ initDatastore: initDatastoreMock }));
vi.mock('./lib/config.js', () => ({
@ -77,7 +78,7 @@ describe('server bootstrap', () => {
expect(initDatastoreMock).toHaveBeenCalledOnce();
expect(createServiceAppMock).toHaveBeenCalledOnce();
expect(registerOptionalJwtContextMock).toHaveBeenCalledOnce();
expect(appMock.register).toHaveBeenCalledTimes(11);
expect(appMock.register).toHaveBeenCalledTimes(12);
expect(startServiceMock).toHaveBeenCalledWith(appMock, { port: 4016, host: '0.0.0.0' });
});
});

View File

@ -11,6 +11,7 @@ import { savedViewRoutes } from './modules/saved-views/routes.js';
import { workspaceRoutes } from './modules/workspaces/routes.js';
import { notePromptRoutes } from './modules/note-prompts/routes.js';
import { promptSchedulerRoutes, startSchedulerLoop, stopSchedulerLoop } from './modules/note-prompts/scheduler.js';
import { intakeRoutes } from './modules/intake/routes.js';
import { initCosmosIfNeeded } from './lib/cosmos-init.js';
import { initEncryption } from './lib/field-encrypt.js';
import { initDatastore } from './lib/datastore.js';
@ -65,6 +66,7 @@ await registerApiPlugin(savedViewRoutes);
await registerApiPlugin(workspaceRoutes);
await registerApiPlugin(notePromptRoutes);
await registerApiPlugin(promptSchedulerRoutes);
await registerApiPlugin(intakeRoutes);
// ── Start scheduler loop (F25) ────────────────────────────────────
startSchedulerLoop();