feat: 完善支付通道和收款监听链路

新增 ChannelNotifyPayloadInterface 等支付插件通知契约,规范 pay_no 定位和插件返回校验。

新增微信、支付宝、收钱吧、Postar 个人收款插件适配,支持余额识别与备注识别。

新增 receipt-watcher 后端进程、Redis 队列 job 和平台事件监听,覆盖收款流水通知、商户通知、退款派发、转账派发与清算完成。

补齐个人收款监听相关系统配置、仓储、服务费冻结明细、订单后台操作和通道测试能力。

重构支付单创建、回调、费用、风控、结算和通道统计链路,统一状态流转与幂等处理。
This commit is contained in:
技术老胡
2026-05-11 16:28:48 +08:00
parent 0e5de50337
commit fd1f53f2ee
136 changed files with 14416 additions and 3992 deletions

View File

@@ -0,0 +1,48 @@
<?php
namespace app\queue\job;
use app\queue\support\AbstractQueueJob;
use app\service\payment\runtime\MerchantNotifyDispatcherService;
/**
* 商户通知任务。
*
* 负责根据通知号派发一次商户 notify_url业务失败不抛给 Redis 队列快速重试,
* 后续重试节奏由通知任务表控制。
*/
class MerchantNotifyJob extends AbstractQueueJob
{
/**
* 构造方法。
*
* @param MerchantNotifyDispatcherService $dispatcher 商户通知派发服务
* @return void
*/
public function __construct(
protected MerchantNotifyDispatcherService $dispatcher
) {
}
/**
* 处理商户通知消息。
*
* @param array<string, mixed> $data 队列消息
* @return void
*/
public function handle(array $data): void
{
$notifyNo = $this->requireString($data, 'notify_no');
$this->dispatcher->dispatchTask($notifyNo, false);
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return 'MerchantNotifyQueue';
}
}

26
app/queue/job/README.md Normal file
View File

@@ -0,0 +1,26 @@
# Queue Job 目录说明
本目录只放具体的队列业务任务类。
Job 的职责是:
- 校验队列消息 payload。
- 调用领域 Service 完成业务动作。
- 定义该任务自己的失败处理策略。
Job 不直接实现 `Webman\RedisQueue\Consumer`,也不声明队列名。队列名和 Redis 连接由 `app/queue/redis` 下的 Consumer 负责。
## 新增任务约定
1. 新建一个以 `Job` 结尾的类,例如 `TransferDispatchJob`
2. 继承 `app\queue\support\AbstractQueueJob`
3.`handle(array $data)` 中解析消息并调用对应 Service。
4. 不在 Job 中堆复杂业务逻辑,复杂流程应下沉到 `app/service`
5. Job 应保持无状态,单次消息的数据只从 `handle()` 参数传入。
## 目录边界
- 具体业务 Job 放这里。
- 队列 Consumer 放 `app/queue/redis`
- 抽象基类和队列辅助类放 `app/queue/support`
- 通用接口放 `app/common/interface`

View File

