Python 编写 cf 域名解析工具并编译成二进制可执行文件

70次阅读
没有评论

共计 6524 个字符,预计需要花费 17 分钟才能阅读完成。

平常会把新装服务器一些必须用到的比如 nginx、nezha-agent 放在一个文件夹,装了系统以后一般直接拷贝,简单修改一些配置后就可以使用 docker-compose 直接启动。然后就可以准备搭建服务,创建 nginx 配置文件,接下来就需要域名解析了。

由于域名托管在 cloudflare,每次都要登录,然后进入域名页面才能配置解析,非常麻烦。于是想着能不能写一个 python 文件,方便进行解析。马上动手让 gpt 写了如下的代码,再用 pyinstaller 编译成二进制文件,效果如图:

Python 编写 cf 域名解析工具并编译成二进制可执行文件

使用方式如下:

╭─root@hausen1012 ~
╰─# cf
用法: dns <action> [domain] [ip_or_target_domain]
可用操作: create(c), update(u), delete(d), query(q), query_all(qa)

首先需要创建 python 文件 cf.py,这里 CF_API_TOKEN 为了方便也可以写死再编译,由于我这里的话,文件夹拷贝过去还需要运行 shell 脚本,我就索性让 shell 脚本帮我写入环境变量了。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import sys
import re

# -------- 配置区 --------
CF_API_TOKEN = os.getenv("CF_API_TOKEN")
CF_API_BASE = os.getenv("CF_API_BASE", "https://api.cloudflare.com/client/v4")
PUBLIC_IP_URL = os.getenv("PUBLIC_IP_URL", "https://ip.3322.net/")

if not CF_API_TOKEN:
    print("❌ 请在代码或环境变量中设置 CF_API_TOKEN")
    sys.exit(1)

# -----------------------
def get_public_ip():
    """获取公网IP"""
    try:
        ip = requests.get(PUBLIC_IP_URL, timeout=5).text.strip()
        return ip
    except Exception as e:
        print(f"获取公网IP失败: {e}")
        sys.exit(1)

def is_ipv4(value):
    return bool(re.match(r'^(\d{1,3}\.){3}\d{1,3}$', value))

def is_ipv6(value):
    return bool(re.match(r'^[0-9a-fA-F:]+$', value)) and ":" in value

def get_zone_id(domain):
    """根据域名获取Zone ID"""
    headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
    parts = domain.split('.')
    for i in range(len(parts)-1):
        zone_name = '.'.join(parts[i:])
        resp = requests.get(f"{CF_API_BASE}/zones?name={zone_name}", headers=headers).json()
        if resp.get("success") and resp.get("result"):
            return resp["result"][0]["id"]
    print(f"未找到域名 {domain} 的Zone ID")
    sys.exit(1)

def list_zones():
    """列出账户下所有域名 (Zone)"""
    headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
    page = 1
    per_page = 50
    zones = []
    while True:
        resp = requests.get(f"{CF_API_BASE}/zones?page={page}&per_page={per_page}", headers=headers).json()
        if not resp.get("success"):
            print(f"获取Zone失败: {resp}")
            return []
        zones.extend(resp.get("result", []))
        if page >= resp.get("result_info", {}).get("total_pages", 1):
            break
        page += 1
    return zones

def get_dns_record(zone_id, record_name):
    """获取指定DNS记录"""
    headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
    page = 1
    per_page = 100
    while True:
        resp = requests.get(
            f"{CF_API_BASE}/zones/{zone_id}/dns_records?name={record_name}&page={page}&per_page={per_page}",
            headers=headers).json()
        if not resp.get("success"):
            return None
        records = resp.get("result", [])
        if records:
            return records[0]  # 返回第一个匹配的记录
        if page >= resp.get("result_info", {}).get("total_pages", 1):
            break
        page += 1
    return None

def create_or_update_dns(zone_id, record_name, target, record_type="A"):
    """创建或更新DNS记录"""
    headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
    record = get_dns_record(zone_id, record_name)
    data = {"type": record_type, "name": record_name, "content": target, "ttl": 120, "proxied": False}

    if record:
        record_id = record["id"]
        resp = requests.put(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}", json=data, headers=headers).json()
        print(f"{'更新成功' if resp.get('success') else '更新失败'}: {record_name} -> {target} ({record_type})")
    else:
        resp = requests.post(f"{CF_API_BASE}/zones/{zone_id}/dns_records", json=data, headers=headers).json()
        print(f"{'创建成功' if resp.get('success') else '创建失败'}: {record_name} -> {target} ({record_type})")

