asyncdefmake_nws_request(url: str) -> dict[str, Any] | None: """Make a request to the NWS API with proper error handling.""" headers = { "User-Agent": USER_AGENT, "Accept": "application/geo+json" } asyncwith httpx.AsyncClient() as client: try: response = await client.get(url, headers=headers, timeout=30.0) response.raise_for_status() return response.json() except Exception: returnNone
defformat_alert(feature: dict) -> str: """Format an alert feature into a readable string.""" props = feature["properties"] returnf""" Event: {props.get('event', 'Unknown')} Area: {props.get('areaDesc', 'Unknown')} Severity: {props.get('severity', 'Unknown')} Description: {props.get('description', 'No description available')} Instructions: {props.get('instruction', 'No specific instructions provided')} """
@mcp.tool() asyncdefget_alerts(state: str) -> str: """Get weather alerts for a US state. Args: state: Two-letter US state code (e.g. CA, NY) """ url = f"{NWS_API_BASE}/alerts/active/area/{state}" data = await make_nws_request(url)
ifnot data or"features"notin data: return"Unable to fetch alerts or no alerts found."
ifnot data["features"]: return"No active alerts for this state."
alerts = [format_alert(feature) for feature in data["features"]] return"\n---\n".join(alerts)
@mcp.tool() asyncdefget_forecast(latitude: float, longitude: float) -> str: """Get weather forecast for a location. Args: latitude: Latitude of the location longitude: Longitude of the location """ # First get the forecast grid endpoint points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}" points_data = await make_nws_request(points_url)
ifnot points_data: return"Unable to fetch forecast data for this location."
# Get the forecast URL from the points response forecast_url = points_data["properties"]["forecast"] forecast_data = await make_nws_request(forecast_url)
ifnot forecast_data: return"Unable to fetch detailed forecast."
# Format the periods into a readable forecast periods = forecast_data["properties"]["periods"] forecasts = [] for period in periods[:5]: # Only show next 5 periods forecast = f""" {period['name']}: Temperature: {period['temperature']}°{period['temperatureUnit']} Wind: {period['windSpeed']}{period['windDirection']} Forecast: {period['detailedForecast']} """ forecasts.append(forecast)
return"\n---\n".join(forecasts)
if __name__ == "__main__": # Initialize and run the server mcp.run(transport='stdio') # 使用标准输入输出作为通信通道。
# --- Argument Parsing --- parser = argparse.ArgumentParser( description="Wrap a command, passing STDIN/STDOUT verbatim while logging them.", usage="%(prog)s <command> [args...]" ) # Capture the command and all subsequent arguments parser.add_argument('command', nargs=argparse.REMAINDER, help='The command and its arguments to execute.')
ifnot args.command: print("Error: No command provided.", file=sys.stderr) parser.print_help(sys.stderr) sys.exit(1)
target_command = args.command # --- End Argument Parsing ---
# --- I/O Forwarding Functions --- # These will run in separate threads
defforward_and_log_stdin(proxy_stdin, target_stdin, log_file): """Reads from proxy's stdin, logs it, writes to target's stdin.""" try: whileTrue: # Read line by line from the script's actual stdin line_bytes = proxy_stdin.readline() ifnot line_bytes: # EOF reached break
# Log with prefix log_file.write(f"输入: {line_str}") log_file.flush() # Ensure log is written promptly
# Write the original bytes to the target process's stdin target_stdin.write(line_bytes) target_stdin.flush() # Ensure target receives it promptly
except Exception as e: # Log errors happening during forwarding try: log_file.write(f"!!! STDIN Forwarding Error: {e}\n") log_file.flush() except: pass# Avoid errors trying to log errors if log file is broken
finally: # Important: Close the target's stdin when proxy's stdin closes # This signals EOF to the target process (like test.sh's read loop) try: target_stdin.close() log_file.write("--- STDIN stream closed to target ---\n") log_file.flush() except Exception as e: try: log_file.write(f"!!! Error closing target STDIN: {e}\n") log_file.flush() except: pass
defforward_and_log_stdout(target_stdout, proxy_stdout, log_file): """Reads from target's stdout, logs it, writes to proxy's stdout.""" try: whileTrue: # Read line by line from the target process's stdout line_bytes = target_stdout.readline() ifnot line_bytes: # EOF reached (process exited or closed stdout) break
# Log with prefix log_file.write(f"输出: {line_str}") log_file.flush()
# Write the original bytes to the script's actual stdout proxy_stdout.write(line_bytes) proxy_stdout.flush() # Ensure output is seen promptly
except Exception as e: try: log_file.write(f"!!! STDOUT Forwarding Error: {e}\n") log_file.flush() except: pass finally: try: log_file.flush() except: pass # Don't close proxy_stdout (sys.stdout) here
# --- Main Execution --- process = None log_f = None exit_code = 1# Default exit code in case of early failure
try: # Open log file in append mode ('a') for the threads log_f = open(LOG_FILE, 'a', encoding='utf-8')
# Start the target process # We use pipes for stdin/stdout # We work with bytes (bufsize=0 for unbuffered binary, readline() still works) # stderr=subprocess.PIPE could be added to capture stderr too if needed. process = subprocess.Popen( target_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, # Capture stderr too, good practice bufsize=0# Use 0 for unbuffered binary I/O )
# Pass binary streams to threads stdin_thread = threading.Thread( target=forward_and_log_stdin, args=(sys.stdin.buffer, process.stdin, log_f), daemon=True# Allows main thread to exit even if this is stuck (e.g., waiting on stdin) - reconsider if explicit join is needed )
# Optional: Handle stderr similarly (log and pass through) stderr_thread = threading.Thread( target=forward_and_log_stdout, # Can reuse the function args=(process.stderr, sys.stderr.buffer, log_f), # Pass stderr streams # Add a different prefix in the function if needed, or modify function # For now, it will log with "STDOUT:" prefix - might want to change function # Let's modify the function slightly for this daemon=True ) # A slightly modified version for stderr logging defforward_and_log_stderr(target_stderr, proxy_stderr, log_file): """Reads from target's stderr, logs it, writes to proxy's stderr.""" try: whileTrue: line_bytes = target_stderr.readline() ifnot line_bytes: break try: line_str = line_bytes.decode('utf-8') except UnicodeDecodeError: line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n" log_file.write(f"STDERR: {line_str}") # Use STDERR prefix log_file.flush() proxy_stderr.write(line_bytes) proxy_stderr.flush() except Exception as e: try: log_file.write(f"!!! STDERR Forwarding Error: {e}\n") log_file.flush() except: pass finally: try: log_file.flush() except: pass
# Start the forwarding threads stdin_thread.start() stdout_thread.start() stderr_thread.start() # Start stderr thread too
# Wait for the target process to complete process.wait() exit_code = process.returncode
# Wait briefly for I/O threads to finish flushing last messages # Since they are daemons, they might exit abruptly with the main thread. # Joining them ensures cleaner shutdown and logging. # We need to make sure the pipes are closed so the reads terminate. # process.wait() ensures target process is dead, pipes should close naturally. stdin_thread.join(timeout=1.0) # Add timeout in case thread hangs stdout_thread.join(timeout=1.0) stderr_thread.join(timeout=1.0)
except Exception as e: print(f"MCP Logger Error: {e}", file=sys.stderr) # Try to log the error too if log_f andnot log_f.closed: try: log_f.write(f"!!! MCP Logger Main Error: {e}\n") log_f.flush() except: pass# Ignore errors during final logging attempt exit_code = 1# Indicate logger failure
finally: # Ensure the process is terminated if it's still running (e.g., if logger crashed) if process and process.poll() isNone: try: process.terminate() process.wait(timeout=1.0) # Give it a moment to terminate except: pass# Ignore errors during cleanup if process.poll() isNone: # Still running? try: process.kill() # Force kill except: pass# Ignore kill errors
# Final log message if log_f andnot log_f.closed: try: log_f.close() except: pass# Ignore errors during final logging attempt
# Exit with the target process's exit code sys.exit(exit_code) ------------------------ mcp_logger.py -----------------------
输入:{"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}, "clientInfo":{"name":"Cline","version":"3.13.3"}},"jsonrpc":"2.0","id":0} 输出:{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05", "capabilities":{"experimental":{},"prompts":{"listChanged":false}, "resources":{"subscribe":false,"listChanged":false},"tools":{"listChanged":false}}, "serverInfo":{"name":"weather","version":"1.6.0"}}} 输入:{"method":"notifications/initialized","jsonrpc":"2.0"} 输入:{"method":"tools/list","jsonrpc":"2.0","id":1} 输出:{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_alerts", "description":"Get weather alerts for a US state.\n\n Args:\n state: Two-letter US state code (e.g. CA, NY)\n ","inputSchema":{"properties":{"state":{"title":"State","type":"string"}},"required":["state"],"title":"get_alertsArguments","type":"object"}},{"name":"get_forecast","description":"Get weather forecast for a location.\n\n Args:\n latitude: Latitude of the location\n longitude: Longitude of the location\n ","inputSchema":{"properties":{"latitude":{"title":"Latitude","type":"number"},"longitude":{"title":"Longitude","type":"number"}}, "required":["latitude","longitude"],"title":"get_forecastArguments","type":"object"}}]}} 输入:{"method":"resources/list","jsonrpc":"2.0","id":2} 输出:{"jsonrpc":"2.0","id":2,"result":{"resources":[]}} 输入:{"method":"resources/templates/list","jsonrpc":"2.0","id":3} 输出:{"jsonrpc":"2.0","id":3,"result":{"resourceTemplates":[]}}
输入:{"method":"tools/call","params":{"name":"get_forecast","arguments":{"latitude":37.7749,"longitude":-122.4194}},"jsonrpc":"2.0","id":4} 输出:{"jsonrpc":"2.0","id":4,"result":{"content":[{"type":"text","text":"\nToday:\nTemperature: 60°F\nWind: 6 to 10 mph W\nForecast: Partly sunny, with a high near 60. West wind 6 to 10 mph.\n\n---\n\nTonight:\nTemperature: 49°F\nWind: 2 to 10 mph SW\nForecast: Patchy fog after 3am. Mostly clear, with a low around 49. Southwest wind 2 to 10 mph.\n\n---\n\nMonday:\nTemperature: 65°F\nWind: 1 to 10 mph SW\nForecast: Patchy fog before 8am. Mostly sunny. High near 65, with temperatures falling to around 63 in the afternoon. Southwest wind 1 to 10 mph.\n\n---\n\nMonday Night:\nTemperature: 51°F\nWind: 2 to 10 mph WSW\nForecast: Patchy fog after 4am. Mostly clear, with a low around 51. West southwest wind 2 to 10 mph.\n\n---\n\nTuesday:\nTemperature: 67°F\nWind: 2 to 9 mph SW\nForecast: Patchy fog before 7am. Mostly sunny, with a high near 67. Southwest wind 2 to 9 mph.\n"}],"isError":false}} 输入:{"method":"tools/call","params":{"name":"get_forecast","arguments":{"latitude":37.7749,"longitude":-122.4194}},"jsonrpc":"2.0","id":5} 输出:{"jsonrpc":"2.0","id":5,"result":{"content":[{"type":"text","text":"\nToday:\nTemperature: 60°F\nWind: 6 to 10 mph W\nForecast: Partly sunny, with a high near 60. West wind 6 to 10 mph.\n\n---\n\nTonight:\nTemperature: 49°F\nWind: 2 to 10 mph SW\nForecast: Patchy fog after 3am. Mostly clear, with a low around 49. Southwest wind 2 to 10 mph.\n\n---\n\nMonday:\nTemperature: 65°F\nWind: 1 to 10 mph SW\nForecast: Patchy fog before 8am. Mostly sunny. High near 65, with temperatures falling to around 63 in the afternoon. Southwest wind 1 to 10 mph.\n\n---\n\nMonday Night:\nTemperature: 51°F\nWind: 2 to 10 mph WSW\nForecast: Patchy fog after 4am. Mostly clear, with a low around 51. West southwest wind 2 to 10 mph.\n\n---\n\nTuesday:\nTemperature: 67°F\nWind: 2 to 9 mph SW\nForecast: Patchy fog before 7am. Mostly sunny, with a high near 67. Southwest wind 2 to 9 mph.\n"}],"isError":false}}
# 先启动 MCP server (weather) (chen) MacBook-Pro-2:weather chenyushao$ /Users/chenyushao/.local/bin/uv --directory /Users/chenyushao/mcp_server/weather run weather.py
我们用 前文中的json格式,自己改一个 和 MCP Host 交互
1 2 3 4 5 6 7
(weather) (chen) MacBook-Pro-2:weather chenyushao$ /Users/chenyushao/.local/bin/uv --directory /Users/chenyushao/mcp_server/weather run weather.py {"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"chenyushao","version":"3.13.3"}},"jsonrpc":"2.0","id":0} {"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"experimental":{},"prompts":{"listChanged":false},"resources":{"subscribe":false,"listChanged":false},"tools":{"listChanged":false}},"serverInfo":{"name":"weather","version":"1.6.0"}}} {"method":"notifications/initialized","jsonrpc":"2.0"} {"method":"tools/list","jsonrpc":"2.0","id":1} {"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_alerts","description":"Get weather alerts for a US state.\n\n Args:\n state: Two-letter US state code (e.g. CA, NY)\n ","inputSchema":{"properties":{"state":{"title":"State","type":"string"}},"required":["state"],"title":"get_alertsArguments","type":"object"}},{"name":"get_forecast","description":"Get weather forecast for a location.\n\n Args:\n latitude: Latitude of the location\n longitude: Longitude of the location\n ","inputSchema":{"properties":{"latitude":{"title":"Latitude","type":"number"},"longitude":{"title":"Longitude","type":"number"}},"required":["latitude","longitude"],"title":"get_forecastArguments","type":"object"}}]}}