@@ -0,0 +1,134 @@
<?php
namespace app\queue\job;
use app\exception\PaymentException;
use app\queue\support\AbstractQueueJob;
use app\service\payment\order\PayOrderCallbackService;
use app\service\payment\receipt\ReceiptWatcherService;
use RuntimeException;
/**
* 网页流水监听通知任务。
*
* 每条消息建议只包含一条归一化流水,便于幂等、重试和日志定位。
*/
class ReceiptFlowNotifyJob extends AbstractQueueJob
{
/**
* 构造方法。
*
* @param ReceiptWatcherService $receiptWatcherService 网页流水监听服务
* @param PayOrderCallbackService $payOrderCallbackService 支付单回调服务
* @return void
*/
public function __construct(
protected ReceiptWatcherService $receiptWatcherService,
protected PayOrderCallbackService $payOrderCallbackService
) {
}
/**
* 处理网页流水监听消息。
*
* @param array<string, mixed> $data 队列消息
* @return void
*/
public function handle(array $data): void
{
$pluginCode = $this->requireString($data, 'plugin_code', '插件编码');
$apiConfigId = (int) ($data['api_config_id'] ?? 0);
if ($apiConfigId <= 0) {
throw new RuntimeException('插件配置ID不能为空');
}
$records = $this->records($data);
foreach ($records as $record) {
$this->handleRecord($pluginCode, $apiConfigId, $data, $record);
}
}
/**
* 处理单条流水。
*
* @param string $pluginCode 插件编码
* @param int $apiConfigId 插件配置ID
* @param array<string, mixed> $data 原始队列消息
* @param array<string, mixed> $record 流水记录
* @return void
*/
private function handleRecord(string $pluginCode, int $apiConfigId, array $data, array $record): void
{
if ($this->receiptWatcherService->isFlowSeen($pluginCode, $apiConfigId, $record)) {
return;
}
$token = $this->receiptWatcherService->acquireFlowLock($pluginCode, $apiConfigId, $record);
if ($token === null) {
return;
}
try {
$payType = trim((string) ($record['pay_type'] ?? ''));
$channel = $this->receiptWatcherService->resolveChannelForFlow($pluginCode, $apiConfigId, $payType);
if (!$channel) {
throw new PaymentException('流水未匹配到可用支付通道', 40200, [
'plugin_code' => $pluginCode,
'api_config_id' => $apiConfigId,
'pay_type' => $payType,
]);
}
$payload = $data;
$payload['record'] = $record;
$payload['channel_id'] = (int) $channel->id;
$callbackPayload = $this->payOrderCallbackService->handleChannelNotifyPayload((int) $channel->id, $payload);
if (empty($callbackPayload['success'])) {
throw new RuntimeException('流水通知未确认支付成功');
}
$this->receiptWatcherService->markFlowSeen($pluginCode, $apiConfigId, $record);
} finally {
$this->receiptWatcherService->releaseFlowLock($pluginCode, $apiConfigId, $record, $token);
}
}
/**
* 读取消息中的流水列表。
*
* @param array<string, mixed> $data 队列消息
* @return array<int, array<string, mixed>> 流水列表
*/
private function records(array $data): array
{
if (isset($data['record']) && is_array($data['record'])) {
return [$data['record']];
}
if (!isset($data['records']) || !is_array($data['records'])) {
throw new RuntimeException('流水记录不能为空');
}
$records = [];
foreach ($data['records'] as $record) {
if (is_array($record)) {
$records[] = $record;
}
}
if ($records === []) {
throw new RuntimeException('流水记录不能为空');
}
return $records;
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return 'ReceiptFlowNotifyQueue';
}
}

View File

@@ -0,0 +1,49 @@
<?php
namespace app\queue\job;
use app\queue\support\AbstractQueueJob;
use app\service\payment\order\RefundDispatchService;
/**
* 退款派发任务。
*
* 负责把退款单请求发送到第三方通道,派发异常会由退款派发服务落库为失败状态。
*/
class RefundDispatchJob extends AbstractQueueJob
{
/**
* 构造方法。
*
* @param RefundDispatchService $dispatcher 退款派发服务
* @return void
*/
public function __construct(
protected RefundDispatchService $dispatcher
) {
}
/**
* 处理退款派发消息。
*
* @param array<string, mixed> $data 队列消息
* @return void
*/
public function handle(array $data): void
{
$refundNo = $this->requireString($data, 'refund_no');
$isRetry = $this->boolValue($data['is_retry'] ?? false);
$this->dispatcher->dispatch($refundNo, $isRetry, false);
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return 'RefundDispatchQueue';
}
}

View File

