emqx 认证和授权

EMQX 认证+授权最佳实践

一、背景(Background)

在多租户 IoT 平台或设备接入场景中,MQTT Broker 不仅需要完成客户端身份校验,还必须确保不同租户、不同设备之间的消息完全隔离,避免出现:

  • 设备越权发布 / 订阅 Topic
  • 不同租户之间的数据互相可见
  • 设备通过通配符订阅获取非授权数据

随着设备规模增长(十万级 / 百万级),传统基于静态配置或简单用户名规则的鉴权方式难以满足安全性、可维护性和扩展性要求。

因此,需要一种 既能灵活对接外部业务系统,又能保证高性能和强隔离能力的认证与授权方案

二、设计目标(Goals)

本方案主要解决以下问题:

  • 支持 多租户 + 多设备 的 MQTT 接入模型
  • 认证逻辑授权规则 解耦,便于演进
  • 确保设备 只能访问属于自身的 Topic
  • 保证在高并发场景下具备良好的性能与稳定性
  • 避免在 Broker 侧维护大量动态 ACL 状态

核心思想是:

HTTP 认证负责“你是谁 + 你的上下文属性”,ACL 负责“你能访问哪些 Topic”

四、核心原理(Principle)

4.1 EMQX 认证与授权的分层模型

EMQX 5.x 明确区分了两个阶段:

  1. 认证阶段(AuthN)
    • 发生在 MQTT 客户端连接时
    • 用于校验用户名、密码、Token 等凭证
    • 可通过 HTTP 方式对接外部系统
    • 允许返回并注入 client_attrs
  1. 授权阶段(AuthZ)
    • 发生在客户端发布 / 订阅 Topic 时
    • 基于 ACL 规则进行快速匹配
    • 使用认证阶段注入的 client_attrs 作为上下文变量

通过这种分层设计:

  • 认证逻辑具备高度灵活性
  • 授权规则具备高性能和可预测性

4.2 client_attrs 的作用

client_attrs 是 EMQX 连接级上下文属性,用于:

  • 在 ACL 中作为变量进行 Topic 匹配
  • 绑定客户端与租户 / 设备 / 角色等业务属性
  • 在连接生命周期内缓存,避免重复计算

典型属性包括:

  • tenant_id
  • device_id
  • device_type

五、认证配置

基于HTTP服务验证账号密码,好处是可以注入自定义属性,测试通过HTTP响应注入device_id和tenant_id 属性,为后续ACL使用

func (h *AuthHandler) Auth(c *gin.Context) {
	var req dto.AuthRequest

	// 支持form和json两种格式
	if err := c.ShouldBind(&req); err != nil {
		logger.Errorf("Auth bind error: %v", err)
		c.JSON(http.StatusOK, dto.AuthResponse{Result: "deny"})
		return
	}

	logger.Debugf("Auth request: clientid=%s, username=%s, peerhost=%s",
		req.ClientID, req.Username, req.PeerHost)

	resp, err := h.authService.Authenticate(&req)
	if err != nil {
		logger.Errorf("Auth error: %v", err)
		c.JSON(http.StatusOK, dto.AuthResponse{Result: "deny"})
		return
	}

	c.JSON(http.StatusOK, resp)
}


