Overview

Event bus engines enable asynchronous communication between Lamassu services using the publish-subscribe pattern.
Use Cases:
  • Certificate lifecycle events (issued, revoked, expired)
  • Device registration and status changes
  • CA creation and updates
  • Alert notifications
  • Service-to-service communication
All event bus engines implement the EventBusEngine interface from github.com/lamassuiot/lamassuiot/core/v3/pkg/engines/eventbus.
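
As a rough illustration of the publish-subscribe pattern itself, the following sketch fans one published message out to every subscriber of a topic. The `Bus` type and its methods are hypothetical and live entirely in memory; they are not the Lamassu EventBusEngine interface, whose methods are not reproduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// Bus is a hypothetical in-memory event bus used only to illustrate
// publish-subscribe. It is NOT the Lamassu EventBusEngine interface.
type Bus struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

// NewBus returns an empty bus.
func NewBus() *Bus {
	return &Bus{subs: make(map[string][]chan string)}
}

// Subscribe registers a buffered channel that receives every message
// published to topic after this call.
func (b *Bus) Subscribe(topic string, buffer int) <-chan string {
	ch := make(chan string, buffer)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers msg to all subscribers of topic and returns how many
// received it. Subscribers whose buffer is full are skipped so that a
// slow consumer never blocks the publisher.
func (b *Bus) Publish(topic, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	bus := NewBus()
	alerts := bus.Subscribe("lamassu.certificate.revoked", 1)
	audit := bus.Subscribe("lamassu.certificate.revoked", 1)

	n := bus.Publish("lamassu.certificate.revoked", "serial=1a2b3c4d")
	fmt.Println("delivered to", n, "subscribers")
	fmt.Println(<-alerts, <-audit)
}
```

The publisher never learns who the subscribers are, which is the property that lets services be added or removed without changing the code that emits events.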

Available Engines

AMQP Event Bus Engine

Publish and subscribe to events using AMQP (RabbitMQ) with topic-based routing.
Import Path:
import "github.com/lamassuiot/lamassuiot/engines/eventbus/amqp/v3"

Configuration

event_bus:
  provider: amqp
  config:
    protocol: amqps  # or 'amqp'
    hostname: rabbitmq.example.com
    port: 5671
    exchange: lamassu-events
    basic_auth:
      enabled: true
      username: lamassu
      password: ${RABBITMQ_PASSWORD}
    client_tls_auth:
      enabled: true
      cert_file: /etc/lamassu/client.crt
      key_file: /etc/lamassu/client.key
Config Struct:
type AMQPConnection struct {
    BasicConnection  // Hostname, Port
    Exchange         string
    Protocol         AMQPProtocol  // "amqp" or "amqps"
    BasicAuth        AMQPConnectionBasicAuth
    ClientTLSAuth    struct {
        Enabled  bool
        CertFile string
        KeyFile  string
    }
}

type AMQPConnectionBasicAuth struct {
    Enabled  bool
    Username string
    Password config.Password
}
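
To show how these fields typically combine into a broker address, here is a minimal sketch that assembles an AMQP URI. The `amqpConfig` type and `buildAMQPURL` function are hypothetical stand-ins written for this example; how the engine actually builds its connection string is not shown in this page.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// amqpConfig mirrors only the fields this example needs. It is a
// hypothetical stand-in, not the real AMQPConnection type.
type amqpConfig struct {
	Protocol  string // "amqp" or "amqps"
	Hostname  string
	Port      int
	BasicAuth bool
	Username  string
	Password  string
}

// buildAMQPURL assembles scheme://user:pass@host:port. No path is set,
// so the broker's default virtual host is used.
func buildAMQPURL(c amqpConfig) (string, error) {
	if c.Protocol != "amqp" && c.Protocol != "amqps" {
		return "", fmt.Errorf("unsupported protocol %q", c.Protocol)
	}
	u := url.URL{
		Scheme: c.Protocol,
		Host:   net.JoinHostPort(c.Hostname, strconv.Itoa(c.Port)),
	}
	if c.BasicAuth {
		// UserPassword percent-encodes reserved characters in credentials.
		u.User = url.UserPassword(c.Username, c.Password)
	}
	return u.String(), nil
}

func main() {
	uri, err := buildAMQPURL(amqpConfig{
		Protocol:  "amqps",
		Hostname:  "rabbitmq.example.com",
		Port:      5671,
		BasicAuth: true,
		Username:  "lamassu",
		Password:  "secret",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(uri) // amqps://lamassu:secret@rabbitmq.example.com:5671
}
```

Building the URI through `net/url` rather than string concatenation avoids broken addresses when a password contains characters such as `@` or `/`.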

Topic Routing

Events are published to topics with hierarchical routing keys:
lamassu.ca.created
lamassu.ca.updated
lamassu.certificate.issued
lamassu.certificate.revoked
lamassu.device.registered
lamassu.device.status_changed
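
For reference, AMQP topic exchanges match binding patterns against routing keys word by word: `*` stands for exactly one word and `#` for zero or more words. The sketch below reimplements that rule locally so patterns can be checked without a broker. `matchTopic` is a hypothetical helper, and it assumes the engine passes subscription topics to RabbitMQ as binding keys unchanged.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether an AMQP topic binding pattern matches a
// routing key. Words are separated by dots.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

// matchWords compares pattern words against key words recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// '#' may absorb zero or more words: try every split point.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// '*' must consume exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(matchTopic("lamassu.certificate.*", "lamassu.certificate.issued")) // true
	fmt.Println(matchTopic("lamassu.certificate.*", "lamassu.ca.created"))         // false
	fmt.Println(matchTopic("lamassu.*", "lamassu.device.registered"))              // false
	fmt.Println(matchTopic("lamassu.#", "lamassu.device.registered"))              // true
}
```

Note that `lamassu.*` does not match three-word keys; a subscriber that wants every Lamassu event would need `lamassu.#`.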

Publisher Setup

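engine, err := amqp.NewAmqpEngine(config, serviceID, logger)
if err != nil {
    return err
}
publisher, err := engine.Publisher()
if err != nil {
    return err
}

// Build the event
event := models.Event{
    Type: "lamassu.certificate.issued",
    Payload: map[string]interface{}{
        "serial_number": "1a2b3c4d",
        "subject": "CN=device-001",
    },
}

// Serialize the event; the JSON bytes become the message payload
eventBytes, err := json.Marshal(event)
if err != nil {
    return err
}

// Publish event
msg := message.NewMessage(uuid.New().String(), eventBytes)
if err := publisher.Publish("lamassu.certificate.issued", msg); err != nil {
    return err
}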

Subscriber Setup

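subscriber, err := engine.Subscriber()
if err != nil {
    return err
}

// Subscribe to topic
messages, err := subscriber.Subscribe(ctx, "lamassu.certificate.*")
if err != nil {
    return err
}

for msg := range messages {
    var event models.Event
    if err := json.Unmarshal(msg.Payload, &event); err != nil {
        // Negatively acknowledge malformed messages instead of handling a
        // zero-value event; the broker then redelivers or dead-letters them
        msg.Nack()
        continue
    }

    // Process event
    handleEvent(event)

    msg.Ack()
}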

RabbitMQ Setup

Docker Compose:
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: lamassu
      RABBITMQ_DEFAULT_PASS: changeme
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
Create Exchange:
# Via management API
curl -u lamassu:changeme -X PUT \
  http://localhost:15672/api/exchanges/%2F/lamassu-events \
  -H 'content-type: application/json' \
  -d '{
    "type": "topic",
    "durable": true
  }'

TLS Configuration

Enable TLS in RabbitMQ:
# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca.pem
ssl_options.certfile   = /etc/rabbitmq/server.crt
ssl_options.keyfile    = /etc/rabbitmq/server.key
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true
Source: engines/eventbus/amqp/engine.go:17
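
Because the broker settings above require a client certificate (`verify_peer` together with `fail_if_no_peer_cert`), a client must both trust the broker's CA and present its own key pair. The following is a minimal sketch of such a client-side `tls.Config` using only the Go standard library. `clientTLSConfig` and the CA file path are assumptions made for this example; how the engine builds its TLS settings internally is not shown in this page.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// clientTLSConfig builds a mutual-TLS client configuration from a CA
// bundle (PEM bytes) and a client certificate/key pair on disk.
// Hypothetical helper; not part of the Lamassu API.
func clientTLSConfig(caPEM []byte, certFile, keyFile string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificates found in PEM data")
	}
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}
	return &tls.Config{
		RootCAs:      pool,                    // CAs trusted to sign the broker certificate
		Certificates: []tls.Certificate{cert}, // presented to the broker on request
		MinVersion:   tls.VersionTLS12,
	}, nil
}

func main() {
	// The CA path is an assumption; the client paths match the
	// client_tls_auth example in the Configuration section.
	caPEM, err := os.ReadFile("/etc/lamassu/ca.pem")
	if err != nil {
		fmt.Println("read CA:", err)
		return
	}
	cfg, err := clientTLSConfig(caPEM, "/etc/lamassu/client.crt", "/etc/lamassu/client.key")
	if err != nil {
		fmt.Println("TLS config:", err)
		return
	}
	fmt.Println("client certificates loaded:", len(cfg.Certificates))
}
```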

Event Message Format

All events follow the CloudEvents specification:
type Event struct {
    ID              string                 `json:"id"`
    Type            string                 `json:"type"`
    Source          string                 `json:"source"`
    SpecVersion     string                 `json:"specversion"`
    Time            time.Time              `json:"time"`
    DataContentType string                 `json:"datacontenttype"`
    Data            map[string]interface{} `json:"data"`
}
Example:
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "type": "lamassu.certificate.issued",
  "source": "ca-service",
  "specversion": "1.0",
  "time": "2024-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "serial_number": "1a2b3c4d5e6f",
    "subject": "CN=device-001,O=Acme Corp",
    "issuer_ca_id": "root-ca",
    "valid_from": "2024-01-15T10:30:00Z",
    "valid_to": "2025-01-15T10:30:00Z"
  }
}
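
A consumer can decode this JSON into the struct above and reject envelopes that lack the attributes CloudEvents 1.0 marks as required (`id`, `source`, `specversion`, `type`). The sketch below redeclares the struct locally so that it compiles on its own; `decodeEvent` is a hypothetical helper rather than a Lamassu function.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Event is a local copy of the envelope shown above.
type Event struct {
	ID              string                 `json:"id"`
	Type            string                 `json:"type"`
	Source          string                 `json:"source"`
	SpecVersion     string                 `json:"specversion"`
	Time            time.Time              `json:"time"`
	DataContentType string                 `json:"datacontenttype"`
	Data            map[string]interface{} `json:"data"`
}

// decodeEvent parses raw JSON and checks the attributes that
// CloudEvents 1.0 requires on every event.
func decodeEvent(raw []byte) (Event, error) {
	var ev Event
	if err := json.Unmarshal(raw, &ev); err != nil {
		return Event{}, fmt.Errorf("decode event: %w", err)
	}
	switch {
	case ev.ID == "":
		return Event{}, errors.New("missing required attribute: id")
	case ev.Source == "":
		return Event{}, errors.New("missing required attribute: source")
	case ev.SpecVersion == "":
		return Event{}, errors.New("missing required attribute: specversion")
	case ev.Type == "":
		return Event{}, errors.New("missing required attribute: type")
	}
	return ev, nil
}

func main() {
	raw := []byte(`{
	  "id": "550e8400-e29b-41d4-a716-446655440000",
	  "type": "lamassu.certificate.issued",
	  "source": "ca-service",
	  "specversion": "1.0",
	  "time": "2024-01-15T10:30:00Z",
	  "datacontenttype": "application/json",
	  "data": {"serial_number": "1a2b3c4d5e6f"}
	}`)
	ev, err := decodeEvent(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ev.Type, ev.Data["serial_number"])
}
```

Validating the envelope before dispatching keeps handlers from acting on partially populated events.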

Event Types

Event Type                     Payload              Triggered By
lamassu.ca.created             CA details           CA creation
lamassu.ca.updated             CA ID, changes       CA metadata update
lamassu.certificate.issued     Serial, subject, CA  Certificate signing
lamassu.certificate.revoked    Serial, reason       Certificate revocation
lamassu.device.registered      Device ID, DMS       Device enrollment
lamassu.device.status_changed  Device ID, status    Device lifecycle
lamassu.dms.created            DMS ID, settings     DMS registration

Monitoring

Metrics

# Messages published
lamassu_eventbus_messages_published_total{topic="lamassu.certificate.issued"} 1234

# Messages consumed
lamassu_eventbus_messages_consumed_total{topic="lamassu.certificate.issued"} 1230

# Consumer lag
lamassu_eventbus_consumer_lag{topic="lamassu.certificate.issued"} 4

# Publish errors
lamassu_eventbus_publish_errors_total{topic="lamassu.ca.created"} 2

Dead Letter Queues

AMQP (RabbitMQ):
# View DLQ messages
rabbitmqadmin get queue=lamassu-events-dlq count=10
AWS SQS:
# View DLQ messages
aws sqs receive-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/lamassu-alerts-dlq \
  --max-number-of-messages 10

Troubleshooting

Symptoms:
could not generate Event Bus Publisher: dial tcp: connection refused
Solutions:
  • Verify RabbitMQ is running: docker ps | grep rabbitmq
  • Check port accessibility: telnet rabbitmq.example.com 5672 (use 5671 for amqps)
  • Review RabbitMQ logs: docker logs rabbitmq
Symptoms:
AccessDenied: User is not authorized to perform: sns:Publish
Solutions:
  • Verify IAM permissions include sns:Publish
  • Check service role: aws sts get-caller-identity
  • Test permissions: aws sns publish --topic-arn <arn> --message "test"
Cause: The consumer is not acknowledging messages, or the queue is misconfigured.
Solutions:
  • Check consumer logs for errors
  • Verify queue bindings: rabbitmqadmin list bindings
  • Increase visibility timeout (SQS)
  • Check DLQ for failed messages

Next Steps

AWS IoT Connector

Integrate with AWS IoT Core

Alerts

Configure event-based alerting