@@ -0,0 +1,48 @@
<?php
namespace app\queue\job;
use app\queue\support\AbstractQueueJob;
use app\service\payment\settlement\SettlementAutomationService;
/**
* 清算自动入账任务。
*
* 只处理已生成清算单的自动入账,是否满足自动入账策略由清算服务判断。
*/
class SettlementCompleteJob extends AbstractQueueJob
{
/**
* 构造方法。
*
* @param SettlementAutomationService $settlementAutomationService 清算自动化服务
* @return void
*/
public function __construct(
protected SettlementAutomationService $settlementAutomationService
) {
}
/**
* 处理清算自动入账消息。
*
* @param array<string, mixed> $data 队列消息
* @return void
*/
public function handle(array $data): void
{
$settleNo = $this->requireString($data, 'settle_no', '清算单号');
$this->settlementAutomationService->completeAutoSettlement($settleNo);
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return 'SettlementCompleteQueue';
}
}

View File

@@ -0,0 +1,47 @@
<?php
namespace app\queue\job;
use app\queue\support\AbstractQueueJob;
use app\service\payment\transfer\TransferService;
/**
* 转账通道派发任务。
*
* 负责把已扣款落库的转账单请求发送到第三方通道。
*/
class TransferDispatchJob extends AbstractQueueJob
{
/**
* 构造方法。
*
* @param TransferService $transferService 转账服务
* @return void
*/
public function __construct(
protected TransferService $transferService
) {
}
/**
* 处理转账派发消息。
*
* @param array<string, mixed> $data 队列消息
* @return void
*/
public function handle(array $data): void
{
$bizNo = $this->requireString($data, 'biz_no');
$this->transferService->dispatchQueuedTransfer($bizNo);
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return 'TransferDispatchQueue';
}
}

View File

@@ -0,0 +1,49 @@
<?php
namespace app\queue\job;
use app\queue\support\AbstractQueueJob;
use app\service\payment\transfer\TransferService;
/**
* 转账通道查单任务。
*
* 负责对仍处于处理中的转账单做延迟查单,并按退避间隔继续投递下一次查单。
*/
class TransferQueryJob extends AbstractQueueJob
{
/**
* 构造方法。
*
* @param TransferService $transferService 转账服务
* @return void
*/
public function __construct(
protected TransferService $transferService
) {
}
/**
* 处理转账查单消息。
*
* @param array<string, mixed> $data 队列消息
* @return void
*/
public function handle(array $data): void
{
$bizNo = $this->requireString($data, 'biz_no');
$attempt = max(0, (int) ($data['attempt'] ?? 0));
$this->transferService->queryQueuedTransfer($bizNo, $attempt);
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return 'TransferQueryQueue';
}
}

View File

@@ -0,0 +1,32 @@
<?php
namespace app\queue\redis;
use app\common\constant\PaymentQueueConstant;
use app\queue\job\MerchantNotifyJob;
use app\queue\support\AbstractRedisConsumer;
/**
* 商户通知队列消费者。
*
* 只声明队列名和业务 Job具体消费逻辑统一放在 MerchantNotifyJob 中。
*/
class MerchantNotify extends AbstractRedisConsumer
{
/**
* 队列名称。
*
* @var string
*/
public $queue = PaymentQueueConstant::MERCHANT_NOTIFY;
/**
* 获取任务类名。
*
* @return class-string<MerchantNotifyJob> 任务类名
*/
protected function jobClass(): string
{
return MerchantNotifyJob::class;
}
}

33
app/queue/redis/README.md Normal file
View File

@@ -0,0 +1,33 @@
# Redis Consumer 目录说明
本目录只放会被 `webman/redis-queue` 扫描的正式 Redis 队列 Consumer。
当前配置位于 `config/plugin/webman/redis-queue/process.php`,其中 `consumer_dir` 指向本目录。队列进程启动时会递归扫描本目录下的 PHP 文件,只要类实现了 `Webman\RedisQueue\Consumer`,就会被实例化并订阅队列。
## Consumer 职责
Consumer 应保持很薄,只负责:
- 声明队列名。
- 声明对应 Job 类。
- 适配 Redis 队列框架。
队列名统一从 `app\common\constant\PaymentQueueConstant` 引用,避免生产者和消费者各自维护字符串。
业务处理应放在 `app/queue/job` 下的 Job 类中。
## 禁止放入
- 抽象基类。
- 示例 Consumer。
- 不希望生产环境订阅的临时测试类。
- 复杂业务逻辑。
这些文件可能被队列进程扫描并误订阅。
## 新增任务约定
1. 先在 `app/common/constant/PaymentQueueConstant.php` 登记队列名。
2. 再在 `app/queue/job` 新增业务 Job。
3. 最后在本目录新增 Consumer继承 `app\queue\support\AbstractRedisConsumer`
4. Consumer 只设置 `$queue`,并通过 `jobClass()` 返回对应 Job 类名。