func (s *AuthService) Authenticate(req *dto.AuthRequest) (*dto.AuthResponse, error) {
	db := database.DB()
	if db == nil {
		return nil, errors.New("database connection not available")
	}

	var user model.MqttAuthUser
	result := db.Where("username = ?", req.Username).First(&user)
	if result.Error != nil {
		logger.Infof("Auth failed: user not found, username=%s", req.Username)
		return &dto.AuthResponse{Result: "deny"}, nil
	}

	// 检查用户状态
	if !user.Enabled {
		logger.Infof("Auth failed: user disabled, username=%s", req.Username)
		return &dto.AuthResponse{Result: "deny"}, nil
	}

	// 验证密码 (bcrypt)
	if err := bcrypt.CompareHashAndPassword([]byte(user.PasswordHash), []byte(req.Password)); err != nil {
		logger.Infof("Auth failed: invalid password, username=%s", req.Username)
		return &dto.AuthResponse{Result: "deny"}, nil
	}

	//是否超级管理员
	if user.IsSuperuser {
		return &dto.AuthResponse{
			Result:      "allow",
			IsSuperuser: user.IsSuperuser,
		}, nil
	}

	// 如果指定了ClientID,验证是否匹配
	if user.ClientId != "" && req.ClientID != "" && user.ClientId != req.ClientID {
		logger.Infof("Auth failed: client_id mismatch, username=%s, expected=%s, got=%s",
			req.Username, user.ClientId, req.ClientID)
		return &dto.AuthResponse{Result: "deny"}, nil
	}

	logger.Infof("Auth success: username=%s, clientid=%s, is_superuser=%v, tenant_id=%d",
		req.Username, req.ClientID, user.IsSuperuser, user.TenantId)

	// 构建客户端属性
	clientAttrs := make(map[string]string, 10)

	if user.TenantId > 0 {
		clientAttrs["tenant_id"] = strconv.FormatInt(user.TenantId, 10)
	}
	if user.Type == model.MqttAuthUserTypeDevice {
		clientAttrs["device_id"] = strconv.FormatInt(user.Id, 10)
		//根据user.id 关联 model.Device的MqttAuthUserId查询device_code
		var device model.Device
		if err := db.Where("mqtt_auth_user_id = ?", user.Id).First(&device).Error; err == nil {
			clientAttrs["device_code"] = device.DeviceCode
			if user.TenantId == 0 {
				clientAttrs["tenant_id"] = strconv.FormatInt(device.TenantId, 10)
			}
		}
		// 验证device类型必须要有tenantId和deviceCode
		if clientAttrs["tenant_id"] == "" {
			logger.Infof("Auth failed: tenant_id not found for device, username=%s", req.Username)
			return &dto.AuthResponse{Result: "deny"}, nil
		}
		if clientAttrs["device_code"] == "" {
			logger.Infof("Auth failed: device_code not found, username=%s", req.Username)
			return &dto.AuthResponse{Result: "deny"}, nil
		}
		clientAttrs["type"] = "device"
	} else {
		clientAttrs["user_id"] = strconv.FormatInt(user.Id, 10)
		clientAttrs["username"] = user.Username
		// 验证非device类型必须要有tenantId和userId
		if clientAttrs["tenant_id"] == "" {
			logger.Infof("Auth failed: tenant_id not found, username=%s", req.Username)
			return &dto.AuthResponse{Result: "deny"}, nil
		}
		if clientAttrs["user_id"] == "" {
			logger.Infof("Auth failed: user_id not found, username=%s", req.Username)
			return &dto.AuthResponse{Result: "deny"}, nil
		}
		clientAttrs["type"] = "user"
	}
	clientAttrs["client_id"] = user.ClientId

	return &dto.AuthResponse{
		Result:      "allow",
		IsSuperuser: user.IsSuperuser,
		ClientAttrs: clientAttrs,
	}, nil
}

六、授权配置

EMQX 允许创建多个授权机制,当一个客户端进行发布或者订阅操作的时候,EMQX 会按顺序逐个检查。

  1. 使用 ALC文件 来对设备进行规则匹配,以实现一条规则适配所有设备,只能向自己的topic发送数据,减少规则的膨胀,对内存造成负担
  2. 使用 HTTP服务 来对用户业务的权限进行检查,例如只允许订阅属于自己范围的设备数据

如下配置两个规则分别实现上面的1,2两条

  • 1的topic 路径符合 File 配置的路径规则 并且是发布类型 (见下面例子)

    • 如果是自己的topic 返回允许
    • 如果是不自己的topic,返回拒绝
  • 1的topic规则不符合,那么走下一个 HTTP服务

    • 向/acl 接口发送请求校验,按规则返回允许或拒绝

ALC文件

client_attrs.tenant_id,client_attrs.device_id 是认证注入的参数EMQX 会自动完成替换。

此规则只允许向特定的一条 topic 发布消息,限制了一台设备只能向自己的topic 发布,用户控制设备的权限。

{allow, all, publish, ["iot/up/std/${client_attrs.tenant_id}/${client_attrs.device_id}/#"]}.

HTTP服务

参数都是是内置通配符,EMQX服务会像该地址发送请求,自动带上这些参数,后端可以根据这些参数完成校验,目的是为了验证用户的权限。

七、ACL文件性能分析

Action: 时间复杂度O(1)

Who: 时间复杂度约等于O(L),字符串数量有关,精确匹配字符串(快)和正则表达式匹配(较慢),

Topic: 递归匹配每层O(L),层级有关

Authorization Cache:默认未开启,默认1分钟,基于LRU淘汰,每个客户端默认最大条目数32

九、局限性与注意事项(Limitations)

  • HTTP 认证服务需要具备高可用能力
  • 认证阶段应避免复杂计算,防止连接阻塞
  • client_attrs 一旦注入,生命周期与连接绑定
  • 需要合理设计 Topic 规范以匹配 ACL 模板

十、总结(Summary)

基于 EMQX 的 HTTP 认证 + ACL 授权 是一种:

  • 职责清晰
  • 安全性强
  • 性能可控
  • 易于扩展

的 MQTT 安全接入最佳实践。

通过在认证阶段注入业务上下文属性,并在授权阶段利用 ACL 进行快速决策,可以在保证高吞吐的同时,实现严格的租户与设备隔离,是当前 EMQX 5.x 架构下的推荐方案。