模块化上传驱动,使用泛型优化工具库降低冗余

This commit is contained in:
孟帅
2023-06-02 20:29:08 +08:00
parent fdc48b9335
commit 62ecbb7f26
96 changed files with 1276 additions and 1483 deletions

View File

@@ -22,41 +22,42 @@ import (
// ClientConfig 客户端配置
type ClientConfig struct {
Addr string
Auth *AuthMeta
Timeout time.Duration
ConnectInterval time.Duration
MaxConnectCount uint
ConnectCount uint
AutoReconnect bool
LoginEvent CallbackEvent
CloseEvent CallbackEvent
Addr string // 连接地址
Auth *AuthMeta // 认证元数据
Timeout time.Duration // 连接超时时间
ConnectInterval time.Duration // 重连时间间隔
MaxConnectCount uint // 最大重连次数0不限次数
ConnectCount uint // 已重连次数
AutoReconnect bool // 是否开启自动重连
LoginEvent CallbackEvent // 登录成功事件
CloseEvent CallbackEvent // 连接关闭事件
}
// Client 客户端
type Client struct {
Ctx context.Context
Logger *glog.Logger
IsLogin bool // 是否已登录
addr string
auth *AuthMeta
rpc *Rpc
timeout time.Duration
connectInterval time.Duration
maxConnectCount uint
connectCount uint
autoReconnect bool
loginEvent CallbackEvent
closeEvent CallbackEvent
sync.Mutex
heartbeat int64
routers map[string]RouterHandler
conn *gtcp.Conn
wg sync.WaitGroup
closeFlag bool // 关闭标签,关闭以后可以重连
stopFlag bool // 停止标签,停止以后不能重连
Ctx context.Context // 上下文
Logger *glog.Logger // 日志处理器
IsLogin bool // 是否已登录
addr string // 连接地址
auth *AuthMeta // 认证元数据
rpc *Rpc // rpc协议支持
timeout time.Duration // 连接超时时间
connectInterval time.Duration // 重连时间间隔
maxConnectCount uint // 最大重连次数0不限次数
connectCount uint // 已重连次数
autoReconnect bool // 是否开启自动重连
loginEvent CallbackEvent // 登录成功事件
closeEvent CallbackEvent // 连接关闭事件
sync.Mutex // 状态锁
heartbeat int64 // 心跳
routers map[string]RouterHandler // 已注册的路由
conn *gtcp.Conn // 连接对象
wg sync.WaitGroup // 状态控制
closeFlag bool // 关闭标签,关闭以后可以重连
stopFlag bool // 停止标签,停止以后不能重连
}
// NewClient 初始化一个tcp客户端
func NewClient(config *ClientConfig) (client *Client, err error) {
client = new(Client)
@@ -110,7 +111,7 @@ func NewClient(config *ClientConfig) (client *Client, err error) {
return
}
// Start 启动
// Start 启动tcp连接
func (client *Client) Start() (err error) {
client.Lock()
defer client.Unlock()
@@ -133,7 +134,6 @@ func (client *Client) Start() (err error) {
simple.SafeGo(client.Ctx, func(ctx context.Context) {
client.connect()
})
return
}
@@ -165,6 +165,7 @@ func (client *Client) RegisterRouter(routers map[string]RouterHandler) (err erro
return
}
// dial
func (client *Client) dial() *gtcp.Conn {
for {
conn, err := gtcp.NewConn(client.addr, client.timeout)
@@ -218,6 +219,7 @@ reconnect:
client.startCron()
}
// read
func (client *Client) read() {
simple.SafeGo(client.Ctx, func(ctx context.Context) {
defer func() {
@@ -347,7 +349,6 @@ func (client *Client) Write(data interface{}) error {
return gerror.Newf("client json message pointer required: %+v", data)
}
msg := &Message{Router: msgType.Elem().Name(), Data: data}
return SendPkg(client.conn, msg)
}
@@ -379,7 +380,6 @@ func (client *Client) RpcRequest(ctx context.Context, data interface{}) (res int
err = gerror.New("traceID is required")
return
}
return client.rpc.Request(key, func() {
_ = client.Write(data)
})

View File

@@ -13,21 +13,24 @@ import (
"hotgo/internal/consts"
)
// getCronKey 生成客户端定时任务名称
func (client *Client) getCronKey(s string) string {
return fmt.Sprintf("tcp.client_%s_%s:%s", s, client.auth.Group, client.auth.Name)
}
// stopCron 停止定时任务
func (client *Client) stopCron() {
for _, v := range gcron.Entries() {
gcron.Remove(v.Name)
}
}
// startCron 启动定时任务
func (client *Client) startCron() {
// 心跳超时检查
if gcron.Search(client.getCronKey(consts.TCPCronHeartbeatVerify)) == nil {
_, _ = gcron.AddSingleton(client.Ctx, "@every 600s", func(ctx context.Context) {
if client.heartbeat < gtime.Timestamp()-600 {
if client.heartbeat < gtime.Timestamp()-consts.TCPHeartbeatTimeout {
client.Logger.Debugf(client.Ctx, "client heartbeat timeout, about to reconnect..")
client.Destroy()
}

View File

@@ -37,6 +37,7 @@ func (client *Client) serverLogin() {
}
}
// onResponseServerLogin 接收服务登陆响应结果
func (client *Client) onResponseServerLogin(ctx context.Context, args ...interface{}) {
var in *msgin.ResponseServerLogin
if err := gconv.Scan(args[0], &in); err != nil {
@@ -58,6 +59,7 @@ func (client *Client) onResponseServerLogin(ctx context.Context, args ...interfa
}
}
// onResponseServerHeartbeat 接收心跳响应结果
func (client *Client) onResponseServerHeartbeat(ctx context.Context, args ...interface{}) {
var in *msgin.ResponseServerHeartbeat
if err := gconv.Scan(args[0], &in); err != nil {

View File

@@ -19,6 +19,7 @@ type AuthMeta struct {
EndAt *gtime.Time `json:"-"`
}
// Context tcp上下文
type Context struct {
Conn *gtcp.Conn `json:"conn"`
Auth *AuthMeta `json:"auth"` // 认证元数据

View File

@@ -14,8 +14,10 @@ import (
"github.com/gogf/gf/v2/util/gconv"
)
var GoPool = grpool.New(100)
// GoPool 初始化一个协程池,用于处理消息处理
var GoPool = grpool.New(20)
// RouterHandler 路由消息处理器
type RouterHandler func(ctx context.Context, args ...interface{})
// Message 路由消息
@@ -24,6 +26,7 @@ type Message struct {
Data interface{} `json:"data"`
}
// SendPkg 打包发送的数据包
func SendPkg(conn *gtcp.Conn, message *Message) error {
b, err := json.Marshal(message)
if err != nil {
@@ -32,6 +35,7 @@ func SendPkg(conn *gtcp.Conn, message *Message) error {
return conn.SendPkg(b)
}
// RecvPkg 解包
func RecvPkg(conn *gtcp.Conn) (*Message, error) {
if data, err := conn.RecvPkg(); err != nil {
return nil, err
@@ -58,7 +62,6 @@ func MsgPkg(data interface{}, auth *AuthMeta, traceID string) string {
if msg == nil {
return ""
}
return msg.TraceID
}

View File

@@ -22,6 +22,7 @@ type Rpc struct {
callbacks map[string]RpcRespFunc
}
// RpcResp 响应结构
type RpcResp struct {
res interface{}
err error
@@ -29,6 +30,7 @@ type RpcResp struct {
type RpcRespFunc func(resp interface{}, err error)
// NewRpc 初始化一个rpc协议
func NewRpc(ctx context.Context) *Rpc {
return &Rpc{
ctx: ctx,
@@ -57,7 +59,6 @@ func (r *Rpc) HandleMsg(ctx context.Context, cancel context.CancelFunc, data int
})
return true
}
return false
}
@@ -99,7 +100,7 @@ func (r *Rpc) Request(callId string, send func()) (res interface{}, err error) {
<-waitCh
select {
case <-time.After(consts.TCPRpcTimeout):
case <-time.After(time.Second * consts.TCPRpcTimeout):
err = gerror.New("rpc response timeout")
return
case got := <-resCh:

View File

@@ -19,35 +19,38 @@ import (
"time"
)
// ClientConn 连接到tcp服务器的客户端对象
type ClientConn struct {
Conn *gtcp.Conn
Auth *AuthMeta
heartbeat int64
Conn *gtcp.Conn // 连接对象
Auth *AuthMeta // 认证元数据
heartbeat int64 // 心跳
}
// ServerConfig tcp服务器配置
type ServerConfig struct {
Name string // 服务名称
Addr string // 监听地址
}
// Server tcp服务器对象结构
type Server struct {
Ctx context.Context
Logger *glog.Logger
addr string
name string
rpc *Rpc
ln *gtcp.Server
wgLn sync.WaitGroup
mutex sync.Mutex
closeFlag bool
clients map[string]*ClientConn // 已登录的认证客户端
mutexConns sync.Mutex
wgConns sync.WaitGroup
cronRouters map[string]RouterHandler // 路由
queueRouters map[string]RouterHandler
authRouters map[string]RouterHandler
Ctx context.Context // 上下文
Logger *glog.Logger // 日志处理器
addr string // 连接地址
name string // 服务器名称
rpc *Rpc // rpc协议
ln *gtcp.Server // tcp服务器
wgLn sync.WaitGroup // 状态控制主要用于tcp服务器能够按流程启动退出
mutex sync.Mutex // 服务器状态锁
closeFlag bool // 服务关闭标签
clients map[string]*ClientConn // 已登录的认证客户端
mutexConns sync.Mutex // 连接锁,主要用于客户端上下线
cronRouters map[string]RouterHandler // 定时任务路由
queueRouters map[string]RouterHandler // 队列路由
authRouters map[string]RouterHandler // 任务路由
}
// NewServer 初始一个tcp服务器对象
func NewServer(config *ServerConfig) (server *Server, err error) {
if config == nil {
err = gerror.New("config is nil")
@@ -84,6 +87,7 @@ func NewServer(config *ServerConfig) (server *Server, err error) {
return
}
// accept
func (server *Server) accept(conn *gtcp.Conn) {
defer func() {
server.mutexConns.Lock()
@@ -262,6 +266,7 @@ func (server *Server) RegisterQueueRouter(routers map[string]RouterHandler) {
}
}
// Listen 监听服务
func (server *Server) Listen() (err error) {
server.wgLn.Add(1)
defer server.wgLn.Done()
@@ -283,7 +288,6 @@ func (server *Server) Close() {
}
server.clients = nil
server.mutexConns.Unlock()
server.wgConns.Wait()
if server.ln != nil {
_ = server.ln.Close()

View File

@@ -13,16 +13,19 @@ import (
"hotgo/internal/consts"
)
// getCronKey 生成服务端定时任务名称
func (server *Server) getCronKey(s string) string {
return fmt.Sprintf("tcp.server_%s_%s", s, server.name)
}
// stopCron 停止定时任务
func (server *Server) stopCron() {
for _, v := range gcron.Entries() {
gcron.Remove(v.Name)
}
}
// startCron 启动定时任务
func (server *Server) startCron() {
// 心跳超时检查
if gcron.Search(server.getCronKey(consts.TCPCronHeartbeatVerify)) == nil {
@@ -31,7 +34,7 @@ func (server *Server) startCron() {
return
}
for _, client := range server.clients {
if client.heartbeat < gtime.Timestamp()-300 {
if client.heartbeat < gtime.Timestamp()-consts.TCPHeartbeatTimeout {
_ = client.Conn.Close()
server.Logger.Debugf(server.Ctx, "client heartbeat timeout, close conn. auth:%+v", client.Auth)
}

View File

@@ -17,6 +17,7 @@ import (
"hotgo/utility/convert"
)
// onServerLogin 处理客户端登录
func (server *Server) onServerLogin(ctx context.Context, args ...interface{}) {
var (
in = new(msgin.ServerLogin)
@@ -137,6 +138,7 @@ func (server *Server) onServerLogin(ctx context.Context, args ...interface{}) {
_ = server.Write(user.Conn, res)
}
// onServerHeartbeat 处理客户端心跳
func (server *Server) onServerHeartbeat(ctx context.Context, args ...interface{}) {
var (
in *msgin.ServerHeartbeat