Documentation Index Fetch the complete documentation index at: https://mintlify.com/lamassuiot/lamassuiot/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Event bus engines enable asynchronous communication between Lamassu services using the publish-subscribe pattern.
Use Cases:
Certificate lifecycle events (issued, revoked, expired)
Device registration and status changes
CA creation and updates
Alert notifications
Service-to-service communication
All event bus engines implement the EventBusEngine interface from github.com/lamassuiot/lamassuiot/core/v3/pkg/engines/eventbus.
Available Engines
AMQP (RabbitMQ)
AWS SNS/SQS
AMQP Event Bus Engine Publish and subscribe to events using AMQP (RabbitMQ) with topic-based routing. Import Path: import " github.com/lamassuiot/lamassuiot/engines/eventbus/amqp/v3 "
Configuration event_bus :
provider : amqp
config :
protocol : amqps # or 'amqp'
hostname : rabbitmq.example.com
port : 5671
exchange : lamassu-events
basic_auth :
enabled : true
username : lamassu
password : ${RABBITMQ_PASSWORD}
client_tls_auth :
enabled : true
cert_file : /etc/lamassu/client.crt
key_file : /etc/lamassu/client.key
Config Struct: type AMQPConnection struct {
BasicConnection // Hostname, Port
Exchange string
Protocol AMQPProtocol // "amqp" or "amqps"
BasicAuth AMQPConnectionBasicAuth
ClientTLSAuth struct {
Enabled bool
CertFile string
KeyFile string
}
}
type AMQPConnectionBasicAuth struct {
Enabled bool
Username string
Password config . Password
}
Topic Routing Events are published to topics with hierarchical routing keys: lamassu.ca.created
lamassu.ca.updated
lamassu.certificate.issued
lamassu.certificate.revoked
lamassu.device.registered
lamassu.device.status_changed
Publisher Setup engine , err := amqp . NewAmqpEngine ( config , serviceID , logger )
publisher , err := engine . Publisher ()
// Publish event
event := models . Event {
Type : "lamassu.certificate.issued" ,
Payload : map [ string ] interface {}{
"serial_number" : "1a2b3c4d" ,
"subject" : "CN=device-001" ,
},
}
msg := message . NewMessage ( uuid . New (). String (), eventBytes )
err = publisher . Publish ( "lamassu.certificate.issued" , msg )
Subscriber Setup subscriber , err := engine . Subscriber ()
// Subscribe to topic
messages , err := subscriber . Subscribe ( ctx , "lamassu.certificate.*" )
for msg := range messages {
var event models . Event
json . Unmarshal ( msg . Payload , & event )
// Process event
handleEvent ( event )
msg . Ack ()
}
RabbitMQ Setup Docker Compose: services :
rabbitmq :
image : rabbitmq:3-management-alpine
ports :
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment :
RABBITMQ_DEFAULT_USER : lamassu
RABBITMQ_DEFAULT_PASS : changeme
volumes :
- rabbitmq-data:/var/lib/rabbitmq
Create Exchange: # Via management API
curl -u lamassu:changeme -X PUT \
http://localhost:15672/api/exchanges/%2F/lamassu-events \
-H 'content-type: application/json' \
-d '{
"type": "topic",
"durable": true
}'
TLS Configuration Enable TLS in RabbitMQ: % rabbitmq.conf
listeners . ssl . default = 5671
ssl_options . cacertfile = / etc / rabbitmq / ca . pem
ssl_options . certfile = / etc / rabbitmq / server . crt
ssl_options . keyfile = / etc / rabbitmq / server . key
ssl_options . verify = verify_peer
ssl_options . fail_if_no_peer_cert = true
Source: engines/eventbus/amqp/engine.go:17AWS SNS/SQS Event Bus Engine Publish to SNS topics and consume from SQS queues with AWS-native integration. Import Path: import " github.com/lamassuiot/lamassuiot/engines/eventbus/aws/v3 "
Configuration event_bus :
provider : aws_sqs_sns
config :
aws_region : us-east-1
aws_access_key_id : ${AWS_ACCESS_KEY_ID}
aws_secret_access_key : ${AWS_SECRET_ACCESS_KEY}
# Optional: assume role
aws_role_arn : arn:aws:iam::123456789012:role/LamassuEventBus
Config Struct: type AWSSDKConfig struct {
Region string
AccessKeyID string
SecretAccessKey string
SessionToken string
RoleARN string
Endpoint string // For LocalStack
}
Topic Structure SNS topics created automatically per event type: arn:aws:sns:us-east-1:123456789012:lamassu-ca-created
arn:aws:sns:us-east-1:123456789012:lamassu-certificate-issued
arn:aws:sns:us-east-1:123456789012:lamassu-device-registered
Publisher Setup engine , err := aws . NewAWSEngine ( config , serviceID , logger )
publisher , err := engine . Publisher ()
// Publish to SNS topic
msg := message . NewMessage ( uuid . New (). String (), eventBytes )
msg . Metadata . Set ( "topic-arn" , topicARN )
err = publisher . Publish ( "lamassu.certificate.issued" , msg )
SNS Message Attributes: {
"MessageAttributes" : {
"event_type" : {
"DataType" : "String" ,
"StringValue" : "lamassu.certificate.issued"
},
"service_id" : {
"DataType" : "String" ,
"StringValue" : "ca-service"
}
}
}
Subscriber Setup subscriber , err := engine . Subscriber ()
// Subscribe to SQS queue
messages , err := subscriber . Subscribe ( ctx , queueURL )
for msg := range messages {
// Process SQS message
var snsEvent SNSEvent
json . Unmarshal ( msg . Payload , & snsEvent )
// Extract original event
var event models . Event
json . Unmarshal ([] byte ( snsEvent . Message ), & event )
handleEvent ( event )
msg . Ack () // Delete from SQS
}
AWS Infrastructure Setup Terraform: # SNS Topic
resource "aws_sns_topic" "lamassu_events" {
name = "lamassu-certificate-issued"
tags = {
Service = "lamassu"
}
}
# SQS Queue
resource "aws_sqs_queue" "lamassu_alerts" {
name = "lamassu-alerts"
visibility_timeout_seconds = 300
message_retention_seconds = 1209600 # 14 days
redrive_policy = jsonencode ({
deadLetterTargetArn = aws_sqs_queue.lamassu_dlq.arn
maxReceiveCount = 3
})
}
# Dead Letter Queue
resource "aws_sqs_queue" "lamassu_dlq" {
name = "lamassu-alerts-dlq"
}
# SNS to SQS Subscription
resource "aws_sns_topic_subscription" "lamassu_alerts" {
topic_arn = aws_sns_topic . lamassu_events . arn
protocol = "sqs"
endpoint = aws_sqs_queue . lamassu_alerts . arn
filter_policy = jsonencode ({
event_type = [ "lamassu.certificate.issued" ]
})
}
IAM Policy: {
"Version" : "2012-10-17" ,
"Statement" : [
{
"Effect" : "Allow" ,
"Action" : [
"sns:Publish" ,
"sns:CreateTopic" ,
"sns:GetTopicAttributes"
],
"Resource" : "arn:aws:sns:*:*:lamassu-*"
},
{
"Effect" : "Allow" ,
"Action" : [
"sqs:ReceiveMessage" ,
"sqs:DeleteMessage" ,
"sqs:GetQueueAttributes" ,
"sqs:GetQueueUrl"
],
"Resource" : "arn:aws:sqs:*:*:lamassu-*"
}
]
}
LocalStack Testing event_bus :
provider : aws_sqs_sns
config :
aws_region : us-east-1
aws_access_key_id : test
aws_secret_access_key : test
endpoint : http://localstack:4566
Source: engines/eventbus/aws/engine.go:17
All events follow the CloudEvents specification:
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
SpecVersion string `json:"specversion"`
Time time . Time `json:"time"`
DataContentType string `json:"datacontenttype"`
Data map [ string ] interface {} `json:"data"`
}
Example:
{
"id" : "550e8400-e29b-41d4-a716-446655440000" ,
"type" : "lamassu.certificate.issued" ,
"source" : "ca-service" ,
"specversion" : "1.0" ,
"time" : "2024-01-15T10:30:00Z" ,
"datacontenttype" : "application/json" ,
"data" : {
"serial_number" : "1a2b3c4d5e6f" ,
"subject" : "CN=device-001,O=Acme Corp" ,
"issuer_ca_id" : "root-ca" ,
"valid_from" : "2024-01-15T10:30:00Z" ,
"valid_to" : "2025-01-15T10:30:00Z"
}
}
Event Types
Event Type Payload Triggered By lamassu.ca.createdCA details CA creation lamassu.ca.updatedCA ID, changes CA metadata update lamassu.certificate.issuedSerial, subject, CA Certificate signing lamassu.certificate.revokedSerial, reason Certificate revocation lamassu.device.registeredDevice ID, DMS Device enrollment lamassu.device.status_changedDevice ID, status Device lifecycle lamassu.dms.createdDMS ID, settings DMS registration
Monitoring
Metrics
# Messages published
lamassu_eventbus_messages_published_total{topic="lamassu.certificate.issued"} 1234
# Messages consumed
lamassu_eventbus_messages_consumed_total{topic="lamassu.certificate.issued"} 1230
# Consumer lag
lamassu_eventbus_consumer_lag{topic="lamassu.certificate.issued"} 4
# Publish errors
lamassu_eventbus_publish_errors_total{topic="lamassu.ca.created"} 2
Dead Letter Queues
AMQP (RabbitMQ):
# View DLQ messages
rabbitmqadmin get queue=lamassu-events-dlq count= 10
AWS SQS:
# View DLQ messages
aws sqs receive-message \
--queue-url https://sqs.us-east-1.amazonaws.com/123456789012/lamassu-alerts-dlq \
--max-number-of-messages 10
Troubleshooting
Symptoms: could not generate Event Bus Publisher: dial tcp: connection refused
Solutions:
Verify RabbitMQ is running: docker ps | grep rabbitmq
Check port accessibility: telnet rabbitmq.example.com 5672
Review RabbitMQ logs: docker logs rabbitmq
Symptoms: AccessDenied: User is not authorized to perform: sns:Publish
Solutions:
Verify IAM permissions include sns:Publish
Check service role: aws sts get-caller-identity
Test permissions: aws sns publish --topic-arn <arn> --message "test"
Messages Not Being Consumed
Cause: Consumer not acking messages or queue misconfigured.Solutions:
Check consumer logs for errors
Verify queue bindings: rabbitmqadmin list bindings
Increase visibility timeout (SQS)
Check DLQ for failed messages
Next Steps
AWS IoT Connector Integrate with AWS IoT Core
Alerts Configure event-based alerting