This commit is contained in:
孟帅
2023-11-25 18:36:11 +08:00
parent 40117c700d
commit 70e9f966c3
142 changed files with 5407 additions and 2058 deletions

View File

@@ -6,36 +6,55 @@
package cron
import (
"bufio"
"context"
"fmt"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"hotgo/internal/consts"
"hotgo/internal/dao"
"hotgo/internal/model/entity"
"hotgo/utility/simple"
"os"
"strings"
"sync"
)
var crons = &cronManager{
tasks: make(map[string]*TaskItem),
tasks: make(map[string]*TaskItem),
loggers: make(map[string]*glog.Logger),
}
// Cron 定时任务接口
type Cron interface {
// GetName 获取任务名称
GetName() string
// Execute 执行一次任务
Execute(ctx context.Context)
// Execute 执行任务
Execute(ctx context.Context, parser *Parser) (err error)
}
// Parser 任务执行参数
type Parser struct {
Args []string // 任务参数
Logger *glog.Logger // 日志管理实例
}
// Log 任务调度日志
type Log struct {
FileName string `json:"fileName" dc:"文件名称"`
SizeFormat string `json:"sizeFormat" dc:"文件大小"`
Contents string `json:"contents" dc:"文件内容"`
}
// consumerManager 任务管理者
type cronManager struct {
tasks map[string]*TaskItem
tasks map[string]*TaskItem
loggers map[string]*glog.Logger
sync.RWMutex
}
@@ -62,7 +81,8 @@ func Register(c Cron) {
Logger().Debugf(gctx.GetInitCtx(), "cron.Register name:%v duplicate registration.", name)
return
}
crons.tasks[name] = &TaskItem{Name: c.GetName(), Fun: c.Execute}
crons.tasks[name] = &TaskItem{Name: c.GetName(), Fun: GenExecuteFun(c.Execute)}
}
// StopALL 停止所有任务
@@ -88,27 +108,29 @@ func StartALL(sysCron []*entity.SysCron) (err error) {
return gerror.Newf("该任务没有加入任务列表:%v", cron.Name)
}
sn := GenCronSn(cron)
// 没有则添加
if gcron.Search(cron.Name) == nil {
if gcron.Search(sn) == nil {
var (
t *gcron.Entry
ctx = context.WithValue(gctx.New(), consts.ContextKeyCronArgs, strings.Split(cron.Params, consts.CronSplitStr))
ctx = GenCronCtx(cron)
)
switch cron.Policy {
case consts.CronPolicySame:
t, err = gcron.Add(ctx, cron.Pattern, f.Fun, cron.Name)
t, err = gcron.Add(ctx, cron.Pattern, f.Fun, sn)
case consts.CronPolicySingle:
t, err = gcron.AddSingleton(ctx, cron.Pattern, f.Fun, cron.Name)
t, err = gcron.AddSingleton(ctx, cron.Pattern, f.Fun, sn)
case consts.CronPolicyOnce:
t, err = gcron.AddOnce(ctx, cron.Pattern, f.Fun, cron.Name)
t, err = gcron.AddOnce(ctx, cron.Pattern, f.Fun, sn)
case consts.CronPolicyTimes:
if f.Count <= 0 {
f.Count = 1
}
t, err = gcron.AddTimes(ctx, cron.Pattern, int(cron.Count), f.Fun, cron.Name)
t, err = gcron.AddTimes(ctx, cron.Pattern, int(cron.Count), f.Fun, sn)
default:
return gerror.Newf("使用无效的策略, cron.Policy=%v", cron.Policy)
@@ -122,7 +144,7 @@ func StartALL(sysCron []*entity.SysCron) (err error) {
}
}
gcron.Start(cron.Name)
gcron.Start(sn)
// 执行完毕,单次和多次执行的任务更新状态
if cron.Policy == consts.CronPolicyOnce || cron.Policy == consts.CronPolicyTimes {
@@ -144,14 +166,14 @@ func RefreshStatus(sysCron *entity.SysCron) (err error) {
}
if sysCron.Status == consts.StatusEnabled {
return Start(sysCron)
return ResetStart(sysCron)
}
return Stop(sysCron)
}
// Stop 停止单个任务
func Stop(sysCron *entity.SysCron) (err error) {
cr := gcron.Search(sysCron.Name)
cr := gcron.Search(GenCronSn(sysCron))
if cr == nil {
return
}
@@ -159,6 +181,17 @@ func Stop(sysCron *entity.SysCron) (err error) {
return
}
// ResetStart 重置任务
func ResetStart(sysCron *entity.SysCron) (err error) {
if err = Stop(sysCron); err != nil {
return
}
if err = Delete(sysCron); err != nil {
return
}
return Start(sysCron)
}
// Once 立即执行一次某个任务
func Once(ctx context.Context, sysCron *entity.SysCron) error {
crons.RLock()
@@ -167,7 +200,7 @@ func Once(ctx context.Context, sysCron *entity.SysCron) error {
for _, v := range crons.tasks {
if v.Name == sysCron.Name {
simple.SafeGo(ctx, func(ctx context.Context) {
v.Fun(ctx)
v.Fun(GenCronCtx(sysCron))
})
return nil
}
@@ -182,7 +215,7 @@ func Delete(sysCron *entity.SysCron) (err error) {
}
for _, v := range gcron.Entries() {
if v.Name == sysCron.Name {
if v.Name == GenCronSn(sysCron) {
gcron.Remove(v.Name)
}
}
@@ -195,10 +228,86 @@ func Start(sysCron *entity.SysCron) (err error) {
return
}
c := gcron.Search(sysCron.Name)
c := gcron.Search(GenCronSn(sysCron))
if c != nil {
c.Start()
return
}
return StartALL([]*entity.SysCron{sysCron})
}
// DispatchLog 查看指定任务的调度日志
func DispatchLog(sysCron *entity.SysCron) (log *Log, err error) {
path := fmt.Sprintf("%v/%v", Logger().GetConfig().Path, GenCronSn(sysCron))
file, err := FindLastModifiedFile(path)
if err != nil {
return nil, err
}
if len(file) == 0 || !gfile.IsFile(file) {
err = gerror.New("未找到日志!")
return
}
log = new(Log)
log.FileName = file
log.SizeFormat = gfile.SizeFormat(file)
if gfile.Size(file) > 1024*50 {
log.Contents, err = ReadLastLines(file, 100)
if err != nil {
return nil, err
}
} else {
log.Contents = gfile.GetContents(file)
}
return
}
func ReadLastLines(filename string, lineCount int) (string, error) {
file, err := os.Open(filename)
if err != nil {
return "", err
}
defer file.Close()
scanner := bufio.NewScanner(file)
lines := make([]string, 0, lineCount)
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) > lineCount {
lines = lines[1:]
}
}
if err = scanner.Err(); err != nil {
return "", err
}
result := strings.Join(lines, "\n")
return result, nil
}
func FindLastModifiedFile(dirPath string) (string, error) {
if !gfile.Exists(dirPath) {
return "", gerror.New("该任务暂未产生日志!")
}
files, err := gfile.ScanDir(dirPath, "*.log", true)
if err != nil {
return "", err
}
var lastModifiedFile string
var lastModifiedTime int64 = 0
for _, file := range files {
modTime := gfile.MTimestamp(file)
if modTime > lastModifiedTime {
lastModifiedTime = modTime
lastModifiedFile = file
}
}
return lastModifiedFile, nil
}

View File

@@ -0,0 +1,86 @@
// Package cron
// @Link https://github.com/bufanyun/hotgo
// @Copyright Copyright (c) 2023 HotGo CLI
// @Author Ms <133814250@qq.com>
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
package cron
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"hotgo/internal/consts"
"hotgo/internal/model/entity"
"strings"
)
// GenCronSn 生成任务序列号
func GenCronSn(sysCron *entity.SysCron) string {
return fmt.Sprintf("%s@%d", sysCron.Name, sysCron.Id)
}
// GenCronCtx 生成任务上下文
func GenCronCtx(sysCron *entity.SysCron) (ctx context.Context) {
ctx = context.WithValue(gctx.New(), consts.ContextKeyCronArgs, strings.Split(sysCron.Params, consts.CronSplitStr))
ctx = context.WithValue(ctx, consts.ContextKeyCronSn, GenCronSn(sysCron))
return ctx
}
func GenLoggerByCtx(ctx context.Context) *glog.Logger {
sn, ok := ctx.Value(consts.ContextKeyCronSn).(string)
if !ok {
Logger().Panic(ctx, "获取定时任务序列号失败!")
}
logger, ok := crons.loggers[sn]
if ok {
return logger
}
logger = glog.New()
if err := logger.SetConfig(Logger().GetConfig()); err != nil {
Logger().Panic(ctx, err)
}
logger.SetFlags(glog.F_TIME_STD | glog.F_FILE_SHORT)
// 设置子路径
if err := logger.SetPath(fmt.Sprintf("%v/%v", logger.GetPath(), sn)); err != nil {
Logger().Panic(ctx, err)
}
crons.Lock()
defer crons.Unlock()
crons.loggers[sn] = logger
return logger
}
// GenExecuteFun 生成执行过程
func GenExecuteFun(fun func(ctx context.Context, parser *Parser) (err error)) func(ctx context.Context) {
return func(ctx context.Context) {
args, ok := ctx.Value(consts.ContextKeyCronArgs).([]string)
if !ok {
Logger().Panic(ctx, "执行定时任务时,参数解析失败!")
return
}
parser := new(Parser)
parser.Args = args
parser.Logger = GenLoggerByCtx(ctx)
st := gtime.Now()
err := g.Try(ctx, func(ctx context.Context) {
if err := fun(ctx, parser); err != nil {
panic(err)
}
})
milliseconds := gtime.Now().Sub(st).Milliseconds() // 执行耗时
if err != nil {
parser.Logger.Errorf(ctx, "execute failed, took %vms, err:%+v", milliseconds, err)
return
}
parser.Logger.Infof(ctx, "execute success, took %vms.", milliseconds)
}
}