View File

@@ -0,0 +1,30 @@
<?php
namespace app\queue\redis;
use app\common\constant\PaymentQueueConstant;
use app\queue\job\ReceiptFlowNotifyJob;
use app\queue\support\AbstractRedisConsumer;
/**
* 网页流水监听通知队列消费者。
*/
class ReceiptFlowNotify extends AbstractRedisConsumer
{
/**
* 队列名称。
*
* @var string
*/
public $queue = PaymentQueueConstant::RECEIPT_FLOW_NOTIFY;
/**
* 获取任务类名。
*
* @return class-string<ReceiptFlowNotifyJob> 任务类名
*/
protected function jobClass(): string
{
return ReceiptFlowNotifyJob::class;
}
}

View File

@@ -0,0 +1,32 @@
<?php
namespace app\queue\redis;
use app\common\constant\PaymentQueueConstant;
use app\queue\job\RefundDispatchJob;
use app\queue\support\AbstractRedisConsumer;
/**
* 退款通道请求队列消费者。
*
* 只声明队列名和业务 Job具体消费逻辑统一放在 RefundDispatchJob 中。
*/
class RefundDispatch extends AbstractRedisConsumer
{
/**
* 队列名称。
*
* @var string
*/
public $queue = PaymentQueueConstant::REFUND_DISPATCH;
/**
* 获取任务类名。
*
* @return class-string<RefundDispatchJob> 任务类名
*/
protected function jobClass(): string
{
return RefundDispatchJob::class;
}
}

View File

@@ -0,0 +1,30 @@
<?php
namespace app\queue\redis;
use app\common\constant\PaymentQueueConstant;
use app\queue\job\SettlementCompleteJob;
use app\queue\support\AbstractRedisConsumer;
/**
* 清算自动入账队列消费者。
*/
class SettlementComplete extends AbstractRedisConsumer
{
/**
* 队列名称。
*
* @var string
*/
public $queue = PaymentQueueConstant::SETTLEMENT_COMPLETE;
/**
* 获取任务类名。
*
* @return class-string<SettlementCompleteJob> 任务类名
*/
protected function jobClass(): string
{
return SettlementCompleteJob::class;
}
}

View File

@@ -0,0 +1,32 @@
<?php
namespace app\queue\redis;
use app\common\constant\PaymentQueueConstant;
use app\queue\job\TransferDispatchJob;
use app\queue\support\AbstractRedisConsumer;
/**
* 转账通道派发队列消费者。
*
* 只声明队列名和业务 Job具体消费逻辑统一放在 TransferDispatchJob 中。
*/
class TransferDispatch extends AbstractRedisConsumer
{
/**
* 队列名称。
*
* @var string
*/
public $queue = PaymentQueueConstant::TRANSFER_DISPATCH;
/**
* 获取任务类名。
*
* @return class-string<TransferDispatchJob> 任务类名
*/
protected function jobClass(): string
{
return TransferDispatchJob::class;
}
}

View File

