截取输入输出流写日志的简单脚本

简单的例子A -> B

A -> B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Create an anonymous pipe: returns file descriptors for the read and write ends.
read_fd, write_fd = os.pipe()

# Start process A (producer): redirect its stdout to the pipe's write end.
process_a = Popen(
    ["python", "-c", "import sys; print('Hello from A!'); sys.stdout.flush()"],
    stdout=write_fd,   # A writes into the pipe
    close_fds=True     # close unrelated inherited descriptors in the child
)

# Start process B (consumer): redirect its stdin to the pipe's read end.
process_b = Popen(
    ["python", "-c", "import sys; print('B received:', sys.stdin.readline().strip())"],
    stdin=read_fd,     # B reads from the pipe
    close_fds=True
)

# Close the parent's copies of both pipe ends. Without this the descriptors
# leak, and B could block forever waiting for EOF because the parent still
# holds an open write end even after A exits.
os.close(write_fd)
os.close(read_fd)

# Reap both children so the example terminates cleanly with no zombies.
process_a.wait()
process_b.wait()

开启子进程 A 和 B。
子进程里写的 sys.stdout.flush() 和 sys.stdin.readline()，都是对子进程自己来说的！默认 sys.stdout 输出到屏幕，sys.stdin 从键盘读，但是我们给它重定向到 pipe 管道的两端。

read_fd, write_fd = os.pipe()

  • 创建了一个匿名管道,得到两个文件描述符:
    • read_fd 是读端，只能用来给别人读。
    • write_fd 是写端，只能用来给别人写入。

sys模块是干什么的?

  • sys 模块是 Python 里访问自己进程的 stdin、stdout、stderr 的一个接口。

实现脚本

截屏2025-04-26 21.12.21

脚本目的:实现对一个进程的输入输出流的截获

我们先理清一个逻辑:
如果直接运行的一个脚本 A 能直接和 B 通信，那么我们只需要用 C 包裹住 A，确保 A 的所有输入均来自 C 的输出，确保 A 的所有输出均到达 C 的输入，就能让外界以为 C 就是 A，从而进行通信。这里不需要管 C 输出到 B、B 的输出到达 C 这一端！
至于 A 是怎么 和 B 建立起通信的,B 是怎么和 A 建立起 通信的,这个 我们不考虑。

先理解上面的逻辑很有必要。
说说 设计思路:
1、 python 脚本.py uv XXX.py XXX XXX 中,【uv XXX.py XXX XXX】是被包裹的A ,我们在 【脚本.py】中开启一个 子进程 来实现它。
2、我们在 【脚本.py】中 开启三个 线程 来实现 对 A 正常输入、输出和错误输出的 拦截对接;

3、python 脚本.py uv XXX.py XXX XXX 就是 前面逻辑的C。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
vim mcp_logger.py
------------------------ mcp_logger.py -----------------------
#!/usr/bin/env python3

import sys
import subprocess
import threading
import argparse
import os

# --- Configuration ---
# Keep the log next to this script, independent of the caller's working directory.
LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "mcp_io.log")
# --- End Configuration ---

# --- Argument Parsing ---
parser = argparse.ArgumentParser(
    description="Wrap a command, passing STDIN/STDOUT verbatim while logging them.",
    usage="%(prog)s <command> [args...]"
)
# argparse.REMAINDER captures the wrapped command plus every argument after it.
parser.add_argument('command', nargs=argparse.REMAINDER,
                    help='The command and its arguments to execute.')

# Truncate any previous log. Close the handle immediately instead of leaking
# an open file object (the threads re-open the file in append mode later).
open(LOG_FILE, 'w', encoding='utf-8').close()

if len(sys.argv) == 1:
    parser.print_help(sys.stderr)
    sys.exit(1)

args = parser.parse_args()

if not args.command:
    print("Error: No command provided.", file=sys.stderr)
    parser.print_help(sys.stderr)
    sys.exit(1)

target_command = args.command
# --- End Argument Parsing ---

# --- I/O Forwarding Functions ---
# These will run in separate threads

def forward_and_log_stdin(proxy_stdin, target_stdin, log_file):
    """Pump the proxy's stdin to the target's stdin, logging every line.

    Args:
        proxy_stdin: binary stream to read from (this script's own stdin).
        target_stdin: binary stream to write to (the child's stdin pipe).
        log_file: text stream that receives a "输入: "-prefixed copy of each line.

    On EOF, target_stdin is closed so the child process sees end-of-input.
    """
    try:
        while True:
            # Read line by line from the script's actual stdin.
            line_bytes = proxy_stdin.readline()
            if not line_bytes:  # EOF reached
                break

            # Decode only for the log; the child always receives the raw bytes.
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"

            log_file.write(f"输入: {line_str}")
            log_file.flush()  # keep the log readable while the child runs

            # Forward the original bytes to the target process's stdin.
            target_stdin.write(line_bytes)
            target_stdin.flush()  # deliver promptly

    except Exception as e:
        # Best effort: record the failure, but never let a broken log file
        # raise out of the forwarding thread.
        try:
            log_file.write(f"!!! STDIN Forwarding Error: {e}\n")
            log_file.flush()
        except Exception:
            pass

    finally:
        # Closing the child's stdin signals EOF to it (e.g. ends a read loop).
        try:
            target_stdin.close()
            log_file.write("--- STDIN stream closed to target ---\n")
            log_file.flush()
        except Exception as e:
            try:
                log_file.write(f"!!! Error closing target STDIN: {e}\n")
                log_file.flush()
            except Exception:
                pass

