追梦人物❤️包子 博主
一直走在追梦的路上。

实战:CEX-DEX 稳定币套利监控程序开发

2025-12-1144 阅读0 评论

文章 《CEX-DEX 稳定币套利模型》 分析了套利机会的存在条件,本文将基于该模型的结论,实战开发一款套利机会监控程序。

根据套利模型,核心需求是获取两类价格数据:一是中心化交易所(CEX)的价格 \(p\),二是去中心化交易所(DEX)的价格 \(q\)。其中 CEX 普遍提供完善且统一的接口(通过下文介绍的 ccxt 库),能直接获取盘口报价;但 DEX 接口差异较大,一部分 DEX 会贴心地部署智能合约,供开发者直接查询报价行情;另一部分则未提供这类便捷服务,需开发者根据链上基础数据自行计算价格。

因此 CEX 我们选择币安的行情报价,而 DEX 我们选择了 2 个:

  • 以太链上 V3 协议的 Uniswap
  • Base 链上的 Aerodrome(应用层略有改动但底层仍然沿用 Uniswap V3 协议)

Uniswap 部署了 Quoter 智能合约,可以通过合约调用直接查询报价;而 Aerodrome 则没有可免费调用的报价合约,因此需要通过获取其资金池的流动性和最新价格,根据文章 《Uniswap 流动性机制及相关数学原理分析》 中介绍的相关公式自行计算报价。

核心 Python 库介绍

ccxt:CEX API 聚合工具

聚合全球主流中心化交易所(CEX)的行情与交易 API,提供统一的调用接口与交互逻辑。即便仅需与单一 CEX(如币安)交互,其封装后的 API 也比直接调用交易所官方 API 更简洁、易用,能大幅降低开发成本。

web3.py:区块链合约交互核心库

提供与区块链及智能合约交互的核心 API。可通过它与 Uniswap 报价合约、Aerodrome 资金池合约等进行数据交互,例如查询合约当前状态、读取关键业务数据(如价格、 liquidity 等),是链上数据获取与合约交互的基础工具。

其他辅助库

hexbytes:专门用于处理区块链场景中的十六进制字节数据(如合约地址、交易哈希的字节格式),解决该类数据的编码、解码与格式转换问题。

项目初始化:环境搭建与依赖配置

现在开始初始化项目并安装上面介绍的依赖,推荐使用 uv

# 创建项目
uv init stablecoin-arbitrage-monitor
cd stablecoin-arbitrage-monitor

# 添加依赖
uv add ccxt web3 hexbytes

依赖安装完成后,核心开发流程可分为“行情数据获取 → 套利利润计算 → 循环监控”三步,我们先从 DEX 与 CEX 的行情获取开始。

行情数据获取:DEX 与 CEX 实现方案

Uniswap:通过 Quoter 合约直接获取报价

先来看 Uniswap Quoter 智能合约,它包含多个功能方法,其中 quoteExactInput 是我们需要重点关注的方法。

该方法需接收一个关键参数 path(即“交易路径”),其格式为 输入代币地址 + 交易费率 + 输出代币地址 的二进制编码。这里的“交易费率”指 Uniswap 资金池对应的费率(Uniswap V3 常见费率为 0.01%、0.05%、0.3%、1%,需与目标资金池费率匹配)。

