Telaio
Modules

Queues

Type-safe pg-boss job queues with a registry pattern, typed producer, and graceful consumer shutdown.

Queues

Telaio's queue module wraps pg-boss with a registry pattern that gives you end-to-end type safety between producers and consumers. Job payload types are inferred from the registry -- you cannot send a job with the wrong shape.

Registry pattern

Define your queues as a plain object and use satisfies to lock in types while preserving the literal key names for inference:

// src/queues/registry/index.ts
import type { QueueJobHandler } from 'telaio/queue';
import type { Job } from 'pg-boss';

export const registry = {
  'send-welcome-email': async (jobs: Job<{ userId: string }>[]) => {
    for (const job of jobs) {
      await sendWelcomeEmail(job.data.userId);
    }
  },
  'process-payment': async (jobs: Job<{ orderId: string; amount: number }>[]) => {
    for (const job of jobs) {
      await processPayment(job.data);
    }
  },
} satisfies Record<string, QueueJobHandler>;

export type AppRegistry = typeof registry;

The satisfies keyword verifies each value matches QueueJobHandler while keeping the object type narrow enough for TypeScript to infer per-key payload types.

Sending jobs (producer)

After calling .withQueues(registry) on the builder, app.queue is a typed producer:

// Correct -- TypeScript knows userId is required
await app.queue.send('send-welcome-email', { userId: 'user_123' });

// TypeScript error -- amount is not in send-welcome-email's payload
await app.queue.send('send-welcome-email', { amount: 100 });

// Correct
await app.queue.send('process-payment', { orderId: 'ord_456', amount: 49.99 });

Pass pg-boss SendOptions as an optional third argument to control retries, delay, priority, and expiry:

await app.queue.send(
  'process-payment',
  { orderId: 'ord_456', amount: 49.99 },
  { retryLimit: 3, retryDelay: 30 },
);

Standalone producer

If you need a producer outside the builder (e.g., in a script or Lambda):

import { createQueueProducer } from 'telaio/queue';
import type { AppRegistry } from './queues/registry/index.js';

const queue = createQueueProducer<AppRegistry>(config);
await queue.send('send-welcome-email', { userId: 'user_123' });

Consumer process

Consumers run in a separate long-lived process. Use startConsumer directly or run it via the telaio consumer CLI command.

// src/consumer.ts
import { startConsumer } from 'telaio/queue';
import { registry } from './queues/registry/index.js';
import { loadConfig } from 'telaio/config';

const config = loadConfig({ modules: { database: true } });

await startConsumer(registry, { connection: config });

startConsumer connects to pg-boss, subscribes each key in the registry to its handler function, and blocks until the process receives SIGINT or SIGTERM. On shutdown it drains in-flight jobs before exiting.

Handler errors

If a handler throws, pg-boss marks the job as failed and will retry it according to the queue's retry policy. Do not swallow errors inside handlers -- let them propagate so the job enters the retry/dead-letter flow.

JobDataFor utility type

Extract a job's payload type from the registry without repeating it:

import type { JobDataFor } from 'telaio/queue';
import type { AppRegistry } from './queues/registry/index.js';

type PaymentPayload = JobDataFor<AppRegistry, 'process-payment'>;
// { orderId: string; amount: number }

Builder integration

import { registry } from './queues/registry/index.js';

const app = await createApp({ config })
  .withQueues(registry)
  .build();

app.queue  // QueueProducer<typeof registry>

Pass explicit connection options if your queue database differs from the main pool:

.withQueues(registry, {
  connection: { connectionString: process.env.QUEUE_DATABASE_URL },
})

Types reference

// A single queue handler
type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;

// Registry type
type QueueRegistry = Record<string, QueueJobHandler>;

// Extract payload type from a registry
type JobDataFor<TRegistry extends QueueRegistry, K extends keyof TRegistry> = ...;

// Typed producer interface
interface QueueProducer<TRegistry extends QueueRegistry> {
  send<K extends keyof TRegistry & string>(
    queueName: K,
    data: JobDataFor<TRegistry, K>,
    options?: SendOptions,
  ): Promise<string | null>;
}

On this page