def forward_and_log_stdout(target_stdout, proxy_stdout, log_file):
    """Pump the target's stdout to the proxy's stdout, logging every line.

    Args:
        target_stdout: binary stream to read from (the child's stdout pipe).
        proxy_stdout: binary stream to write to (this script's own stdout).
        log_file: text stream that receives a "输出: "-prefixed copy of each line.

    proxy_stdout is deliberately left open: it is typically sys.stdout.buffer.
    """
    try:
        while True:
            # Read line by line from the target process's stdout.
            line_bytes = target_stdout.readline()
            if not line_bytes:  # EOF: process exited or closed its stdout
                break

            # Decode only for the log; the outside world gets the raw bytes.
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"

            log_file.write(f"输出: {line_str}")
            log_file.flush()

            # Forward the original bytes to the script's actual stdout.
            proxy_stdout.write(line_bytes)
            proxy_stdout.flush()  # make output visible promptly

    except Exception as e:
        # Best effort: record the failure without crashing the thread.
        try:
            log_file.write(f"!!! STDOUT Forwarding Error: {e}\n")
            log_file.flush()
        except Exception:
            pass
    finally:
        try:
            log_file.flush()
        except Exception:
            pass
        # Don't close proxy_stdout (sys.stdout) here.
# --- Main Execution ---

def forward_and_log_stderr(target_stderr, proxy_stderr, log_file):
    """Pump the target's stderr to the proxy's stderr, logging with "STDERR: ".

    Same pumping loop as forward_and_log_stdout, but with a distinct log
    prefix so the log distinguishes the two streams.
    """
    try:
        while True:
            line_bytes = target_stderr.readline()
            if not line_bytes:  # EOF: process exited or closed its stderr
                break
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"
            log_file.write(f"STDERR: {line_str}")
            log_file.flush()
            proxy_stderr.write(line_bytes)
            proxy_stderr.flush()
    except Exception as e:
        try:
            log_file.write(f"!!! STDERR Forwarding Error: {e}\n")
            log_file.flush()
        except Exception:
            pass
    finally:
        try:
            log_file.flush()
        except Exception:
            pass


process = None
log_f = None
exit_code = 1  # reported if the wrapper itself fails before the child runs

try:
    # Append mode: the startup code already truncated the log file.
    log_f = open(LOG_FILE, 'a', encoding='utf-8')

    # Run the wrapped command with all three standard streams piped so the
    # forwarding threads can sit between it and the outside world.
    # bufsize=0 gives unbuffered binary pipes; readline() still works on them.
    process = subprocess.Popen(
        target_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0
    )

    # One daemon thread per stream; daemons, so a read blocked on our own
    # stdin cannot keep the wrapper alive after the child exits.
    # (The original code also built a throwaway stderr thread targeting
    # forward_and_log_stdout and immediately rebound the name; that dead
    # Thread object has been removed.)
    stdin_thread = threading.Thread(
        target=forward_and_log_stdin,
        args=(sys.stdin.buffer, process.stdin, log_f),
        daemon=True
    )
    stdout_thread = threading.Thread(
        target=forward_and_log_stdout,
        args=(process.stdout, sys.stdout.buffer, log_f),
        daemon=True
    )
    stderr_thread = threading.Thread(
        target=forward_and_log_stderr,
        args=(process.stderr, sys.stderr.buffer, log_f),
        daemon=True
    )

    # Start the forwarding threads.
    stdin_thread.start()
    stdout_thread.start()
    stderr_thread.start()

    # Wait for the target process to finish and adopt its exit status.
    process.wait()
    exit_code = process.returncode

    # Give the pumps a moment to flush their last lines. The child's death
    # closes its pipe ends, so the stdout/stderr readers terminate naturally;
    # the timeouts guard against a thread stuck on our own stdin.
    stdin_thread.join(timeout=1.0)
    stdout_thread.join(timeout=1.0)
    stderr_thread.join(timeout=1.0)

except Exception as e:
    print(f"MCP Logger Error: {e}", file=sys.stderr)
    # Try to log the error too.
    if log_f and not log_f.closed:
        try:
            log_f.write(f"!!! MCP Logger Main Error: {e}\n")
            log_f.flush()
        except Exception:
            pass  # ignore errors during the final logging attempt
    exit_code = 1  # indicate logger failure

finally:
    # If the wrapper failed, make sure the child does not outlive it.
    if process and process.poll() is None:
        try:
            process.terminate()
            process.wait(timeout=1.0)  # give it a moment to exit
        except Exception:
            pass
        if process.poll() is None:  # still running?
            try:
                process.kill()  # force kill
            except Exception:
                pass

    if log_f and not log_f.closed:
        try:
            log_f.close()
        except Exception:
            pass

# Exit with the target process's exit code.
sys.exit(exit_code)
------------------------ mcp_logger.py -----------------------