举个例子:在我们的程序中,交易路径可设置为两种场景:一种是 (USDT 地址 + 目标资金池费率 + USDC 地址(用 USDT 兑换 USDC),另一种是 USDC 地址 + 目标资金池费率 + USDT 地址(用 USDC 兑换 USDT)。

所以我们需要 USDT 和 USDC 的合约地址,以及 USDC/USDT 资金池的交易费率。这些数据通常是固定的,我们可通过区块链浏览器查询后,将其硬编码到代码中:

UNISWAP_USDC_ADDR = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # USDC 合约地址
UNISWAP_USDT_ADDR = "0xdAC17F958D2ee523a2206206994597C13D831ec7"  # USDT 合约地址
UNISWAP_USDC_USDT_POOL_FEE = 500  # USDC/USDT 资金池交易费率(0.05%)

现在实现一个辅助函数,以代币合约地址和 fee 为参数,生成输入参数 path

def construct_path(token_in, token_out, fee):
    """
    构造 Uniswap V3 Quoter 合约 `quoteExactInput` 的参数 `path`

    Args:
        token_in (str): 输入代币的合约地址
        token_out (str): 输出代币的合约地址
        fee (int): 交易池的费率(以 basis points 为单位,如 500 表示 0.05%)

    Returns:
        HexBytes: 编码后的交易路径
    """
    # 去掉地址前缀 "0x",将地址转换为字节数组
    token_in_bytes = Web3.to_bytes(hexstr=token_in[2:])
    token_out_bytes = Web3.to_bytes(hexstr=token_out[2:])

    # 将费率转换为 3 字节的字节数组(区块链上的协议字节序大多采用大端序)
    fee_bytes = fee.to_bytes(3, byteorder="big")

    # 按照格式拼接:tokenIn + fee + tokenOut
    path_bytes = token_in_bytes + fee_bytes + token_out_bytes

    return hexbytes.HexBytes(path_bytes)

为了与 Quoter 合约交互,我们需先连接以太坊网络中的某个节点。部分节点会对外提供服务,支持通过 JSON RPC 协议进行交互(例如查询区块链的当前或历史状态、发送交易等操作)。web3.py 已封装好上述交互逻辑,因此我们只需直接实例化合约对象即可。

UNISWAP_RPC_URL = "https://ethereum-rpc.publicnode.com"
UNISWAP_QUOTER_ADDR = "0x5e55C9e631FAE526cd4B0526C4818D6e0a9eF0e3"
UNISWAP_QUOTER_ABI = """[
    {
        "inputs": [
            {"internalType": "bytes", "name": "path", "type": "bytes"},
            {"internalType": "uint256", "name": "amountIn", "type": "uint256"}
        ],
        "name": "quoteExactInput",
        "outputs": [
            {"internalType": "uint256", "name": "amountOut", "type": "uint256"},
            {"internalType": "uint160[]", "name": "sqrtPriceX96AfterList", "type": "uint160[]"},
            {"internalType": "uint32[]", "name": "initializedTicksCrossedList", "type": "uint32[]"},
            {"internalType": "uint256", "name": "gasEstimate", "type": "uint256"}
        ],
        "stateMutability": "view",
        "type": "function"
    }
]"""

def get_uniswap_quoter_contract():
    """获取 Uniswap Quoter 合约实例"""
    w3 = Web3(Web3.HTTPProvider(UNISWAP_RPC_URL))
    contract = w3.eth.contract(address=UNISWAP_QUOTER_ADDR, abi=UNISWAP_QUOTER_ABI)
    return contract

这里我们通过 PublicNode 提供的 RPC 节点连接以太坊网络,该服务为免费性质,且限流政策相对宽松,非常适合做开发测试的节点。

UNISWAP_QUOTER_ABI 定义了 Quoter 合约中 quoteExactInput 方法的接口规范。web3.py 需依赖这一规范,才能明确如何与 Quoter 合约交互(比如识别接口名称、参数类型及返回结果类型)。

万事俱备,Uniswap 的报价查询非常简单:

# 智能合约中代币数量以整数形式存储,decimals 用于定义小数位数,
# 作用是将人类可读的十进制小数转换为整数(通过乘以 10^decimals 消除小数),或反向还原,同时保证数值精度。
# USDC 和 USDT 的 decimals 都等于 6,也就是说假设 USDC 的数量是 1000.5,智能合约中实际存储的值是 1000.5*10^6 = 1000500000
DECIMALS = 6

def uniswap_quote(contract, token_in, token_out, amount_in):
    """
    查询 Uniswap 上的代币兑换汇率

    Args:
        contract: Uniswap Quoter 合约实例
        token_in: 输入代币合约地址
        token_out: 输出代币合约地址
        amount_in: 输入数量(以 USDT/USDC 为单位)

    Returns:
        float: 兑换汇率(输出数量/输入数量)
    """
    # 构造 `quoteExactInput` 的参数 `path`
    path = construct_path(token_in, token_out, fee=UNISWAP_USDC_USDT_POOL_FEE)

    # 转换为链上精度,见 `DECIMALS` 的说明
    in_amt = int(amount_in * 10**DECIMALS)

    # 调用合约查询输出数量
    res = contract.functions.quoteExactInput(path, in_amt).call()
    out_amt = res[0]

    # 返回汇率,实际应为 (out_amt / 10^DECIMALS) / (in_amt / 10^DECIMALS),输入输出代币的 decimals 相等,所以相互抵消了
    return out_amt / in_amt

Aerodrome:基于流动性数据手动计算报价

Aerodrome 获取行情报价的过程相对复杂,它未提供可免费调用的智能合约,因此需要自行计算报价。其计算原理基于《Uniswap 流动性机制及相关数学原理分析》中介绍的 Uniswap V3 数学模型,具体需依据文中公式 (38) 进行计算,为此我们需要先获取以下信息:

  • 资金池当前流动性 \(L\)
  • 资金池当前价格 \(P\)

同样需要构建合约对象,以实现与 Aerodrome 资金池合约的交互:

AERODROM_RPC_URL = "https://base-rpc.publicnode.com"
AERODROM_USDC_USDT_POOL_ADDR = "0xa41Bc0AFfbA7Fd420d186b84899d7ab2aC57fcD1"
AERODROM_POOL_ABI = """[
    {
        "inputs": [],
        "name": "liquidity",
        "outputs": [{ "internalType": "uint128", "name": "", "type": "uint128" }],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [],
        "name": "slot0",
        "outputs": [
        { "internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160" },
        { "internalType": "int24", "name": "tick", "type": "int24" },
        {
            "internalType": "uint16",
            "name": "observationIndex",
            "type": "uint16"
        },
        {
            "internalType": "uint16",
            "name": "observationCardinality",
            "type": "uint16"
        },
        {
            "internalType": "uint16",
            "name": "observationCardinalityNext",
            "type": "uint16"
        },
        { "internalType": "bool", "name": "unlocked", "type": "bool" }
        ],
        "stateMutability": "view",
        "type": "function"
    }
]"""

def get_aerodrom_pool_contract():
    """获取 Aerodrome 资金池合约实例"""
    w3 = Web3(Web3.HTTPProvider(AERODROM_RPC_URL))
    contract = w3.eth.contract(
        address=AERODROM_USDC_USDT_POOL_ADDR, abi=AERODROM_POOL_ABI
    )
    return contract

然后我们先依据 Uniswap V3 的数学公式,计算出当输入指定交易量 amount_in 时,价格会被推动至何种水平;再基于这一更新后的价格,进一步算出实际能获得的输出量。

首先需完成的核心计算是:在给定交易输入量 amount_in 的前提下,确定价格会被推到的具体位置。

# Uniswap V3 Q64.96 定点数格式表示价格,具体的原理我们后续会写文章进行详细介绍
Q96 = 2**96

def get_next_sqrt_price_from_input(
    sqrt_p_x96,
    liquidity,
    amount_in,
    zero_for_one,
):
    """
    根据 Uniswap V3 公式计算输入后的新价格

    Args:
        sqrt_p_x96: 当前价格的平方根(Q64.96 定点数格式)
        liquidity: 当前流动性
        amount_in: 输入数量(已扣除手续费)
        zero_for_one: 是否以 token0 换 token1

    Returns:
        int: 新价格的平方根(Q64.96 定点数格式)
    """
    if zero_for_one:
        # token0 -> token1 的价格计算,依据公式 (14)
        numerator1 = liquidity * Q96
        return int(numerator1 / (numerator1 / sqrt_p_x96 + amount_in))
    else:
        # token1 -> token0 的价格计算,依据公式 (38)
        return sqrt_p_x96 + int(amount_in * Q96 / liquidity)

接着,我们就能计算在指定输入量下的实际输出量:

def get_amount0_delta(sqrt_ratio_a_x96, sqrt_ratio_b_x96, liquidity):
    """
    计算 token0 的输出数量

    Args:
        sqrt_ratio_a_x96: 起始价格的平方根
        sqrt_ratio_b_x96: 结束价格的平方根
        liquidity: 流动性

    Returns:
        float: token0 的数量
    """
    return (
        liquidity
        * Q96
        * (sqrt_ratio_b_x96 - sqrt_ratio_a_x96)
        / sqrt_ratio_b_x96
        / sqrt_ratio_a_x96
    )

def get_amount1_delta(sqrt_ratio_a_x96, sqrt_ratio_b_x96, liquidity):
    """
    计算 token1 的输出数量

    Args:
        sqrt_ratio_a_x96: 起始价格的平方根
        sqrt_ratio_b_x96: 结束价格的平方根
        liquidity: 流动性

    Returns:
        float: token1 的数量
    """
    return liquidity * (sqrt_ratio_b_x96 - sqrt_ratio_a_x96) / Q96

AERODROM_USDC_USDT_POOL_FEE = 70

def aerodrom_quote(contract, token_in, token_out, amount_in):
    """
    计算 Aerodrome 上的代币兑换汇率

    Args:
        contract: Aerodrome 资金池合约实例
        token_in: 输入代币合约地址
        token_out: 输出代币合约地址
        amount_in: 输入数量(以 USDT/USDC 为单位)

    Returns:
        float: 兑换汇率(输出数量/输入数量)
    """
    # 转换为链上精度,见 `DECIMALS` 的说明
    amount_in = int(amount_in * 10**DECIMALS)

    # 判断交易方向。资金池中合约地址小的代币排在前面,大的在后面。
    # 因此若输入代币的地址小于输出代币的地址,说明是 token0 换 token1,反之亦然。
    zero_for_one = token_in.lower() < token_out.lower()

    # 获取链上数据
    liquidity = contract.functions.liquidity().call()  # 当前流动性
    sqrt_price_X96 = contract.functions.slot0().call()[0]  # 当前价格

    # 扣除交易手续费
    in_amt_less_fee = amount_in * (1e6 - AERODROM_USDC_USDT_POOL_FEE) / 1e6

    # 计算交易后的新价格
    sqrt_price_next_X96 = get_next_sqrt_price_from_input(
        sqrt_price_X96, liquidity, in_amt_less_fee, zero_for_one
    )

    # 根据价格变化计算输出数量
    if zero_for_one:
        amount_out = get_amount1_delta(
            sqrt_price_X96,
            sqrt_price_next_X96,
            liquidity,
        )
    else:
        amount_out = get_amount0_delta(
            sqrt_price_X96,
            sqrt_price_next_X96,
            liquidity,
        )

    # 返回汇率。输出数量为负数,但我们只关心汇率,所以取绝对值。
    return abs(amount_out / amount_in)

[!WARNING] 价格影响假设
出于简化起见,我们假设输入量不会导致价格偏移太大。如果跨越了流动性区间,以上计算就不正确。如果需要更加精确的计算,就需要调整代码完全模拟 Uniswap V3 的交易逻辑。技术实现并不困难,但会让文章看起来过于复杂,因此这里做了简化处理。

CEX 盘口数据:币安 API 快速调用

获取币安的盘口数据非常简单,ccxt 库已经封装好了 API:

cex = ccxt.binance()
orderbook = cex.fetch_order_book("USDC/USDT", limit=1)

调用输出:

{'symbol': 'USDC/USDT', 'bids': [[0.9996, 104504279.0]], 'asks': [[0.9997, 54831253.0]], 'timestamp': None, 'datetime': None, 'nonce': 1953829892}

套利利润计算

根据套利模型中给出的公式,可计算套利利润:

logger = logging.getLogger("stablecoin")

def calc_pnl(p, q, n, cost):
    """
    计算套利利润

    Args:
        p (float): CEX USDC (USDT) 价格
        q (float): DEX USDC (USDT) 价格
        n (float): 交易数量
        cost (float): 总成本,等于 f + g + r + s
                     f - CEX 交易手续费
                     g - CEX 提现费用
                     r - DEX 交易手续费
                     s - 链上 Gas 费用

    Returns:
        float: 套利利润
    """
    return n * (q - p) - cost


def log_pnl(prefix, pnl):
    printer = logger.warning if pnl > 0 else logger.debug
    printer(f"{prefix} 本轮套利 PNL: {pnl:.6f}")

循环监控:主程序逻辑与运行

万事俱备,主程序的逻辑非常简单,只需要循环拉取数据,代入可套利公式计算即可:

import logging.config
import time

import ccxt
import hexbytes
from web3 import Web3

# 本金,单位 USDT 或者 USDC,取决于套利路径
N = 1000
# f + g + r + s, 详细含义请阅读 https://www.zmrenwu.com/posts/42/
UNISWAP_TOTAL_COST = 0 + 0.5 + 1 + 0.1
AERODROM_TOTAL_COST = 0 + 0.1 + 1 + 0.1

# ... 省略其它代码

def main():
    uniswap_quoter = get_uniswap_quoter_contract()
    aerodrom_pool = get_aerodrom_pool_contract()

    cex = ccxt.binance()
    while True:
        # Uniswap
        uniswap_q_usdc = uniswap_quote(
            uniswap_quoter,
            UNISWAP_USDC_ADDR,
            UNISWAP_USDT_ADDR,
            N,
        )
        uniswap_q_usdt = uniswap_quote(
            uniswap_quoter,
            UNISWAP_USDT_ADDR,
            UNISWAP_USDC_ADDR,
            N,
        )
        logger.debug(
            f"Uniswap 汇率: 1 USDC = {uniswap_q_usdt:.6f} USDT; 1 USDT = {uniswap_q_usdc:.6f} USDC"
        )

        # Aerodrom
        aerodrom_q_usdc = aerodrom_quote(
            aerodrom_pool,
            AERODROM_USDC_ADDR,
            AERODROM_USDT_ADDR,
            N,
        )
        aerodrom_q_usdt = aerodrom_quote(
            aerodrom_pool,
            AERODROM_USDT_ADDR,
            AERODROM_USDC_ADDR,
            N,
        )
        logger.debug(
            f"Aerodrom 汇率: 1 USDC = {aerodrom_q_usdt:.6f} USDT; 1 USDT = {aerodrom_q_usdc:.6f} USDC"
        )

        # CEX
        orderbook = cex.fetch_order_book("USDC/USDT", limit=1)
        logger.debug(orderbook)
        p_usdc = orderbook["asks"][0][0]
        p_usdt = 1 / orderbook["bids"][0][0]

        # USDT -> USDC -> USDT
        circle = "[USDT -> USDC -> USDT]"
        uniswap_pnl = calc_pnl(p_usdc, uniswap_q_usdc, N, UNISWAP_TOTAL_COST)
        aerodrom_pnl = calc_pnl(p_usdc, aerodrom_q_usdc, N, AERODROM_TOTAL_COST)
        log_pnl(circle + " Uniswap", uniswap_pnl)
        log_pnl(circle + " Aerodrom", aerodrom_pnl)

        # USDC -> USDT -> USDC
        circle = "[USDC -> USDT -> USDC]"
        uniswap_pnl = calc_pnl(p_usdt, uniswap_q_usdt, N, UNISWAP_TOTAL_COST)
        aerodrom_pnl = calc_pnl(p_usdt, aerodrom_q_usdt, N, AERODROM_TOTAL_COST)
        log_pnl(circle + " Uniswap", uniswap_pnl)
        log_pnl(circle + " Aerodrom", aerodrom_pnl)

        time.sleep(60)

实际运行结果

2025-12-11 13:14:35,756 [DEBUG] - Uniswap 汇率: 1 USDC = 0.999998 USDT; 1 USDT = 0.998998 USDC
2025-12-11 13:14:42,763 [DEBUG] - Aerodrom 汇率: 1 USDC = 0.998918 USDT; 1 USDT = 1.000943 USDC
2025-12-11 13:14:42,763 [DEBUG] - [USDT -> USDC -> USDT] Uniswap 本轮套利 PNL: -2.302005
2025-12-11 13:14:42,763 [WARNING] - [USDT -> USDC -> USDT] Aerodrom 本轮套利 PNL: 0.042907
2025-12-11 13:14:42,764 [DEBUG] - [USDC -> USDT -> USDC] Uniswap 本轮套利 PNL: -2.001831
2025-12-11 13:14:42,764 [DEBUG] - [USDC -> USDT -> USDC] Aerodrom 本轮套利 PNL: -2.682090

从日志可以看到:

  • WARNING 级别的日志表示发现了套利机会(PNL > 0)
  • Aerodrome 上出现了 USDT → USDC → USDT 的套利机会,1000 USDT 本金下利润约为 0.043 USDT
  • 其他方向的套利都是亏损的(DEBUG 级别日志)

这个价差确实存在,主要原因之一是:将 Base 链上的 USDT 转入其他主流链时会产生跨链费用成本较高,这导致价差没能被套利行为及时抹平。若要实际操作套利,还需额外考虑跨链成本和操作执行时间这两个关键因素。

总结

我们成功开发了套利机会监控程序,实现了两种不同的方案:

  1. 直接获取报价行情:适用于像 Uniswap 这样提供 Quoter 合约的 DEX
  2. 自行计算报价:适用于像 Aerodrome 这样需要根据流动性数据手动计算的 DEX

这两套方案可以迁移到更多的 DEX,实现多平台套利机会监控。

从运行结果看,Aerodrome 确实存在价差,但主要是因为 USDT 跨链成本导致的。如果要实际进行套利,还需完善模型,考虑跨链成本等因素。

相关资源

免责声明

本文内容仅为区块链技术开发的学术探讨与技术实践分享,不构成任何形式的投资建议、交易指导或财务顾问服务。加密货币及区块链相关交易存在极高的市场风险、技术风险与政策风险,可能导致本金大幅亏损。
文中所提供的代码、数据及运行结果仅作技术演示使用,不保证其准确性、完整性、时效性及可操作性。开发者在实际使用时需自行验证代码逻辑、补充安全机制,并承担因代码运行、交易操作所产生的全部法律责任与经济损失。任何依据本文内容进行的投资或交易行为,均由行为人自行决策并承担全部风险,本文作者及相关方不对任何由此产生的损失负责。

-- EOF --

0 评论
登录后回复