链上充值监听与自动划转资金流程实现
链上充值监听与自动划转是数字货币信用卡系统的核心环节,需要实时、准确地捕捉用户充值行为并自动完成资金归集。以下是完整实现方案,包括架构设计、核心流程和代码实现。
一、整体架构设计
核心组件
- 充值地址生成服务:为每个用户生成唯一的充值地址(基于HD钱包派生)
- 区块监听服务:实时同步区块链数据,监控指定地址的交易
- 交易解析服务:验证交易合法性,提取关键信息(金额、发送方等)
- 充值确认服务:根据区块链确认数判断交易是否有效
- 资金划转服务:将到账资金划转到平台归集钱包或用户可用余额
- 通知服务:向用户推送充值到账信息
二、详细流程设计
1. 充值地址生成与关联
- 基于用户ID和HD钱包路径,为每个用户生成唯一充值地址
- 地址格式:支持多种区块链(BTC使用bech32,ETH使用十六进制等)
- 地址与用户账户一对一绑定,存入数据库并关联用户ID
2. 区块监听流程
- 连接区块链全节点或第三方API(如Infura、Alchemy)
- 从最新区块开始监听,同时回溯检查历史区块(防止遗漏)
- 对每个区块,解析所有交易,筛选涉及平台充值地址的交易
- 采用增量同步策略,记录已处理的区块高度,避免重复处理
3. 交易验证与确认
- 验证交易是否成功(区块确认数 >= 系统设定阈值,如ETH需要12个确认)
- 验证交易金额是否满足最小充值限额
- 检查交易是否已被处理(防止重复入账)
- 计算实际到账金额(扣除区块链手续费后)
4. 资金划转与记账
- 定时/自动将到账资金从用户充值地址划转到平台归集钱包(冷钱包)
- 或直接增加用户账户可用余额(信用额度)
- 生成充值记录和系统内转账记录,确保账目清晰
- 触发后续业务流程(如自动激活卡片、提升额度等)
充值确认 → 本地事务(创建充值记录+写入消息表) → 定时任务发送消息 → MQ队列 → 消费消息(更新余额)↓ ↓ ↓状态跟踪 失败重试机制 消费重试+死信
三、核心代码实现
1. 充值地址生成服务
package com.digitalcredit.deposit.service;import com.digitalcredit.user.entity.User;
import com.digitalcredit.user.service.UserService;
import com.digitalcredit.wallet.entity.DepositAddress;
import com.digitalcredit.wallet.entity.enums.BlockchainType;
import com.digitalcredit.wallet.repository.DepositAddressRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.web3j.crypto.*;
import org.web3j.utils.Numeric;import java.util.HashMap;
import java.util.Map;
import java.util.Optional;@Service
public class DepositAddressService {@Autowiredprivate DepositAddressRepository depositAddressRepository;@Autowiredprivate UserService userService;// 平台根钱包(用于派生用户充值地址)private final Credentials platformRootCredentials;// 不同区块链的HD路径前缀private static final Map<BlockchainType, String> BIP44_PATH_PREFIXES = new HashMap<>();static {BIP44_PATH_PREFIXES.put(BlockchainType.BITCOIN, "m/44'/0'/0'/0/");BIP44_PATH_PREFIXES.put(BlockchainType.ETHEREUM, "m/44'/60'/0'/0/");BIP44_PATH_PREFIXES.put(BlockchainType.BSC, "m/44'/56'/0'/0/");}public DepositAddressService(String platformRootPrivateKey) {// 初始化平台根钱包this.platformRootCredentials = Credentials.create(platformRootPrivateKey);}/*** 为用户生成指定区块链的充值地址*/@Transactionalpublic DepositAddress generateDepositAddress(Long userId, BlockchainType blockchainType) {// 1. 验证用户User user = userService.getUserById(userId);if (user == null) {throw new IllegalArgumentException("User not found");}// 2. 检查用户是否已有该链的充值地址Optional<DepositAddress> existingAddress = depositAddressRepository.findByUserIdAndBlockchainType(userId, blockchainType);if (existingAddress.isPresent()) {return existingAddress.get();}// 3. 生成用户唯一索引(可基于用户ID或自增序列)long userIndex = generateUserIndex(userId);// 4. 构建BIP44路径String path = BIP44_PATH_PREFIXES.get(blockchainType) + userIndex;// 5. 从根钱包派生出用户充值地址的私钥ECKeyPair userKeyPair = deriveKeyPairFromPath(platformRootCredentials.getEcKeyPair(), path);String address;// 6. 根据不同区块链生成地址switch (blockchainType) {case ETHEREUM:case BSC:address = "0x" + Keys.getAddress(userKeyPair);break;case BITCOIN:// 比特币地址生成逻辑(使用bitcoinj等库)address = generateBitcoinAddress(userKeyPair);break;default:throw new UnsupportedOperationException("Unsupported blockchain type");}// 7. 保存充值地址(私钥加密存储)DepositAddress depositAddress = new DepositAddress();depositAddress.setUserId(userId);depositAddress.setBlockchainType(blockchainType);depositAddress.setAddress(address);depositAddress.setDerivationPath(path);// 加密存储私钥depositAddress.setEncryptedPrivateKey(encryptPrivateKey(Numeric.toHexStringWithPrefix(userKeyPair.getPrivateKey())));depositAddress.setCreatedAt(System.currentTimeMillis());return depositAddressRepository.save(depositAddress);}/*** 根据BIP44路径从父密钥派生子密钥*/private ECKeyPair deriveKeyPairFromPath(ECKeyPair parent, String path) {String[] pathElements = path.split("/");ECKeyPair currentKeyPair = parent;for (String element : pathElements) {if (element.equals("m")) continue;boolean hardened = element.endsWith("'");int index = Integer.parseInt(hardened ? element.substring(0, element.length() - 1) : element);if (hardened) {index += 0x80000000; // 强化派生索引偏移}// 使用BIP32派生算法currentKeyPair = HDKeyGenerator.deriveChildKey(currentKeyPair, index);}return currentKeyPair;}/*** 生成用户唯一索引*/private long generateUserIndex(Long userId) {// 可以使用userId的哈希或数据库自增序列return Math.abs(userId.hashCode() % 1000000);}/*** 加密私钥(生产环境使用AES-256加密)*/private String encryptPrivateKey(String privateKey) {// 实际实现应使用安全的加密算法,密钥存储在安全的密钥管理服务中return EncryptionUtil.encrypt(privateKey, System.getenv("PRIVATE_KEY_ENCRYPTION_KEY"));}/*** 获取用户的充值地址*/public String getUserDepositAddress(Long userId, BlockchainType blockchainType) {DepositAddress address = depositAddressRepository.findByUserIdAndBlockchainType(userId, blockchainType).orElseThrow(() -> new IllegalArgumentException("No deposit address found for user and blockchain type"));return address.getAddress();}/*** 生成比特币地址(简化实现)*/private String generateBitcoinAddress(ECKeyPair keyPair) {// 实际项目中使用bitcoinj等专业库实现// 这里仅作示例return BitcoinAddressGenerator.generateFromKeyPair(keyPair, false);}
}
2. 区块监听与交易处理服务
package com.digitalcredit.blockchain.service;import com.digitalcredit.blockchain.config.BlockchainNodeConfig;
import com.digitalcredit.blockchain.entity.ChainTransaction;
import com.digitalcredit.blockchain.entity.enums.TransactionStatus;
import com.digitalcredit.blockchain.repository.ChainTransactionRepository;
import com.digitalcredit.deposit.service.DepositProcessingService;
import com.digitalcredit.wallet.entity.DepositAddress;
import com.digitalcredit.wallet.repository.DepositAddressRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.EthBlockNumber;
import org.web3j.protocol.http.HttpService;import javax.annotation.PostConstruct;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;@Service
public class FaultTolerantBlockchainListener {private static final Logger logger = LoggerFactory.getLogger(FaultTolerantBlockchainListener.class);// 每个链的节点配置private final Map<String, List<BlockchainNodeConfig>> nodeConfigs = new ConcurrentHashMap<>();// 当前活跃的Web3j客户端private final Map<String, Web3j> activeWeb3jClients = new ConcurrentHashMap<>();// 节点健康状态private final Map<String, Map<String, Boolean>> nodeHealthStatus = new ConcurrentHashMap<>();// 平台充值地址缓存(小写)private final Set<String> platformAddresses = ConcurrentHashMap.newKeySet();// 区块处理线程池private final ExecutorService blockProcessingExecutor = new ThreadPoolExecutor(4, 16, 5, TimeUnit.MINUTES,new LinkedBlockingQueue<>(1000),new ThreadFactory() {private int counter = 0;@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "block-processor-" + counter++);thread.setDaemon(true);return thread;}},new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时让提交者执行,防止任务丢失);@Autowiredprivate List<BlockchainNodeConfig> allNodeConfigs;@Autowiredprivate DepositAddressRepository depositAddressRepository;@Autowiredprivate ChainTransactionRepository transactionRepository;@Autowiredprivate DepositProcessingService depositProcessingService;@Autowiredprivate BlockchainNodeHealthChecker nodeHealthChecker;// 上次处理的区块高度private final Map<String, BigInteger> lastProcessedBlock = new ConcurrentHashMap<>();// 确认数阈值配置private final Map<String, Integer> confirmationThresholds = new HashMap<>();{confirmationThresholds.put("ETH", 12);confirmationThresholds.put("BSC", 12);confirmationThresholds.put("BTC", 6);confirmationThresholds.put("LTC", 6);}@PostConstructpublic void initialize() {// 1. 按链类型分组节点配置for (BlockchainNodeConfig config : allNodeConfigs) {nodeConfigs.computeIfAbsent(config.getChainType(), k -> new ArrayList<>()).add(config);nodeHealthStatus.computeIfAbsent(config.getChainType(), k -> new ConcurrentHashMap<>()).put(config.getNodeUrl(), true);}// 2. 初始化活跃客户端initializeActiveClients();// 3. 加载充值地址缓存refreshAddressCache();// 4. 加载上次处理的区块高度loadLastProcessedBlocks();// 5. 启动节点健康检查nodeHealthChecker.start();logger.info("Blockchain listener initialized with {} chains", nodeConfigs.size());}/*** 初始化活跃客户端(选择健康的节点)*/private void initializeActiveClients() {for (String chainType : nodeConfigs.keySet()) {try {Web3j client = getHealthyWeb3jClient(chainType);if (client != null) {activeWeb3jClients.put(chainType, client);logger.info("Initialized active client for chain: {}", chainType);}} catch (Exception e) {logger.error("Failed to initialize client for chain: {}", chainType, e);}}}/*** 获取健康的Web3j客户端(带故障转移)*/private Web3j getHealthyWeb3jClient(String chainType) {List<BlockchainNodeConfig> configs = nodeConfigs.getOrDefault(chainType, Collections.emptyList());if (configs.isEmpty()) {logger.warn("No node configs for chain: {}", chainType);return null;}// 按优先级排序,优先选择主节点configs.sort(Comparator.comparingInt(BlockchainNodeConfig::getPriority));// 尝试连接健康节点for (BlockchainNodeConfig config : configs) {if (nodeHealthStatus.get(chainType).getOrDefault(config.getNodeUrl(), false)) {try {Web3j client = Web3j.build(new HttpService(config.getNodeUrl()));// 测试连接client.ethBlockNumber().sendAsync().get(5, TimeUnit.SECONDS);return client;} catch (Exception e) {logger.warn("Node {} for chain {} is unhealthy: {}", config.getNodeUrl(), chainType, e.getMessage());nodeHealthStatus.get(chainType).put(config.getNodeUrl(), false);}}}logger.error("No healthy nodes available for chain: {}", chainType);return null;}/*** 定时刷新地址缓存(每30分钟)*/@Scheduled(fixedRate = 30 * 60 * 1000)public void refreshAddressCache() {try {long start = System.currentTimeMillis();List<String> addresses = depositAddressRepository.findAllActive().stream().map(DepositAddress::getAddress).map(String::toLowerCase).collect(Collectors.toList());platformAddresses.clear();platformAddresses.addAll(addresses);logger.info("Refreshed deposit addresses cache, count: {}, time: {}ms",addresses.size(), System.currentTimeMillis() - start);} catch (Exception e) {logger.error("Failed to refresh address cache", e);}}/*** 加载上次处理的区块高度*/private void loadLastProcessedBlocks() {for (String chainType : nodeConfigs.keySet()) {BigInteger lastBlock = transactionRepository.findMaxBlockNumberByChain(chainType).orElse(BigInteger.ZERO);lastProcessedBlock.put(chainType, lastBlock);logger.info("Loaded last processed block for {}: {}", chainType, lastBlock);}}/*** 定时监听新区块(每5秒)*/@Scheduled(fixedRate = 5000)public void listenForNewBlocks() {for (String chainType : new ArrayList<>(nodeConfigs.keySet())) {try {processChain(chainType);} catch (Exception e) {logger.error("Error processing chain: {}", chainType, e);// 尝试切换客户端Web3j newClient = getHealthyWeb3jClient(chainType);if (newClient != null) {activeWeb3jClients.put(chainType, newClient);logger.info("Switched to new client for chain: {}", chainType);}}}}/*** 处理单个链的区块*/private void processChain(String chainType) throws Exception {Web3j web3j = activeWeb3jClients.get(chainType);if (web3j == null) {logger.warn("No active client for chain: {}", chainType);return;}// 获取最新区块高度EthBlockNumber blockNumberResp = web3j.ethBlockNumber().send();BigInteger latestBlock = blockNumberResp.getBlockNumber();// 获取上次处理的区块高度BigInteger startBlock = lastProcessedBlock.getOrDefault(chainType, BigInteger.ZERO);// 计算需要处理的区块范围if (latestBlock.compareTo(startBlock) <= 0) {// 没有新区块,检查确认中的交易checkPendingTransactions(chainType, latestBlock);return;}// 限制每次处理的区块数量,防止过载BigInteger endBlock = startBlock.add(BigInteger.valueOf(10));if (endBlock.compareTo(latestBlock) > 0) {endBlock = latestBlock;}logger.info("Processing chain {}: blocks {} to {}", chainType, startBlock, endBlock);// 并行处理区块for (BigInteger blockNum = startBlock.add(BigInteger.ONE); blockNum.compareTo(endBlock) <= 0; blockNum = blockNum.add(BigInteger.ONE)) {processSingleBlockAsync(chainType, web3j, blockNum);}// 更新最后处理的区块高度lastProcessedBlock.put(chainType, endBlock);}/*** 异步处理单个区块*/@Async("blockProcessingExecutor")@Retryable(value = {Exception.class},maxAttempts = 3,backoff = @Backoff(delay = 1000, multiplier = 2))public void processSingleBlockAsync(String chainType, Web3j web3j, BigInteger blockNumber) {try {processSingleBlock(chainType, web3j, blockNumber);} catch (Exception e) {logger.error("Failed to process block {} on chain {} after retries", blockNumber, chainType, e);// 记录失败的区块,供后续手动处理transactionRepository.recordFailedBlock(chainType, blockNumber, e.getMessage());}}/*** 处理单个区块*/private void processSingleBlock(String chainType, Web3j web3j, BigInteger blockNumber) throws Exception {EthBlock block = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNumber), true // 包含交易详情).send();if (block.getBlock() == null) {logger.warn("Block {} not found on chain {}", blockNumber, chainType);return;}// 处理区块中的交易for (EthBlock.TransactionResult<?> txResult : block.getBlock().getTransactions()) {EthBlock.TransactionObject tx = (EthBlock.TransactionObject) txResult.get();// 检查是否是平台地址的入金交易if (tx.getTo() != null && platformAddresses.contains(tx.getTo().toLowerCase())) {processDepositTransaction(chainType, tx, blockNumber);}}logger.debug("Processed block {} on chain {}, transactions: {}",blockNumber, chainType, block.getBlock().getTransactions().size());}/*** 处理充值交易*/private void processDepositTransaction(String chainType, EthBlock.TransactionObject tx, BigInteger blockNumber) {// 检查交易是否已处理if (transactionRepository.existsByTxHashAndChainType(tx.getHash(), chainType)) {return;}// 创建交易记录ChainTransaction transaction = new ChainTransaction();transaction.setTxHash(tx.getHash());transaction.setChainType(chainType);transaction.setFromAddress(tx.getFrom());transaction.setToAddress(tx.getTo());transaction.setAmount(tx.getValue());transaction.setBlockNumber(blockNumber);transaction.setGasPrice(tx.getGasPrice());transaction.setGasUsed(tx.getGas());transaction.setInputData(tx.getInput());transaction.setTimestamp(System.currentTimeMillis());// 初始状态:待确认int requiredConfirmations = confirmationThresholds.getOrDefault(chainType, 12);transaction.setStatus(TransactionStatus.PENDING_CONFIRMATION);transaction.setRequiredConfirmations(requiredConfirmations);transaction.setCurrentConfirmations(BigInteger.ZERO);transactionRepository.save(transaction);logger.info("Found new deposit transaction {} on chain {}, amount: {}",tx.getHash(), chainType, tx.getValue());}/*** 检查待确认的交易*/private void checkPendingTransactions(String chainType, BigInteger latestBlock) {try {List<ChainTransaction> pendingTxs = transactionRepository.findByChainTypeAndStatus(chainType, TransactionStatus.PENDING_CONFIRMATION);for (ChainTransaction tx : pendingTxs) {// 计算当前确认数BigInteger confirmations = latestBlock.subtract(tx.getBlockNumber());tx.setCurrentConfirmations(confirmations);// 检查是否达到确认阈值if (confirmations.compareTo(BigInteger.valueOf(tx.getRequiredConfirmations())) >= 0) {tx.setStatus(TransactionStatus.CONFIRMED);transactionRepository.save(tx);// 提交给充值处理服务depositProcessingService.processConfirmedDeposit(tx);} else if (System.currentTimeMillis() - tx.getTimestamp() > 24 * 60 * 60 * 1000) {// 超过24小时未确认,标记为失败tx.setStatus(TransactionStatus.CONFIRMATION_FAILED);transactionRepository.save(tx);logger.warn("Transaction {} on chain {} failed to confirm within 24h",tx.getTxHash(), chainType);} else {transactionRepository.save(tx);}}} catch (Exception e) {logger.error("Error checking pending transactions for chain {}", chainType, e);}}
}
3. 充值处理与资金划转服务(RocketMQ)
0). 数据库设计
-- 充值记录表
CREATE TABLE deposit_record (id BIGINT AUTO_INCREMENT PRIMARY KEY,tx_hash VARCHAR(66) NOT NULL COMMENT '区块链交易哈希',user_id BIGINT NOT NULL COMMENT '用户ID',chain_type VARCHAR(20) NOT NULL COMMENT '区块链类型(ETH/BSC等)',deposit_address VARCHAR(66) NOT NULL COMMENT '充值地址',amount DECIMAL(30,18) NOT NULL COMMENT '充值金额',block_number BIGINT NOT NULL COMMENT '区块高度',status VARCHAR(20) NOT NULL COMMENT '状态(PENDING/SUCCESS/FAILED)',failure_reason TEXT COMMENT '失败原因',created_at BIGINT NOT NULL COMMENT '创建时间戳',completed_at BIGINT COMMENT '完成时间戳',UNIQUE KEY uk_tx_hash (tx_hash),KEY idx_user_id (user_id),KEY idx_status_create_time (status, created_at)
) ENGINE=InnoDB COMMENT='充值记录表';-- 本地消息表
CREATE TABLE local_message (id BIGINT AUTO_INCREMENT PRIMARY KEY,business_key VARCHAR(66) NOT NULL COMMENT '业务唯一标识(如交易哈希)',message_type VARCHAR(20) NOT NULL COMMENT '消息类型(DEPOSIT_ARRIVAL/FUND_COLLECTION)',topic VARCHAR(50) NOT NULL COMMENT 'MQ主题',content TEXT NOT NULL COMMENT '消息内容(JSON)',status VARCHAR(20) NOT NULL COMMENT '状态(PENDING/SENDING/SENT/FAILED)',send_count INT NOT NULL DEFAULT 0 COMMENT '发送次数',max_retry_count INT NOT NULL DEFAULT 3 COMMENT '最大重试次数',next_send_time BIGINT NOT NULL COMMENT '下次发送时间戳',last_error TEXT COMMENT '最后错误信息',created_at BIGINT NOT NULL COMMENT '创建时间戳',updated_at BIGINT COMMENT '更新时间戳',UNIQUE KEY uk_business_key_type (business_key, message_type),KEY idx_status_next_send (status, next_send_time)
) ENGINE=InnoDB COMMENT='本地消息表';-- 用户账户表(简化)
CREATE TABLE user_account (id BIGINT AUTO_INCREMENT PRIMARY KEY,user_id BIGINT NOT NULL UNIQUE COMMENT '用户ID',available_balance DECIMAL(30,18) NOT NULL DEFAULT 0 COMMENT '可用余额',total_balance DECIMAL(30,18) NOT NULL DEFAULT 0 COMMENT '总余额',updated_at BIGINT NOT NULL COMMENT '更新时间戳'
) ENGINE=InnoDB COMMENT='用户账户表';
1). 消息与业务实体类
package com.digitalcredit.message.entity;import lombok.Data;import javax.persistence.*;
import java.math.BigDecimal;@Data
@Entity
@Table(name = "local_message")
public class LocalMessage {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;@Column(name = "business_key", nullable = false, length = 66)private String businessKey; // 交易哈希@Column(name = "message_type", nullable = false, length = 20)@Enumerated(EnumType.STRING)private MessageType messageType; // 消息类型@Column(name = "topic", nullable = false, length = 50)private String topic; // MQ主题@Column(name = "content", nullable = false, columnDefinition = "TEXT")private String content; // 消息内容(JSON)@Column(name = "status", nullable = false, length = 20)@Enumerated(EnumType.STRING)private MessageStatus status; // 消息状态@Column(name = "send_count", nullable = false)private Integer sendCount = 0; // 发送次数@Column(name = "max_retry_count", nullable = false)private Integer maxRetryCount = 3; // 最大重试次数@Column(name = "next_send_time", nullable = false)private Long nextSendTime; // 下次发送时间戳@Column(name = "last_error", columnDefinition = "TEXT")private String lastError; // 最后错误信息@Column(name = "created_at", nullable = false)private Long createdAt; // 创建时间戳@Column(name = "updated_at")private Long updatedAt; // 更新时间戳// 消息类型枚举public enum MessageType {DEPOSIT_ARRIVAL, // 资金到账FUND_COLLECTION // 资金归集}// 消息状态枚举public enum MessageStatus {PENDING, // 待发送SENDING, // 发送中SENT, // 已发送FAILED // 发送失败}
}package com.digitalcredit.deposit.entity;import lombok.Data;import javax.persistence.*;
import java.math.BigDecimal;@Data
@Entity
@Table(name = "deposit_record")
public class DepositRecord {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;@Column(name = "tx_hash", nullable = false, length = 66, unique = true)private String txHash; // 区块链交易哈希@Column(name = "user_id", nullable = false)private Long userId; // 用户ID@Column(name = "chain_type", nullable = false, length = 20)private String chainType; // 区块链类型@Column(name = "deposit_address", nullable = false, length = 66)private String depositAddress; // 充值地址@Column(name = "amount", nullable = false, precision = 30, scale = 18)private BigDecimal amount; // 充值金额@Column(name = "block_number", nullable = false)private Long blockNumber; // 区块高度@Column(name = "status", nullable = false, length = 20)@Enumerated(EnumType.STRING)private DepositStatus status; // 状态@Column(name = "failure_reason", columnDefinition = "TEXT")private String failureReason; // 失败原因@Column(name = "created_at", nullable = false)private Long createdAt; // 创建时间戳@Column(name = "completed_at")private Long completedAt; // 完成时间戳// 充值状态枚举public enum DepositStatus {PENDING, // 待处理SUCCESS, // 成功FAILED // 失败}
}
2). 资金到账消息消费者
package com.digitalcredit.mq.consumer;import com.alibaba.fastjson.JSON;
import com.digitalcredit.account.service.AccountService;
import com.digitalcredit.deposit.entity.DepositRecord;
import com.digitalcredit.deposit.entity.enums.DepositStatus;
import com.digitalcredit.deposit.repository.DepositRecordRepository;
import com.digitalcredit.mq.message.DepositMessage;
import org.apache.rocketmq.client.consumer.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import java.util.List;
import java.util.Optional;@Component
public class DepositMessageListener implements MessageListenerConcurrently {private static final Logger logger = LoggerFactory.getLogger(DepositMessageListener.class);@Autowiredprivate AccountService accountService;@Autowiredprivate DepositRecordRepository depositRecordRepository;/*** 处理资金划转消息*/@Override@Transactionalpublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {String txHash = msg.getKeys();logger.info("开始处理资金划转消息,txHash: {}, 重试次数: {}", txHash, msg.getReconsumeTimes());// 1. 解析消息DepositMessage message = JSON.parseObject(new String(msg.getBody(), "UTF-8"), DepositMessage.class);// 2. 防重复处理(检查是否已处理)Optional<DepositRecord> recordOpt = depositRecordRepository.findByTxHash(txHash);if (recordOpt.isEmpty()) {logger.error("未找到对应的充值记录,txHash: {}", txHash);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}DepositRecord record = recordOpt.get();if (record.getStatus() == DepositStatus.SUCCESS) {logger.info("消息已处理,无需重复处理,txHash: {}", txHash);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 3. 执行资金划转(增加用户余额)accountService.increaseBalance(message.getUserId(),message.getAmount(),"DEPOSIT",txHash);// 4. 更新充值记录状态record.setStatus(DepositStatus.SUCCESS);record.setCompletedAt(System.currentTimeMillis());depositRecordRepository.save(record);logger.info("资金划转处理成功,txHash: {}", txHash);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {logger.error("资金划转处理失败,msgId: {}", msg.getMsgId(), e);// 判断是否达到最大重试次数if (msg.getReconsumeTimes() >= 5) {// 记录失败原因,后续人工处理String txHash = msg.getKeys();updateDepositRecordToFailed(txHash, e.getMessage());logger.warn("消息达到最大重试次数,将进入死信队列,txHash: {}", txHash);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 不再重试,进入死信队列}// 未达最大重试次数,返回重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}/*** 更新充值记录为失败状态*/private void updateDepositRecordToFailed(String txHash, String reason) {try {Optional<DepositRecord> recordOpt = depositRecordRepository.findByTxHash(txHash);if (recordOpt.isPresent()) {DepositRecord record = recordOpt.get();record.setStatus(DepositStatus.FAILED);record.setFailureReason(reason);record.setCompletedAt(System.currentTimeMillis());depositRecordRepository.save(record);}} catch (Exception e) {logger.error("更新充值记录为失败状态失败,txHash: {}", txHash, e);}}
}
四、关键技术点说明
1. 区块链节点连接策略
- 主备节点机制:同时连接多个节点,当主节点故障时自动切换到备用节点
- 连接池管理:维护长期连接,减少握手开销
- 超时重试:设置合理的超时时间和重试策略,确保网络波动时的稳定性
2. 交易去重与幂等性保障
- 使用交易哈希作为唯一标识,确保每笔交易只处理一次
- 数据库唯一索引:在充值记录表中对交易哈希和链类型创建唯一索引
- 状态机设计:明确的状态流转(PENDING → CONFIRMED/INVALID/ERROR),避免重复处理
3. 性能优化措施
- 充值地址缓存:将平台所有充值地址加载到内存,减少数据库查询
- 批量处理:区块处理采用批量方式,提高效率
- 异步处理:交易解析和资金划转采用异步方式,避免阻塞监听线程
- 分区表:充值记录表按时间分区,提高查询效率
4. 容错与恢复机制
- 断点续传:记录已处理的区块高度,服务重启后从断点继续处理
- 重试队列:处理失败的交易加入重试队列,定时重试
- 监控告警:关键指标(区块同步延迟、未确认充值数量)监控和告警
- 手动干预接口:提供手动处理异常充值的接口
五、安全措施
-
私钥安全
- 充值地址私钥加密存储(AES-256)
- 加密密钥通过KMS(密钥管理服务)管理
- 敏感操作(如资金划转)需要多重签名
-
交易验证
- 多重验证:不仅验证地址,还验证金额和交易状态
- 防重放攻击:检查交易是否属于当前链
-
异常监控
- 大额充值预警:超过阈值的充值触发人工审核
- 异常地址监控:对黑名单地址的转账进行拦截
- 频率限制:监控同一地址的频繁转账行为
六、总结
链上充值监听与自动划转系统需要兼顾实时性、准确性和安全性。通过合理的架构设计和技术选型,可以实现高效、可靠的充值处理流程。
核心要点:
- 采用HD钱包技术为每个用户生成唯一充值地址
- 实时区块监听与交易解析,确保不遗漏任何充值
- 基于区块确认数的交易有效性验证机制
- 完善的异常处理和容错恢复机制
- 严格的安全措施保护用户资金安全
实际部署时,应根据支持的区块链类型和用户规模进行水平扩展,确保系统在高并发场景下的稳定性和处理能力。