@@ -0,0 +1,32 @@
<?php
namespace app\queue\redis;
use app\common\constant\PaymentQueueConstant;
use app\queue\job\TransferQueryJob;
use app\queue\support\AbstractRedisConsumer;
/**
* 转账通道查单队列消费者。
*
* 只声明队列名和业务 Job具体消费逻辑统一放在 TransferQueryJob 中。
*/
class TransferQuery extends AbstractRedisConsumer
{
/**
* 队列名称。
*
* @var string
*/
public $queue = PaymentQueueConstant::TRANSFER_QUERY;
/**
* 获取任务类名。
*
* @return class-string<TransferQueryJob> 任务类名
*/
protected function jobClass(): string
{
return TransferQueryJob::class;
}
}

View File

@@ -0,0 +1,73 @@
<?php
namespace app\queue\support;
use app\common\interface\QueueJobInterface;
use RuntimeException;
use support\Log;
use Throwable;
/**
* 队列任务基类。
*
* 提供消息字段校验、布尔值解析和统一失败日志,具体业务 Job 只需要实现 handle。
*/
abstract class AbstractQueueJob implements QueueJobInterface
{
/**
* 默认消费失败处理。
*
* @param Throwable $exception 异常
* @param array<string, mixed> $package 原始队列包
* @return void
*/
public function failed(Throwable $exception, array $package): void
{
Log::warning(sprintf(
'[%s] 消费失败 queue=%s attempts=%s error=%s',
$this->logName(),
(string) ($package['queue'] ?? ''),
(string) ($package['attempts'] ?? ''),
$exception->getMessage()
));
}
/**
* 读取必填字符串字段。
*
* @param array<string, mixed> $data 队列消息
* @param string $key 字段名
* @param string $label 字段显示名
* @return string 字段值
*/
protected function requireString(array $data, string $key, string $label = ''): string
{
$value = trim((string) ($data[$key] ?? ''));
if ($value === '') {
throw new RuntimeException(($label !== '' ? $label : $key) . ' 不能为空');
}
return $value;
}
/**
* 解析布尔字段。
*
* @param mixed $value 字段值
* @return bool 布尔结果
*/
protected function boolValue(mixed $value): bool
{
return filter_var($value, FILTER_VALIDATE_BOOL);
}
/**
* 获取日志名称。
*
* @return string 日志名称
*/
protected function logName(): string
{
return static::class;
}
}

View File

@@ -0,0 +1,82 @@
<?php
namespace app\queue\support;
use app\common\interface\QueueJobInterface;
use RuntimeException;
use support\Log;
use Throwable;
use Webman\RedisQueue\Consumer;
/**
* Redis 队列消费者基类。
*
* 统一把 webman/redis-queue 的 Consumer 协议适配到业务 Job具体消费者只需要声明
* 队列名和 Job 类名。
*/
abstract class AbstractRedisConsumer implements Consumer
{
/**
* Redis 队列连接名。
*
* @var string
*/
public $connection = 'default';
/**
* 获取任务类名。
*
* @return class-string<QueueJobInterface> 任务类名
*/
abstract protected function jobClass(): string;
/**
* 消费队列消息。
*
* @param mixed $data 队列消息
* @return void
*/
public function consume($data): void
{
$this->job()->handle(is_array($data) ? $data : []);
}
/**
* 处理消费失败。
*
* @param Throwable $exception 异常
* @param array<string, mixed> $package 原始队列包
* @return void
*/
public function onConsumeFailure(Throwable $exception, array $package): void
{
try {
$this->job()->failed($exception, $package);
} catch (Throwable $failureException) {
Log::warning(sprintf(
'[QueueConsumer] 失败处理异常 job=%s queue=%s error=%s failure_error=%s',
$this->jobClass(),
(string) ($package['queue'] ?? ''),
$exception->getMessage(),
$failureException->getMessage()
));
}
}
/**
* 从容器中获取任务实例。
*
* Job 不保存单次消费的可变状态,使用 container_get 复用实例,避免每条消息重复构造依赖。
*
* @return QueueJobInterface 任务实例
*/
private function job(): QueueJobInterface
{
$job = container_get($this->jobClass());
if (!$job instanceof QueueJobInterface) {
throw new RuntimeException('队列任务必须实现 QueueJobInterface' . $this->jobClass());
}
return $job;
}
}