Queues
Type-safe pg-boss job queues with a registry pattern, typed producer, and graceful consumer shutdown.
Telaio's queue module wraps pg-boss with a registry pattern that gives you end-to-end type safety between producers and consumers. Job payload types are inferred from the registry -- you cannot send a job with the wrong shape.
Registry pattern
Define your queues as a plain object and use satisfies to lock in types while preserving the literal key names for inference:
// src/queues/registry/index.ts
import type { QueueJobHandler } from 'telaio/queue';
import type { Job } from 'pg-boss';
export const registry = {
  'send-welcome-email': async (jobs: Job<{ userId: string }>[]) => {
    for (const job of jobs) {
      await sendWelcomeEmail(job.data.userId);
    }
  },
  'process-payment': async (jobs: Job<{ orderId: string; amount: number }>[]) => {
    for (const job of jobs) {
      await processPayment(job.data);
    }
  },
} satisfies Record<string, QueueJobHandler>;
export type AppRegistry = typeof registry;
The satisfies keyword verifies each value matches QueueJobHandler while keeping the object type narrow enough for TypeScript to infer per-key payload types.
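To make the difference concrete, the sketch below compares a type annotation with satisfies on the same object. It is self-contained for illustration: the Job and QueueJobHandler definitions are local stand-ins for the pg-boss and telaio/queue types, and the names annotated, narrow, and welcomed are hypothetical.

```typescript
// Local stand-ins so the sketch compiles on its own; a real project would
// import Job from 'pg-boss' and QueueJobHandler from 'telaio/queue'.
interface Job<T> {
  id: string;
  data: T;
}
type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;

// Collects user IDs so the handler has an observable effect.
const welcomed: string[] = [];

// Annotation: the object's type becomes Record<string, QueueJobHandler>,
// so key names and payload shapes are no longer visible to the compiler.
const annotated: Record<string, QueueJobHandler> = {
  'send-welcome-email': async (jobs: Job<{ userId: string }>[]) => {
    for (const job of jobs) welcomed.push(job.data.userId);
  },
};

// satisfies: the same check is performed, but the inferred type keeps the
// literal key and the handler's exact parameter type.
const narrow = {
  'send-welcome-email': async (jobs: Job<{ userId: string }>[]) => {
    for (const job of jobs) welcomed.push(job.data.userId);
  },
} satisfies Record<string, QueueJobHandler>;

type AnnotatedKeys = keyof typeof annotated; // string
type NarrowKeys = keyof typeof narrow; // 'send-welcome-email'

// Any string is accepted as a key of the annotated object...
const looseKey: AnnotatedKeys = 'not-a-real-queue';
// ...while only the declared queue name is accepted for the narrow one.
const strictKey: NarrowKeys = 'send-welcome-email';
```

Because NarrowKeys is a literal union rather than string, a producer built on typeof narrow can reject unknown queue names at compile time.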
Sending jobs (producer)
After calling .withQueues(registry) on the builder, app.queue is a typed producer:
// Correct -- TypeScript knows userId is required
await app.queue.send('send-welcome-email', { userId: 'user_123' });
// TypeScript error -- amount is not in send-welcome-email's payload
await app.queue.send('send-welcome-email', { amount: 100 });
// Correct
await app.queue.send('process-payment', { orderId: 'ord_456', amount: 49.99 });
Pass pg-boss SendOptions as an optional third argument to control retries, delay, priority, and expiry:
await app.queue.send(
  'process-payment',
  { orderId: 'ord_456', amount: 49.99 },
  { retryLimit: 3, retryDelay: 30 },
);
Standalone producer
If you need a producer outside the builder (e.g., in a script or Lambda):
import { createQueueProducer } from 'telaio/queue';
import type { AppRegistry } from './queues/registry/index.js';
const queue = createQueueProducer<AppRegistry>(config);
await queue.send('send-welcome-email', { userId: 'user_123' });
Consumer process
Consumers run in a separate long-lived process. Use startConsumer directly or run it via the telaio consumer CLI command.
// src/consumer.ts
import { startConsumer } from 'telaio/queue';
import { registry } from './queues/registry/index.js';
import { loadConfig } from 'telaio/config';
const config = loadConfig({ modules: { database: true } });
await startConsumer(registry, { connection: config });
startConsumer connects to pg-boss, subscribes each key in the registry to its handler function, and blocks until the process receives SIGINT or SIGTERM. On shutdown it drains in-flight jobs before exiting.
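The subscribe, block, and drain sequence described above can be sketched roughly as follows. This is not Telaio's implementation: WorkerLike is a hypothetical minimal interface (pg-boss offers comparable work and stop methods), runConsumer is an illustrative name, and the shutdown signal is passed in as a promise so the sketch stays independent of the process object.

```typescript
// Local stand-ins for the pg-boss and telaio/queue types.
interface Job<T> {
  id: string;
  data: T;
}
type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;
type QueueRegistry = Record<string, QueueJobHandler>;

// Hypothetical minimal worker surface, assumed for this sketch only.
interface WorkerLike {
  // Start polling a queue and pass fetched jobs to the handler.
  work(queue: string, handler: QueueJobHandler): Promise<void>;
  // Stop polling; assumed to resolve once in-flight jobs have finished.
  stop(): Promise<void>;
}

async function runConsumer(
  registry: QueueRegistry,
  worker: WorkerLike,
  shutdown: Promise<void>,
): Promise<void> {
  // 1. Subscribe every registry key to its handler.
  for (const [queue, handler] of Object.entries(registry)) {
    await worker.work(queue, handler);
  }
  // 2. Block until the caller signals shutdown.
  await shutdown;
  // 3. Drain: stop the worker and wait for it to finish.
  await worker.stop();
}
```

In a Node process, the shutdown promise would typically be resolved from process.once('SIGINT', ...) and process.once('SIGTERM', ...) listeners.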
Handler errors
If a handler throws, pg-boss marks the job as failed and will retry it according to the queue's retry policy. Do not swallow errors inside handlers -- let them propagate so the job enters the retry/dead-letter flow.
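A short sketch of the difference, using a local Job stand-in and a hypothetical chargeOrder function that fails for one order ID. The handler names are illustrative only.

```typescript
// Local stand-in for the pg-boss Job type.
interface Job<T> {
  id: string;
  data: T;
}

// Hypothetical downstream call that fails for one particular order.
async function chargeOrder(orderId: string): Promise<void> {
  if (orderId === 'ord_bad') {
    throw new Error(`charge failed for ${orderId}`);
  }
}

// Anti-pattern: the catch block hides the failure, so the handler resolves
// and the queue has no reason to retry the job.
async function swallowingHandler(jobs: Job<{ orderId: string }>[]): Promise<void> {
  for (const job of jobs) {
    try {
      await chargeOrder(job.data.orderId);
    } catch (err) {
      console.error(`job ${job.id} failed (swallowed)`, err);
    }
  }
}

// Preferred: log whatever context is useful, then rethrow so the rejection
// reaches the queue and the job can be marked as failed.
async function propagatingHandler(jobs: Job<{ orderId: string }>[]): Promise<void> {
  for (const job of jobs) {
    try {
      await chargeOrder(job.data.orderId);
    } catch (err) {
      console.error(`job ${job.id} failed`, err);
      throw err;
    }
  }
}
```

If no extra logging is needed, omitting the try/catch entirely has the same effect as the second handler.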
JobDataFor utility type
Extract a job's payload type from the registry without repeating it:
import type { JobDataFor } from 'telaio/queue';
import type { AppRegistry } from './queues/registry/index.js';
type PaymentPayload = JobDataFor<AppRegistry, 'process-payment'>;
// { orderId: string; amount: number }
Builder integration
import { registry } from './queues/registry/index.js';
const app = await createApp({ config })
  .withQueues(registry)
  .build();
app.queue // QueueProducer<typeof registry>
Pass explicit connection options if your queue database differs from the main pool:
.withQueues(registry, {
  connection: { connectionString: process.env.QUEUE_DATABASE_URL },
})
Types reference
// A single queue handler
type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;
// Registry type
type QueueRegistry = Record<string, QueueJobHandler>;
// Extract payload type from a registry
type JobDataFor<TRegistry extends QueueRegistry, K extends keyof TRegistry> = ...;
// Typed producer interface
interface QueueProducer<TRegistry extends QueueRegistry> {
  send<K extends keyof TRegistry & string>(
    queueName: K,
    data: JobDataFor<TRegistry, K>,
    options?: SendOptions,
  ): Promise<string | null>;
}
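The body of JobDataFor is elided in the reference above. As an assumption, and not necessarily how Telaio defines it, a type with the same behavior could be written with a conditional type that infers the payload from the handler's parameter. JobDataForSketch, sampleRegistry, and the local Job type are names made up for this sketch.

```typescript
// Local stand-ins so the sketch compiles without pg-boss or telaio.
interface Job<T> {
  id: string;
  data: T;
}
type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;
type QueueRegistry = Record<string, QueueJobHandler>;

// Assumed shape: match the handler stored under K against QueueJobHandler
// and pull out its TData argument.
type JobDataForSketch<TRegistry extends QueueRegistry, K extends keyof TRegistry> =
  TRegistry[K] extends QueueJobHandler<infer TData> ? TData : never;

const sampleRegistry = {
  'process-payment': async (jobs: Job<{ orderId: string; amount: number }>[]) => {
    void jobs; // handler body omitted; only the types matter here
  },
} satisfies QueueRegistry;

type PaymentPayload = JobDataForSketch<typeof sampleRegistry, 'process-payment'>;

// Type-checks because PaymentPayload resolves to the handler's payload type.
const payload: PaymentPayload = { orderId: 'ord_456', amount: 49.99 };
```

The inference only works when the registry keeps its narrow type, which is why the registry is declared with satisfies rather than a type annotation.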