The ingestion worker subscribes to MQTT topics and processes incoming telemetry in the ingestion pipeline.

Configuration

Required:

  • MQTT_URL

Optional TLS settings:

  • MQTT_CA_PATH
  • MQTT_CERT_PATH
  • MQTT_KEY_PATH

Pipeline overview

The ingestion worker lives under backend/apps/ingestion-worker/src/workers/ingestion. Key services include:

  • mqtt-subscriber.service.ts: connects to the broker and subscribes
  • validator.service.ts: validates incoming payloads
  • rate-limiter.service.ts: applies ingestion throttling
  • batch-writer.service.ts: buffers and writes telemetry
  • dlq.service.ts: handles failed messages

Use this directory as the entry point for any ingestion changes.

flowchart LR MQTT[(MQTT broker)] --> Subscriber[mqtt-subscriber] Subscriber --> Validator[validator] Validator --> RateLimiter[rate-limiter] RateLimiter --> BatchWriter[batch-writer] BatchWriter --> DB[(PostgreSQL)] Validator -->|invalid| DLQ[dlq] BatchWriter -->|failed write| DLQ