# Background Workers
ReliaPulse uses BullMQ workers for background processing. Workers run as separate processes and handle time-consuming tasks asynchronously.
## Worker Architecture
```
┌─────────────────────────────────────────────────────────────┐
│                        Redis (BullMQ)                       │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│   │   uptime    │    │   metrics   │    │notifications│ ... │
│   │    queue    │    │    queue    │    │    queue    │     │
│   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘     │
└──────────┼──────────────────┼──────────────────┼────────────┘
           │                  │                  │
           ▼                  ▼                  ▼
     ┌───────────┐      ┌───────────┐      ┌────────────┐
     │  Uptime   │      │  Metrics  │      │Notification│
     │  Worker   │      │  Worker   │      │   Worker   │
     └───────────┘      └───────────┘      └────────────┘
```

## Available Workers
### Uptime Worker
Performs HTTP health checks on ENDPOINT type components.
Queue Name: uptime
Job Types:
- `check` - Single health check execution
- `schedule` - Schedule recurring checks
Process Flow:
1. Receive job with componentId
2. Fetch component configuration
3. Build HTTP request (method, headers, body)
4. Execute request with timeout
5. Evaluate conditions (status code, body, response time)
6. Update component status
7. Create incident if thresholds exceeded
8. Queue notifications if status changed

Configuration:

```ts
{
  componentId: string;
  organizationId: string;
  url: string;
  method: "GET" | "POST" | "HEAD" | ...;
  headers?: Record<string, string>;
  expectedStatus?: number;
  conditions?: Condition[];
  timeout?: number;
}
```

### Metrics Worker
Polls external metrics from integrations (Datadog, Prometheus, etc.).
Queue Name: metrics
Job Types:
- `poll` - Fetch metrics from integration
- `schedule` - Schedule recurring polls
Process Flow:
1. Receive job with metricsQueryId
2. Load integration and query config
3. Call integration adapter
4. Parse response and extract values
5. Store data points
6. Evaluate thresholds
7. Update component status if linked
8. Trigger calculated metrics recomputation

Supported Integrations:
- Prometheus (PromQL)
- Datadog (DQL)
- New Relic (NRQL)
- Grafana (various datasources)
- Pingdom (check status)
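As an illustration of steps 3–4 of the flow above (call the adapter, parse the response), a Prometheus instant-query result can be reduced to stored samples like this. The response shape follows the Prometheus HTTP API; the `DataPoint` type is a hypothetical internal representation.

```typescript
// Shape of a Prometheus instant-query response (resultType "vector").
interface PromVectorResponse {
  status: 'success' | 'error';
  data: {
    resultType: 'vector';
    result: { metric: Record<string, string>; value: [number, string] }[];
  };
}

// Hypothetical internal representation of one stored sample.
interface DataPoint {
  labels: Record<string, string>;
  timestamp: Date;
  value: number;
}

// Extract numeric samples from the adapter response (step 4 of the flow).
function parsePromVector(res: PromVectorResponse): DataPoint[] {
  if (res.status !== 'success') {
    throw new Error('Prometheus query failed');
  }
  return res.data.result.map(({ metric, value: [ts, v] }) => ({
    labels: metric,
    timestamp: new Date(ts * 1000), // Prometheus timestamps are in seconds
    value: Number(v),               // sample values arrive as strings
  }));
}

const sample: PromVectorResponse = {
  status: 'success',
  data: {
    resultType: 'vector',
    result: [{ metric: { job: 'api' }, value: [1706100000, '0.95'] }],
  },
};

console.log(parsePromVector(sample)); // one point with value 0.95
```

Each integration adapter ends in a normalization step like this, so the rest of the pipeline (thresholds, status updates) is integration-agnostic.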
### Notification Worker
Delivers notifications through various channels.
Queue Name: notifications
Job Types:
- `send` - Send single notification
- `broadcast` - Send to all subscribers
Channels:
| Channel | Delivery Method |
|---|---|
| Email | Resend API |
| SMS | Twilio API |
| Slack | Webhook with Block Kit |
| Discord | Webhook with embeds |
| Teams | Adaptive Cards webhook |
| Webhook | Custom HTTP POST |
| Push | Web Push API |
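For the webhook-based channels, delivery is an HTTP POST of a channel-specific payload. A sketch of the formatting step, where the field names follow Slack's Block Kit and Discord's embed schema and the `Notification` type is hypothetical:

```typescript
// Hypothetical normalized notification produced by the worker.
interface Notification {
  title: string;
  body: string;
}

// Slack incoming webhooks accept Block Kit payloads.
function toSlackPayload(n: Notification) {
  return {
    blocks: [
      { type: 'header', text: { type: 'plain_text', text: n.title } },
      { type: 'section', text: { type: 'mrkdwn', text: n.body } },
    ],
  };
}

// Discord webhooks accept an array of embeds.
function toDiscordPayload(n: Notification) {
  return {
    embeds: [{ title: n.title, description: n.body }],
  };
}

const n: Notification = {
  title: 'Incident: API degraded',
  body: 'Error rate above threshold.',
};
console.log(JSON.stringify(toSlackPayload(n)));
console.log(JSON.stringify(toDiscordPayload(n)));
```

The actual send is then a plain `fetch`-style POST of the formatted payload to the channel's webhook URL.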
Job Data:

```ts
{
  type: "incident" | "maintenance" | "component";
  entityId: string;
  organizationId: string;
  channels?: string[];    // Specific channels, or all if empty
  subscribers?: string[]; // Specific subscribers, or all if empty
}
```

### On-Call Worker
Handles on-call alerting and escalation.
Queue Name: oncall
Job Types:
- `alert` - Create and deliver on-call alert
- `escalate` - Escalate unacknowledged alert
Process Flow:
1. Receive alert job
2. Determine current on-call responder
3. Create OnCallAlert record
4. Send notification (SMS, email, push)
5. Wait for acknowledgment
6. If timeout, escalate to next level
7. Repeat until acknowledged or the maximum escalation level is reached

### Cleanup Worker
Removes old data to manage database size.
Queue Name: cleanup
Schedule: Daily at midnight
Data Cleaned:
- `ExtractedValueDataPoint` - 7 days
- `CalculatedMetricDataPoint` - 7 days
- `MetricDataPoint` - 30 days (configurable)
- `UptimeCheck` - 90 days
### Calculated Metrics Worker
Recomputes calculated metrics when source data changes.
Queue Name: calculated-metrics
Trigger Events:
- ExtractedValue updated (from uptime checks)
- MetricsQuery value updated (from metrics polling)
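The core of recomputation is evaluating the stored formula against the latest variable values. The worker uses mathjs for this; the sketch below substitutes a plain `Function`-based evaluator so it runs without the dependency, and only handles simple arithmetic. The formula and variable names are illustrative.

```typescript
// Hypothetical variable mapping: formula variable name -> latest value.
type Scope = Record<string, number>;

// Stand-in for mathjs's evaluate(formula, scope): build a function over
// the scope's variable names and apply it. mathjs is safer and richer;
// this handles plain arithmetic expressions only.
function evaluateFormula(formula: string, scope: Scope): number {
  const names = Object.keys(scope);
  const fn = new Function(...names, `return (${formula});`) as (
    ...vals: number[]
  ) => number;
  return fn(...names.map((k) => scope[k]));
}

// e.g. availability as a percentage of successful checks
const value = evaluateFormula('(successful / total) * 100', {
  successful: 1994,
  total: 2000,
});
console.log(value); // ≈ 99.7
```

The result is stored as a data point and compared against the metric's thresholds, as the process flow below describes.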
Process Flow:
1. Receive job with calculatedMetricId
2. Load formula and variable mappings
3. Fetch latest values for all variables
4. Evaluate formula with mathjs
5. Store result as data point
6. Update `lastValue` and status

## Job Scheduling
### Recurring Jobs
Workers schedule their own recurring jobs:
```ts
// Uptime worker schedules checks
await uptimeQueue.add(
  'schedule',
  { componentId },
  {
    repeat: {
      every: component.checkInterval * 1000,
    },
    jobId: `uptime-${componentId}`,
  }
);
```

### Job Priority
Higher priority jobs are processed first:
```ts
await queue.add('urgent-notification', data, {
  priority: 1, // Lower number = higher priority
});
```

### Retry Logic
Failed jobs are retried with exponential backoff:
```ts
{
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000, // 1s, 2s, 4s
  },
}
```

## Worker Configuration
### Environment Variables
```sh
# Redis connection
REDIS_URL=redis://localhost:6379

# Worker concurrency
WORKER_CONCURRENCY=10

# Log level
LOG_LEVEL=info
```

### Concurrency
Each worker type has configurable concurrency:
```ts
const worker = new Worker('uptime', processJob, {
  concurrency: 10, // Process 10 jobs simultaneously
});
```

### Graceful Shutdown
Workers handle SIGTERM for graceful shutdown:
```ts
process.on('SIGTERM', async () => {
  await worker.close();
  process.exit(0);
});
```

## Monitoring Workers
### Health Check
Workers expose health status:
```ts
// Check if worker is processing
const isRunning = worker.isRunning();
const isPaused = worker.isPaused();
```

### Job Events
Workers emit events for monitoring:
```ts
worker.on('completed', (job) => {
  log.info({ jobId: job.id }, 'Job completed');
});

// Note: the failed handler may receive an undefined job
// (e.g. when the job data could not be loaded).
worker.on('failed', (job, err) => {
  log.error({ jobId: job?.id, err }, 'Job failed');
});
```

### Queue Metrics
BullMQ provides queue statistics:
```ts
const counts = await queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 5 }
```

### Logging
Workers use structured JSON logging:
```json
{
  "level": "info",
  "time": "2026-01-24T12:00:00.000Z",
  "worker": "uptime",
  "jobId": "job-123",
  "organizationId": "org-456",
  "componentId": "comp-789",
  "msg": "Health check completed"
}
```

## Docker Deployment
Workers run as a separate container:
```yaml
# docker-compose.yml
services:
  worker:
    build: .
    command: npm run worker
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=${DATABASE_URL}
    depends_on:
      - redis
      - db
```

## Scaling Workers
### Horizontal Scaling
Add more worker containers for higher throughput:
```yaml
# docker-compose.yml
services:
  worker:
    deploy:
      replicas: 3
```

### Queue Isolation
Run separate workers for different queues:
```yaml
services:
  worker-uptime:
    command: npm run worker:uptime
  worker-notifications:
    command: npm run worker:notifications
```

## Best Practices
- Idempotency - Jobs should be safe to retry
- Timeouts - Set appropriate job timeouts
- Error Handling - Log errors with context
- Monitoring - Track queue depth and processing time
- Backpressure - Limit queue size to prevent memory issues
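The idempotency guideline can be made concrete: derive deterministic job IDs so a retried or duplicated delivery collapses into one side effect. BullMQ already skips enqueueing a job whose `jobId` is still present in the queue; the in-memory `processed` set below is a stand-in for a persistent uniqueness check such as a database unique constraint.

```typescript
// Deterministic job ID: the same check in the same minute maps to one
// job, so a duplicate enqueue or redelivery cannot double the work.
function uptimeJobId(componentId: string, scheduledAt: Date): string {
  const minute = Math.floor(scheduledAt.getTime() / 60_000);
  return `uptime-${componentId}-${minute}`;
}

// Stand-in for a durable uniqueness check (e.g. a DB unique constraint).
const processed = new Set<string>();

// Run the handler only on first delivery; replays become no-ops.
function processOnce(jobId: string, run: () => void): boolean {
  if (processed.has(jobId)) return false; // replay: safely skip
  processed.add(jobId);
  run();
  return true;
}

const at = new Date('2026-01-24T12:00:30Z');
const id = uptimeJobId('comp-789', at);

let executions = 0;
processOnce(id, () => executions++); // first delivery runs
processOnce(id, () => executions++); // retry is a no-op
console.log(executions); // 1
```

In a real handler, the same effect is usually achieved by making the database write itself idempotent (upsert keyed on the job ID) rather than tracking IDs in memory.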