emqx 认证和授权
EMQX 认证+授权最佳实践
一、背景(Background)
在多租户 IoT 平台或设备接入场景中,MQTT Broker 不仅需要完成客户端身份校验,还必须确保不同租户、不同设备之间的消息完全隔离,避免出现:
- 设备越权发布 / 订阅 Topic
- 不同租户之间的数据互相可见
- 设备通过通配符订阅获取非授权数据
随着设备规模增长(十万级 / 百万级),传统基于静态配置或简单用户名规则的鉴权方式难以满足安全性、可维护性和扩展性要求。
因此,需要一种 既能灵活对接外部业务系统,又能保证高性能和强隔离能力的认证与授权方案。
二、设计目标(Goals)
本方案主要解决以下问题:
- 支持 多租户 + 多设备 的 MQTT 接入模型
- 将 认证逻辑 与 授权规则 解耦,便于演进
- 确保设备 只能访问属于自身的 Topic
- 保证在高并发场景下具备良好的性能与稳定性
- 避免在 Broker 侧维护大量动态 ACL 状态
核心思想是:
HTTP 认证负责“你是谁 + 你的上下文属性”,ACL 负责“你能访问哪些 Topic”
四、核心原理(Principle)
4.1 EMQX 认证与授权的分层模型
EMQX 5.x 明确区分了两个阶段:
- 认证阶段(AuthN)
-
- 发生在 MQTT 客户端连接时
- 用于校验用户名、密码、Token 等凭证
- 可通过 HTTP 方式对接外部系统
- 允许返回并注入
client_attrs
- 授权阶段(AuthZ)
-
- 发生在客户端发布 / 订阅 Topic 时
- 基于 ACL 规则进行快速匹配
- 使用认证阶段注入的
client_attrs作为上下文变量
通过这种分层设计:
- 认证逻辑具备高度灵活性
- 授权规则具备高性能和可预测性
4.2 client_attrs 的作用
client_attrs 是 EMQX 连接级上下文属性,用于:
- 在 ACL 中作为变量进行 Topic 匹配
- 绑定客户端与租户 / 设备 / 角色等业务属性
- 在连接生命周期内缓存,避免重复计算
典型属性包括:
- tenant_id
- device_id
- device_type
五、认证配置
基于HTTP服务验证账号密码,好处是可以注入自定义属性,测试通过HTTP响应注入device_id和tenant_id 属性,为后续ACL使用
func (h *AuthHandler) Auth(c *gin.Context) {
var req dto.AuthRequest
// 支持form和json两种格式
if err := c.ShouldBind(&req); err != nil {
logger.Errorf("Auth bind error: %v", err)
c.JSON(http.StatusOK, dto.AuthResponse{Result: "deny"})
return
}
logger.Debugf("Auth request: clientid=%s, username=%s, peerhost=%s",
req.ClientID, req.Username, req.PeerHost)
resp, err := h.authService.Authenticate(&req)
if err != nil {
logger.Errorf("Auth error: %v", err)
c.JSON(http.StatusOK, dto.AuthResponse{Result: "deny"})
return
}
c.JSON(http.StatusOK, resp)
}
func (s *AuthService) Authenticate(req *dto.AuthRequest) (*dto.AuthResponse, error) {
db := database.DB()
if db == nil {
return nil, errors.New("database connection not available")
}
var user model.MqttAuthUser
result := db.Where("username = ?", req.Username).First(&user)
if result.Error != nil {
logger.Infof("Auth failed: user not found, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
// 检查用户状态
if !user.Enabled {
logger.Infof("Auth failed: user disabled, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
// 验证密码 (bcrypt)
if err := bcrypt.CompareHashAndPassword([]byte(user.PasswordHash), []byte(req.Password)); err != nil {
logger.Infof("Auth failed: invalid password, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
//是否超级管理员
if user.IsSuperuser {
return &dto.AuthResponse{
Result: "allow",
IsSuperuser: user.IsSuperuser,
}, nil
}
// 如果指定了ClientID,验证是否匹配
if user.ClientId != "" && req.ClientID != "" && user.ClientId != req.ClientID {
logger.Infof("Auth failed: client_id mismatch, username=%s, expected=%s, got=%s",
req.Username, user.ClientId, req.ClientID)
return &dto.AuthResponse{Result: "deny"}, nil
}
logger.Infof("Auth success: username=%s, clientid=%s, is_superuser=%v, tenant_id=%d",
req.Username, req.ClientID, user.IsSuperuser, user.TenantId)
// 构建客户端属性
clientAttrs := make(map[string]string, 10)
if user.TenantId > 0 {
clientAttrs["tenant_id"] = strconv.FormatInt(user.TenantId, 10)
}
if user.Type == model.MqttAuthUserTypeDevice {
clientAttrs["device_id"] = strconv.FormatInt(user.Id, 10)
//根据user.id 关联 model.Device的MqttAuthUserId查询device_code
var device model.Device
if err := db.Where("mqtt_auth_user_id = ?", user.Id).First(&device).Error; err == nil {
clientAttrs["device_code"] = device.DeviceCode
if user.TenantId == 0 {
clientAttrs["tenant_id"] = strconv.FormatInt(device.TenantId, 10)
}
}
// 验证device类型必须要有tenantId和deviceCode
if clientAttrs["tenant_id"] == "" {
logger.Infof("Auth failed: tenant_id not found for device, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
if clientAttrs["device_code"] == "" {
logger.Infof("Auth failed: device_code not found, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
clientAttrs["type"] = "device"
} else {
clientAttrs["user_id"] = strconv.FormatInt(user.Id, 10)
clientAttrs["username"] = user.Username
// 验证非device类型必须要有tenantId和userId
if clientAttrs["tenant_id"] == "" {
logger.Infof("Auth failed: tenant_id not found, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
if clientAttrs["user_id"] == "" {
logger.Infof("Auth failed: user_id not found, username=%s", req.Username)
return &dto.AuthResponse{Result: "deny"}, nil
}
clientAttrs["type"] = "user"
}
clientAttrs["client_id"] = user.ClientId
return &dto.AuthResponse{
Result: "allow",
IsSuperuser: user.IsSuperuser,
ClientAttrs: clientAttrs,
}, nil
}
六、授权配置
EMQX 允许创建多个授权机制,当一个客户端进行发布或者订阅操作的时候,EMQX 会按顺序逐个检查。
- 使用 ALC文件 来对设备进行规则匹配,以实现一条规则适配所有设备,只能向自己的topic发送数据,减少规则的膨胀,对内存造成负担
- 使用 HTTP服务 来对用户业务的权限进行检查,例如只允许订阅属于自己范围的设备数据
如下配置两个规则分别实现上面的1,2两条
-
1的topic 路径符合 File 配置的路径规则 并且是发布类型 (见下面例子)
-
- 如果是自己的topic 返回允许
- 如果是不自己的topic,返回拒绝
-
1的topic规则不符合,那么走下一个 HTTP服务
-
- 向/acl 接口发送请求校验,按规则返回允许或拒绝
ALC文件
client_attrs.tenant_id,client_attrs.device_id 是认证注入的参数EMQX 会自动完成替换。
此规则只允许向特定的一条 topic 发布消息,限制了一台设备只能向自己的topic 发布,用户控制设备的权限。
{allow, all, publish, ["iot/up/std/${client_attrs.tenant_id}/${client_attrs.device_id}/#"]}.
HTTP服务
参数都是是内置通配符,EMQX服务会像该地址发送请求,自动带上这些参数,后端可以根据这些参数完成校验,目的是为了验证用户的权限。
七、ACL文件性能分析
Action: 时间复杂度O(1)
Who: 时间复杂度约等于O(L),字符串数量有关,精确匹配字符串(快)和正则表达式匹配(较慢),
Topic: 递归匹配每层O(L),层级有关
Authorization Cache:默认未开启,默认1分钟,基于LRU淘汰,每个客户端默认最大条目数32
九、局限性与注意事项(Limitations)
- HTTP 认证服务需要具备高可用能力
- 认证阶段应避免复杂计算,防止连接阻塞
- client_attrs 一旦注入,生命周期与连接绑定
- 需要合理设计 Topic 规范以匹配 ACL 模板
十、总结(Summary)
基于 EMQX 的 HTTP 认证 + ACL 授权 是一种:
- 职责清晰
- 安全性强
- 性能可控
- 易于扩展
的 MQTT 安全接入最佳实践。
通过在认证阶段注入业务上下文属性,并在授权阶段利用 ACL 进行快速决策,可以在保证高吞吐的同时,实现严格的租户与设备隔离,是当前 EMQX 5.x 架构下的推荐方案。