def delete_dns(zone_id, record_name):
    """删除DNS记录"""
    headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
    record = get_dns_record(zone_id, record_name)
    if not record:
        print(f"未找到 {record_name} 的DNS记录")
        return
    record_id = record["id"]
    resp = requests.delete(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{record_id}", headers=headers).json()
    print(f"{'删除成功' if resp.get('success') else '删除失败'}: {record_name}")

def query_dns(zone_id, record_name):
    """查询单个DNS记录"""
    record = get_dns_record(zone_id, record_name)
    if not record:
        print(f"{record_name} 没有记录")
        return
    print(f"查询结果: {record_name} -> {record['content']} (Type: {record['type']}, TTL: {record['ttl']}, Proxied: {record['proxied']})")

def query_all_dns(zone_id):
    """查询Zone下所有DNS记录"""
    headers = {"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}
    page = 1
    per_page = 100
    all_records = []
    while True:
        resp = requests.get(f"{CF_API_BASE}/zones/{zone_id}/dns_records?page={page}&per_page={per_page}", headers=headers).json()
        if not resp.get("success"):
            print(f"查询失败: {resp}")
            return
        records = resp.get("result", [])
        if not records:
            break
        all_records.extend(records)
        if page >= resp.get("result_info", {}).get("total_pages", 1):
            break
        page += 1

    if not all_records:
        print("没有找到任何DNS记录")
        return

    print(f"Zone下所有DNS记录:")
    for rec in all_records:
        print(f"  {rec['name']} -> {rec['content']} (Type: {rec['type']}, TTL: {rec['ttl']}, Proxied: {rec.get('proxied', '-')})")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("用法: dns <action> [domain] [ip_or_target_domain]")
        print("可用操作: create(c), update(u), delete(d), query(q), query_all(qa)")
        sys.exit(1)

    action = sys.argv[1].strip().lower()
    domain = sys.argv[2].strip() if len(sys.argv) > 2 else None
    third_arg = sys.argv[3].strip() if len(sys.argv) > 3 else None

    action_map = {
        "c": "create",
        "u": "update",
        "d": "delete",
        "q": "query",
        "qa": "query_all",
        "create": "create",
        "update": "update",
        "delete": "delete",
        "query": "query",
        "query_all": "query_all"
    }

    action_full = action_map.get(action)
    if not action_full:
        print(f"无效操作: {action}")
        print("可用操作: create(c), update(u), delete(d), query(q), query_all(qa)")
        sys.exit(1)

    # 新逻辑:q / qa 如果没有 domain,就查询所有 zone
    if action_full in ["query", "query_all"] and not domain:
        zones = list_zones()
        if not zones:
            print("没有找到任何Zone")
            sys.exit(0)
        for z in zones:
            zid, zname = z["id"], z["name"]
            print(f"=== Zone: {zname} ({zid}) ===")
            if action_full == "query":
                print(f"域名: {zname}, 状态: {z['status']}, 创建时间: {z['created_on']}")
            elif action_full == "query_all":
                query_all_dns(zid)
        sys.exit(0)

    # 其他操作必须有 domain
    if not domain:
        print(f"{action_full} 操作需要指定域名")
        sys.exit(1)

    # 获取 Zone ID
    zone_id = get_zone_id(domain)

    # 执行操作
    if action_full in ["create", "update"]:
        if third_arg:
            if is_ipv4(third_arg):
                record_type = "A"
                target = third_arg
            elif is_ipv6(third_arg):
                record_type = "AAAA"
                target = third_arg
            else:
                record_type = "CNAME"
                target = third_arg
        else:
            record_type = "A"
            target = get_public_ip()

        print(f"使用{record_type}记录: {domain} -> {target}")
        create_or_update_dns(zone_id, domain, target, record_type)

    elif action_full == "delete":
        delete_dns(zone_id, domain)
    elif action_full == "query":
        query_dns(zone_id, domain)
    elif action_full == "query_all":
        query_all_dns(zone_id)

将文件编译成二进制文件也非常简单,首先创建虚拟环境,否则可能报错 error: externally-managed-environment

Debian/Ubuntu 系列(尤其是 Debian 12 / Ubuntu 23.04+) 新加限制(PEP 668)。系统自带的 python3 环境是 “externally-managed”,不允许随便用 pip 往里面装包,避免破坏系统自带的软件。

python3 -m venv myenv

source myenv/bin/activate

然后安装 pyinstallerpipreqspipreqs 是为了生成 requirements.txtpyinstaller 用来生成二进制文件。

pip install pyinstaller pipreqs

安装依赖:

pipreqs ./ --encoding=utf8 --ignore myenv

pip install -r requirements.txt

编译成二进制文件:

pyinstaller --onefile cf.py

然后在 ./dist/ 目录下就可以看见生成的二进制文件了。

AD:【腾讯云服务器大降价】2核4G 222元/3年 1核2G 38元/年
正文完
 0
阿蛮君
版权声明:本站原创文章,由 阿蛮君 于2025-08-23发表,共计6524字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
Copyright © 2023-2025 阿蛮君博客 湘ICP备2023001393号
本网站由 亿信互联 提供云计算服务 | 又拍云CDN 提供安全防护和加速服务
Powered by Wordpress  Theme by Puock