共计 6524 个字符,预计需要花费 17 分钟才能阅读完成。
平常会把新装服务器一些必须用到的比如 nginx、nezha-agent 放在一个文件夹,装了系统以后一般直接拷贝,简单修改一些配置后就可以使用 docker-compose 直接启动。然后就可以准备搭建服务,创建 nginx 配置文件,接下来就需要域名解析了。
由于域名托管在 cloudflare,每次都要登录,然后进入域名页面才能配置解析,非常麻烦。于是想着能不能写一个 python 文件,方便进行解析。马上动手让 gpt 写了如下的代码,再用 pyinstaller 编译成二进制文件,效果如图:
使用方式如下:
╭─root@hausen1012 ~
╰─# cf
用法: dns <action> [domain] [ip_or_target_domain]
可用操作: create(c), update(u), delete(d), query(q), query_all(qa)
首先需要创建 python 文件 cf.py
,这里 CF_API_TOKEN
为了方便也可以写死再编译,由于我这里的话,文件夹拷贝过去还需要运行 shell 脚本,我就索性让 shell 脚本帮我写入环境变量了。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import sys
import re
# -------- 配置区 --------
CF_API_TOKEN = os.getenv("CF_API_TOKEN")
CF_API_BASE = os.getenv("CF_API_BASE", "https://api.cloudflare.com/client/v4")
PUBLIC_IP_URL = os.getenv("PUBLIC_IP_URL", "https://ip.3322.net/")
if not CF_API_TOKEN:
print("❌ 请在代码或环境变量中设置 CF_API_TOKEN")
sys.exit(1)
# -----------------------
def get_public_ip():
"""获取公网IP"""
try:
ip = requests.get(PUBLIC_IP_URL, timeout=5).text.strip()
return ip
except Exception as e:
print(f"获取公网IP失败: {e}")
sys.exit(1)
def is_ipv4(value):
return bool(re.match(r'^(\d{1,3}\.){3}\d{1,3}$', value))
def is_ipv6(value):
return bool(re.match(r'^[0-9a-fA-F:]+$', value)) and ":" in value
def get_zone_id(domain):
"""根据域名获取Zone ID"""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
parts = domain.split('.')
for i in range(len(parts)-1):
zone_name = '.'.join(parts[i:])
resp = requests.get(f"{CF_API_BASE}/zones?name={zone_name}", headers=headers).json()
if resp.get("success") and resp.get("result"):
return resp["result"][0]["id"]
print(f"未找到域名 {domain} 的Zone ID")
sys.exit(1)
def list_zones():
"""列出账户下所有域名 (Zone)"""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
page = 1
per_page = 50
zones = []
while True:
resp = requests.get(f"{CF_API_BASE}/zones?page={page}&per_page={per_page}", headers=headers).json()
if not resp.get("success"):
print(f"获取Zone失败: {resp}")
return []
zones.extend(resp.get("result", []))
if page >= resp.get("result_info", {}).get("total_pages", 1):
break
page += 1
return zones
def get_dns_record(zone_id, record_name):
"""获取指定DNS记录"""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
page = 1
per_page = 100
while True:
resp = requests.get(
f"{CF_API_BASE}/zones/{zone_id}/dns_records?name={record_name}&page={page}&per_page={per_page}",
headers=headers).json()
if not resp.get("success"):
return None
records = resp.get("result", [])
if records:
return records[0] # 返回第一个匹配的记录
if page >= resp.get("result_info", {}).get("total_pages", 1):
break
page += 1
return None
def create_or_update_dns(zone_id, record_name, target, record_type="A"):
"""创建或更新DNS记录"""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
record = get_dns_record(zone_id, record_name)
data = {"type": record_type, "name": record_name, "content": target, "ttl": 120, "proxied": False}
if record:
record_id = record["id"]
resp = requests.put(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}", json=data, headers=headers).json()
print(f"{'更新成功' if resp.get('success') else '更新失败'}: {record_name} -> {target} ({record_type})")
else:
resp = requests.post(f"{CF_API_BASE}/zones/{zone_id}/dns_records", json=data, headers=headers).json()
print(f"{'创建成功' if resp.get('success') else '创建失败'}: {record_name} -> {target} ({record_type})")
def delete_dns(zone_id, record_name):
"""删除DNS记录"""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
record = get_dns_record(zone_id, record_name)
if not record:
print(f"未找到 {record_name} 的DNS记录")
return
record_id = record["id"]
resp = requests.delete(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}", headers=headers).json()
print(f"{'删除成功' if resp.get('success') else '删除失败'}: {record_name}")
def query_dns(zone_id, record_name):
"""查询单个DNS记录"""
record = get_dns_record(zone_id, record_name)
if not record:
print(f"{record_name} 没有记录")
return
print(f"查询结果: {record_name} -> {record['content']} (Type: {record['type']}, TTL: {record['ttl']}, Proxied: {record['proxied']})")
def query_all_dns(zone_id):
"""查询Zone下所有DNS记录"""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
page = 1
per_page = 100
all_records = []
while True:
resp = requests.get(f"{CF_API_BASE}/zones/{zone_id}/dns_records?page={page}&per_page={per_page}", headers=headers).json()
if not resp.get("success"):
print(f"查询失败: {resp}")
return
records = resp.get("result", [])
if not records:
break
all_records.extend(records)
if page >= resp.get("result_info", {}).get("total_pages", 1):
break
page += 1
if not all_records:
print("没有找到任何DNS记录")
return
print(f"Zone下所有DNS记录:")
for rec in all_records:
print(f" {rec['name']} -> {rec['content']} (Type: {rec['type']}, TTL: {rec['ttl']}, Proxied: {rec.get('proxied', '-')})")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: dns <action> [domain] [ip_or_target_domain]")
print("可用操作: create(c), update(u), delete(d), query(q), query_all(qa)")
sys.exit(1)
action = sys.argv[1].strip().lower()
domain = sys.argv[2].strip() if len(sys.argv) > 2 else None
third_arg = sys.argv[3].strip() if len(sys.argv) > 3 else None
action_map = {
"c": "create",
"u": "update",
"d": "delete",
"q": "query",
"qa": "query_all",
"create": "create",
"update": "update",
"delete": "delete",
"query": "query",
"query_all": "query_all"
}
action_full = action_map.get(action)
if not action_full:
print(f"无效操作: {action}")
print("可用操作: create(c), update(u), delete(d), query(q), query_all(qa)")
sys.exit(1)
# 新逻辑:q / qa 如果没有 domain,就查询所有 zone
if action_full in ["query", "query_all"] and not domain:
zones = list_zones()
if not zones:
print("没有找到任何Zone")
sys.exit(0)
for z in zones:
zid, zname = z["id"], z["name"]
print(f"=== Zone: {zname} ({zid}) ===")
if action_full == "query":
print(f"域名: {zname}, 状态: {z['status']}, 创建时间: {z['created_on']}")
elif action_full == "query_all":
query_all_dns(zid)
sys.exit(0)
# 其他操作必须有 domain
if not domain:
print(f"{action_full} 操作需要指定域名")
sys.exit(1)
# 获取 Zone ID
zone_id = get_zone_id(domain)
# 执行操作
if action_full in ["create", "update"]:
if third_arg:
if is_ipv4(third_arg):
record_type = "A"
target = third_arg
elif is_ipv6(third_arg):
record_type = "AAAA"
target = third_arg
else:
record_type = "CNAME"
target = third_arg
else:
record_type = "A"
target = get_public_ip()
print(f"使用{record_type}记录: {domain} -> {target}")
create_or_update_dns(zone_id, domain, target, record_type)
elif action_full == "delete":
delete_dns(zone_id, domain)
elif action_full == "query":
query_dns(zone_id, domain)
elif action_full == "query_all":
query_all_dns(zone_id)
将文件编译成二进制文件也非常简单,首先创建虚拟环境,否则可能报错 error: externally-managed-environment
。
Debian/Ubuntu 系列(尤其是 Debian 12 / Ubuntu 23.04+) 新加限制(PEP 668)。系统自带的 python3 环境是 “externally-managed”,不允许随便用 pip 往里面装包,避免破坏系统自带的软件。
python3 -m venv myenv
source myenv/bin/activate
然后安装 pyinstaller
和 pipreqs
,pipreqs
是为了生成 requirements.txt
,pyinstaller
用来生成二进制文件。
pip install pyinstaller pipreqs
安装依赖:
pipreqs ./ --encoding=utf8 --ignore myenv
pip install -r requirements.txt
编译成二进制文件:
pyinstaller --onefile cf.py
然后在 ./dist/
目录下就可以看见生成的二进制文件了。