fix(platform-service): harden broadcast metrics and export job lifecycle

This commit is contained in:
saravanakumardb1 2026-03-22 11:57:47 -07:00
parent dda38aa009
commit 265599d005
3 changed files with 40 additions and 18 deletions

View File

@ -324,7 +324,7 @@ export async function recordReadReceipt(
userId: string, userId: string,
productId: string, productId: string,
action: 'read' | 'click' | 'dismiss' action: 'read' | 'click' | 'dismiss'
): Promise<void> { ): Promise<boolean> {
const container = getContainer('broadcast_reads'); const container = getContainer('broadcast_reads');
const id = `${broadcastId}:${userId}`; const id = `${broadcastId}:${userId}`;
const now = new Date().toISOString(); const now = new Date().toISOString();
@ -334,6 +334,13 @@ export async function recordReadReceipt(
if (existing) { if (existing) {
const receipt = existing as BroadcastRead; const receipt = existing as BroadcastRead;
const alreadyRecorded =
(action === 'read' && !!receipt.readAt) ||
(action === 'click' && !!receipt.clickedAt) ||
(action === 'dismiss' && !!receipt.dismissedAt);
if (alreadyRecorded) {
return false;
}
const updates: Partial<BroadcastRead> & { updatedAt: string } = { const updates: Partial<BroadcastRead> & { updatedAt: string } = {
updatedAt: now, updatedAt: now,
}; };
@ -345,6 +352,7 @@ export async function recordReadReceipt(
...receipt, ...receipt,
...updates, ...updates,
}); });
return true;
} else { } else {
const receipt: BroadcastRead = { const receipt: BroadcastRead = {
id, id,
@ -357,6 +365,7 @@ export async function recordReadReceipt(
createdAt: now, createdAt: now,
}; };
await container.items.create(receipt); await container.items.create(receipt);
return true;
} }
} catch (err) { } catch (err) {
if ((err as { code?: number }).code === 404) { if ((err as { code?: number }).code === 404) {
@ -372,6 +381,7 @@ export async function recordReadReceipt(
createdAt: now, createdAt: now,
}; };
await container.items.create(receipt); await container.items.create(receipt);
return true;
} else { } else {
throw err; throw err;
} }

View File

@ -3,6 +3,7 @@
* @module broadcasts/routes * @module broadcasts/routes
*/ */
import { randomUUID } from 'node:crypto';
import type { FastifyInstance } from 'fastify'; import type { FastifyInstance } from 'fastify';
import { import {
UnauthorizedError, UnauthorizedError,
@ -78,7 +79,7 @@ async function adminRoutes(app: FastifyInstance): Promise<void> {
const now = new Date().toISOString(); const now = new Date().toISOString();
const broadcast: Broadcast = { const broadcast: Broadcast = {
id: `bcast_${Date.now()}_${Math.random().toString(36).slice(2, 7)}`, id: `bcast_${randomUUID()}`,
productId, productId,
...input, ...input,
status: input.scheduledAt ? BroadcastStatus.SCHEDULED : BroadcastStatus.DRAFT, status: input.scheduledAt ? BroadcastStatus.SCHEDULED : BroadcastStatus.DRAFT,
@ -317,12 +318,12 @@ async function adminRoutes(app: FastifyInstance): Promise<void> {
const now = new Date().toISOString(); const now = new Date().toISOString();
const cloned: Broadcast = { const cloned: Broadcast = {
...existing, ...existing,
id: `bcast_${Date.now()}_${Math.random().toString(36).slice(2, 7)}`, id: `bcast_${randomUUID()}`,
title: `${existing.title} (Clone)`, title: `${existing.title} (Clone)`,
status: BroadcastStatus.DRAFT, status: BroadcastStatus.DRAFT,
variant: variant ?? 'treatment', variant: variant ?? 'treatment',
parentBroadcastId: existing.id, parentBroadcastId: existing.id,
experimentId: existing.experimentId ?? `exp_${Date.now()}`, experimentId: existing.experimentId ?? `exp_${randomUUID()}`,
metrics: { metrics: {
targetedCount: 0, targetedCount: 0,
sentCount: 0, sentCount: 0,
@ -380,12 +381,14 @@ async function publicRoutes(app: FastifyInstance): Promise<void> {
await repo.updateInAppMessageStatus(id, userId, 'read'); await repo.updateInAppMessageStatus(id, userId, 'read');
// Record read receipt // Record read receipt
await repo.recordReadReceipt(message.broadcastId, userId, productId, 'read'); const recorded = await repo.recordReadReceipt(message.broadcastId, userId, productId, 'read');
// Update broadcast metrics // Update broadcast metrics
await repo.updateBroadcastMetrics(message.broadcastId, productId, { if (recorded) {
openedCount: 1, // Will be incremented properly in real implementation await repo.updateBroadcastMetrics(message.broadcastId, productId, {
}); openedCount: 1,
});
}
return { success: true }; return { success: true };
}); });
@ -401,11 +404,18 @@ async function publicRoutes(app: FastifyInstance): Promise<void> {
if (!message) throw new NotFoundError('Message not found'); if (!message) throw new NotFoundError('Message not found');
await repo.updateInAppMessageStatus(id, userId, 'dismissed'); await repo.updateInAppMessageStatus(id, userId, 'dismissed');
await repo.recordReadReceipt(message.broadcastId, userId, productId, 'dismiss'); const recorded = await repo.recordReadReceipt(
message.broadcastId,
userId,
productId,
'dismiss'
);
await repo.updateBroadcastMetrics(message.broadcastId, productId, { if (recorded) {
dismissedCount: 1, await repo.updateBroadcastMetrics(message.broadcastId, productId, {
}); dismissedCount: 1,
});
}
return { success: true }; return { success: true };
}); });
@ -420,11 +430,13 @@ async function publicRoutes(app: FastifyInstance): Promise<void> {
const message = messages.find(m => m.id === id); const message = messages.find(m => m.id === id);
if (!message) throw new NotFoundError('Message not found'); if (!message) throw new NotFoundError('Message not found');
await repo.recordReadReceipt(message.broadcastId, userId, productId, 'click'); const recorded = await repo.recordReadReceipt(message.broadcastId, userId, productId, 'click');
await repo.updateBroadcastMetrics(message.broadcastId, productId, { if (recorded) {
clickedCount: 1, await repo.updateBroadcastMetrics(message.broadcastId, productId, {
}); clickedCount: 1,
});
}
return { success: true, redirectUrl: message.ctaUrl }; return { success: true, redirectUrl: message.ctaUrl };
}); });

View File

@ -64,7 +64,7 @@ export async function exportRoutes(app: FastifyInstance) {
const log = req.log; const log = req.log;
process.nextTick(async () => { process.nextTick(async () => {
try { try {
await repo.updateExportJob({ const processingJob = await repo.updateExportJob({
...created, ...created,
status: 'processing', status: 'processing',
startedAt: new Date().toISOString(), startedAt: new Date().toISOString(),
@ -73,7 +73,7 @@ export async function exportRoutes(app: FastifyInstance) {
const serialized = created.format === 'json' ? JSON.stringify(rows, null, 2) : toCsv(rows); const serialized = created.format === 'json' ? JSON.stringify(rows, null, 2) : toCsv(rows);
const fileName = `${created.type}-${access.productId}-${Date.now()}.${created.format}`; const fileName = `${created.type}-${access.productId}-${Date.now()}.${created.format}`;
await repo.updateExportJob({ await repo.updateExportJob({
...created, ...processingJob,
status: 'ready', status: 'ready',
data: serialized, data: serialized,
rowCount: rows.length, rowCount: rows.length,