Queue API Reference

Complete reference for createQueueProducer(), startConsumer(), QueueProducer, and related types.

Telaio's queue layer wraps pg-boss with a typed registry pattern. The registry maps queue names to handler functions, and TypeScript uses that mapping to enforce correct payload types at every call site.

Import queue utilities from telaio/queue:

import { createQueueProducer, startConsumer } from 'telaio/queue';
import type { QueueRegistry, QueueJobHandler, JobDataFor, QueueProducer } from 'telaio/queue';

createQueueProducer()

function createQueueProducer<TRegistry extends QueueRegistry>(
  connectionOptions: QueueClientOptions | Record<string, unknown>,
  logger?: Logger,
): QueueProducer<TRegistry>

Creates a typed QueueProducer. The TRegistry generic is usually inferred from the registry you pass to .withQueues(). You can also create a standalone producer for scripts or workers that only send jobs and never consume them; in that case, supply TRegistry explicitly, as in the example below.

Accepts either explicit connection options or a loaded config record. When passed a config record it reads DATABASE_URL automatically.

import { createQueueProducer } from 'telaio/queue';
import type { MyQueues } from './queues/registry.js';
import config from './config.js';

const producer = createQueueProducer<MyQueues>(config);

await producer.send('send-welcome-email', { userId: 'u1', email: 'alice@example.com' });

QueueProducer<TRegistry>

interface QueueProducer<TRegistry extends QueueRegistry> {
  send<K extends keyof TRegistry & string>(
    queueName: K,
    data: JobDataFor<TRegistry, K>,
    options?: SendOptions,
  ): Promise<string | null>;
}

A typed producer that narrows the data parameter based on the queue name. Attempting to pass the wrong payload shape for a given queue name is a compile-time error.

.send()

send<K extends keyof TRegistry & string>(
  queueName: K,
  data: JobDataFor<TRegistry, K>,
  options?: SendOptions,
): Promise<string | null>

Enqueues a job. Returns the job ID string on success, or null if pg-boss deduplication determined the job already exists.

Parameter  Type                      Description
queueName  K                         The queue name. Must be a key in TRegistry.
data       JobDataFor<TRegistry, K>  The job payload. Type is inferred from the handler signature.
options    SendOptions               Optional pg-boss send options (delay, priority, deduplication key, etc.).

// Correct payload -- compiles
await app.queue.send('send-welcome-email', {
  userId: 'u1',
  email: 'alice@example.com',
});

// Wrong payload -- compile error
await app.queue.send('send-welcome-email', {
  unknownField: true, // Type error: 'unknownField' does not exist in type
});
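
Because send() resolves to null when a job is deduplicated, callers may want to branch on the result instead of assuming a job ID. The following is a minimal sketch of that pattern, not Telaio's implementation. ProducerSketch, SendOptionsSketch, createInMemoryProducer, and describeSendResult are hypothetical stand-ins so the example runs without a database. singletonKey is a pg-boss send option; that Telaio's SendOptions forwards it unchanged is an assumption, and its exact deduplication semantics depend on the pg-boss version and queue configuration.

```typescript
// Stand-in types so the sketch runs without a database; the real
// QueueProducer and SendOptions come from 'telaio/queue'.
interface SendOptionsSketch {
  singletonKey?: string; // pg-boss deduplication key (assumed to be forwarded)
}

interface ProducerSketch {
  send(queueName: string, data: object, options?: SendOptionsSketch): Promise<string | null>;
}

// Hypothetical in-memory producer that mimics the documented contract:
// a job ID on success, null when a job with the same key already exists.
function createInMemoryProducer(): ProducerSketch {
  const seenKeys = new Set<string>();
  let nextId = 1;
  return {
    async send(queueName, _data, options) {
      const key = options?.singletonKey;
      if (key !== undefined) {
        const scoped = `${queueName}:${key}`;
        if (seenKeys.has(scoped)) return null;
        seenKeys.add(scoped);
      }
      return `job-${nextId++}`;
    },
  };
}

// Turns the send() result into a log-friendly status string.
function describeSendResult(jobId: string | null): string {
  return jobId === null ? 'skipped (duplicate)' : `enqueued as ${jobId}`;
}

// Enqueues at most one welcome email per user and reports what happened.
async function enqueueWelcomeEmail(
  producer: ProducerSketch,
  userId: string,
  email: string,
): Promise<string> {
  const jobId = await producer.send(
    'send-welcome-email',
    { userId, email },
    { singletonKey: `welcome:${userId}` },
  );
  return describeSendResult(jobId);
}
```

Treating null as a normal outcome rather than an error keeps retried requests idempotent: a second call for the same user reports a skipped duplicate instead of failing.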

startConsumer()

async function startConsumer<TRegistry extends QueueRegistry>(
  registry: TRegistry,
  options: ConsumerOptions,
): Promise<void>

Starts pg-boss workers for every queue defined in the registry. Each key in registry is a queue name, and its value is the handler function that is called with batches of jobs.

Call this in a separate worker process or in your main server process depending on your deployment model:

import { startConsumer } from 'telaio/queue';
import { registry } from './queues/registry.js';
import config from './config.js';

await startConsumer(registry, { connection: config });
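
Conceptually, a consumer like this walks the registry and registers one worker per entry. The sketch below illustrates that idea only; it is not Telaio's source. WorkerHostSketch, registerAll, and fakeHost are hypothetical names, and the work() method is a simplified stand-in for the worker-registration call of a pg-boss client.

```typescript
type JobSketch = { id: string; data: unknown };
type HandlerSketch = (jobs: JobSketch[]) => Promise<void>;

// Stand-in for the part of a queue client that registers a worker.
interface WorkerHostSketch {
  work(queueName: string, handler: HandlerSketch): Promise<string>;
}

// Registers one worker per registry entry and returns the queue names
// in the order they were registered.
async function registerAll(
  host: WorkerHostSketch,
  registry: Record<string, HandlerSketch>,
): Promise<string[]> {
  const names: string[] = [];
  for (const [queueName, handler] of Object.entries(registry)) {
    await host.work(queueName, handler);
    names.push(queueName);
  }
  return names;
}

// Fake host that records registrations instead of talking to Postgres.
const registered = new Map<string, HandlerSketch>();
const fakeHost: WorkerHostSketch = {
  async work(queueName, handler) {
    registered.set(queueName, handler);
    return `worker-${registered.size}`;
  },
};
```

A fake host like this can also be handy in unit tests, where you want to confirm that every queue in a registry gets a worker without connecting to a database.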

ConsumerOptions

interface ConsumerOptions {
  connection: QueueClientOptions | Record<string, unknown>;
  logger?: Logger;
}

Option      Type                                          Description
connection  QueueClientOptions | Record<string, unknown>  pg-boss connection options or a loaded config record.
logger      Logger                                        Pino logger instance. If omitted, a default logger is created.

Defining a registry

import type { QueueRegistry } from 'telaio/queue';
import type { Job } from 'pg-boss';

interface SendWelcomeEmailData {
  userId: string;
  email: string;
}

interface ProcessUploadData {
  uploadId: string;
  bucket: string;
  key: string;
}

export const registry = {
  'send-welcome-email': async (jobs: Job<SendWelcomeEmailData>[]) => {
    for (const job of jobs) {
      const { userId, email } = job.data;
      // send email...
    }
  },
  'process-upload': async (jobs: Job<ProcessUploadData>[]) => {
    for (const job of jobs) {
      const { uploadId, bucket, key } = job.data;
      // process upload...
    }
  },
} satisfies QueueRegistry;

The satisfies QueueRegistry clause is important. It lets TypeScript validate the object's shape while still inferring the exact type of registry, preserving the literal queue names and payload types. Annotating the constant as QueueRegistry, or casting it with as QueueRegistry, would instead widen the type to Record<string, QueueJobHandler> and lose the payload inference.
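
The difference between satisfies and a type annotation can be seen in a small, self-contained sketch. The Job, QueueJobHandler, and QueueRegistry types below are local stand-ins that mirror the shapes documented on this page, so the example compiles without pg-boss or Telaio installed. It requires TypeScript 4.9 or later.

```typescript
// Local stand-ins mirroring the documented shapes.
interface Job<T> { id: string; data: T }
type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;
type QueueRegistry = Record<string, QueueJobHandler>;

const handlers = {
  'send-welcome-email': async (jobs: Job<{ userId: string }>[]) => {
    for (const job of jobs) void job.data.userId;
  },
};

// satisfies: validated against QueueRegistry, literal keys preserved.
const preserved = handlers satisfies QueueRegistry;
type PreservedNames = keyof typeof preserved; // 'send-welcome-email'

// Annotation: same value, but the declared type is the wide Record.
const widened: QueueRegistry = handlers;
type WidenedNames = keyof typeof widened; // string

const ok: PreservedNames = 'send-welcome-email';
// const typo: PreservedNames = 'send-welcom-email'; // compile error
const anyName: WidenedNames = 'no-such-queue'; // accepted: names are just string
```

With the widened type, a misspelled queue name is indistinguishable from a valid one, which is exactly the check the typed producer relies on.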


QueueJobHandler<TData>

type QueueJobHandler<TData = any> = (jobs: Job<TData>[]) => Promise<void>;

The function signature for a queue consumer. Handlers always receive an array of jobs because pg-boss delivers jobs to workers in batches, so iterate over the array even when you expect a single job.

Parameter  Type           Description
jobs       Job<TData>[]   Array of pg-boss job objects with typed data.

QueueRegistry

type QueueRegistry = Record<string, QueueJobHandler>;

A map of queue names to handler functions. Use satisfies QueueRegistry when defining your registry to preserve payload type inference.


JobDataFor<TRegistry, K>

type JobDataFor<TRegistry, K extends keyof TRegistry> = ...

Extracts the job payload type for queue K from a registry type. Used internally by QueueProducer.send() to narrow the data parameter.

import type { JobDataFor } from 'telaio/queue';
import type { registry } from './queues/registry.js';

type EmailPayload = JobDataFor<typeof registry, 'send-welcome-email'>;
// { userId: string; email: string }
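
The definition of JobDataFor is elided above, and this page does not show it. As an illustration of the general technique only, a payload type can be recovered from a handler signature with a conditional type and infer. PayloadOf below is a hypothetical name and should not be read as Telaio's actual definition; Job is a local stand-in for the pg-boss type.

```typescript
// Local stand-in for the pg-boss Job type.
interface Job<T> { id: string; data: T }

// Hypothetical extractor: infers TData from a handler typed as
// (jobs: Job<TData>[]) => Promise<void>, and yields never otherwise.
type PayloadOf<TRegistry, K extends keyof TRegistry> =
  TRegistry[K] extends (jobs: Job<infer TData>[]) => Promise<void> ? TData : never;

const uploadRegistry = {
  'process-upload': async (
    jobs: Job<{ uploadId: string; bucket: string; key: string }>[],
  ) => {
    for (const job of jobs) void job.data.uploadId;
  },
};

type UploadPayload = PayloadOf<typeof uploadRegistry, 'process-upload'>;

// The extracted type can annotate values built elsewhere in the codebase.
const payload: UploadPayload = { uploadId: 'up_1', bucket: 'media', key: 'a.png' };
```

Extracting the payload type this way lets helper functions build job data without importing each payload interface separately.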

QueueClientOptions

interface QueueClientOptions {
  connectionString: string;
}

Passed directly to pg-boss. When using a loaded config record instead, DATABASE_URL is read automatically.
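
The two accepted input shapes can be normalized into one, as the sketch below shows. It is an illustration of the documented behavior under stated assumptions, not Telaio's code: resolveQueueConnection is a hypothetical helper, and the precedence of connectionString over DATABASE_URL is an assumption made for the example.

```typescript
// Local stand-in for the documented QueueClientOptions shape.
type QueueClientOptions = { connectionString: string };

// Hypothetical helper, not part of telaio/queue: accepts either explicit
// options or a loaded config record and returns explicit options.
function resolveQueueConnection(
  input: QueueClientOptions | Record<string, unknown>,
): QueueClientOptions {
  const record: Record<string, unknown> = input;
  // Assumed precedence: an explicit connectionString wins over DATABASE_URL.
  const candidate = record['connectionString'] ?? record['DATABASE_URL'];
  if (typeof candidate !== 'string' || candidate.length === 0) {
    throw new Error('No connectionString or DATABASE_URL found');
  }
  return { connectionString: candidate };
}
```

Failing early with a clear error when neither value is present avoids a harder-to-diagnose connection failure later, once workers have started.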
