mirror of
				https://github.com/songquanpeng/one-api.git
				synced 2025-11-04 07:43:41 +08:00 
			
		
		
		
	feat: able to use separated database for table logs
This commit is contained in:
		
							
								
								
									
										39
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										39
									
								
								README.md
									
									
									
									
									
								
							@@ -349,38 +349,39 @@ graph LR
 | 
			
		||||
     + `SQL_MAX_OPEN_CONNS`:最大打开连接数,默认为 `1000`。
 | 
			
		||||
       + 如果报错 `Error 1040: Too many connections`,请适当减小该值。
 | 
			
		||||
     + `SQL_CONN_MAX_LIFETIME`:连接的最大生命周期,默认为 `60`,单位分钟。
 | 
			
		||||
4. `FRONTEND_BASE_URL`:设置之后将重定向页面请求到指定的地址,仅限从服务器设置。
 | 
			
		||||
4. `LOG_SQL_DSN`:设置之后将为 `logs` 表使用独立的数据库,请使用 MySQL 或 PostgreSQL。
 | 
			
		||||
5. `FRONTEND_BASE_URL`:设置之后将重定向页面请求到指定的地址,仅限从服务器设置。
 | 
			
		||||
   + 例子:`FRONTEND_BASE_URL=https://openai.justsong.cn`
 | 
			
		||||
5. `MEMORY_CACHE_ENABLED`:启用内存缓存,会导致用户额度的更新存在一定的延迟,可选值为 `true` 和 `false`,未设置则默认为 `false`。
 | 
			
		||||
6. `MEMORY_CACHE_ENABLED`:启用内存缓存,会导致用户额度的更新存在一定的延迟,可选值为 `true` 和 `false`,未设置则默认为 `false`。
 | 
			
		||||
   + 例子:`MEMORY_CACHE_ENABLED=true`
 | 
			
		||||
6. `SYNC_FREQUENCY`:在启用缓存的情况下与数据库同步配置的频率,单位为秒,默认为 `600` 秒。
 | 
			
		||||
7. `SYNC_FREQUENCY`:在启用缓存的情况下与数据库同步配置的频率,单位为秒,默认为 `600` 秒。
 | 
			
		||||
   + 例子:`SYNC_FREQUENCY=60`
 | 
			
		||||
7. `NODE_TYPE`:设置之后将指定节点类型,可选值为 `master` 和 `slave`,未设置则默认为 `master`。
 | 
			
		||||
8. `NODE_TYPE`:设置之后将指定节点类型,可选值为 `master` 和 `slave`,未设置则默认为 `master`。
 | 
			
		||||
   + 例子:`NODE_TYPE=slave`
 | 
			
		||||
8. `CHANNEL_UPDATE_FREQUENCY`:设置之后将定期更新渠道余额,单位为分钟,未设置则不进行更新。
 | 
			
		||||
9. `CHANNEL_UPDATE_FREQUENCY`:设置之后将定期更新渠道余额,单位为分钟,未设置则不进行更新。
 | 
			
		||||
   + 例子:`CHANNEL_UPDATE_FREQUENCY=1440`
 | 
			
		||||
9. `CHANNEL_TEST_FREQUENCY`:设置之后将定期检查渠道,单位为分钟,未设置则不进行检查。
 | 
			
		||||
   + 例子:`CHANNEL_TEST_FREQUENCY=1440`
 | 
			
		||||
10. `POLLING_INTERVAL`:批量更新渠道余额以及测试可用性时的请求间隔,单位为秒,默认无间隔。
 | 
			
		||||
10. `CHANNEL_TEST_FREQUENCY`:设置之后将定期检查渠道,单位为分钟,未设置则不进行检查。
 | 
			
		||||
    + 例子:`CHANNEL_TEST_FREQUENCY=1440`
 | 
			
		||||
11. `POLLING_INTERVAL`:批量更新渠道余额以及测试可用性时的请求间隔,单位为秒,默认无间隔。
 | 
			
		||||
    + 例子:`POLLING_INTERVAL=5`
 | 
			
		||||
11. `BATCH_UPDATE_ENABLED`:启用数据库批量更新聚合,会导致用户额度的更新存在一定的延迟可选值为 `true` 和 `false`,未设置则默认为 `false`。
 | 
			
		||||
12. `BATCH_UPDATE_ENABLED`:启用数据库批量更新聚合,会导致用户额度的更新存在一定的延迟可选值为 `true` 和 `false`,未设置则默认为 `false`。
 | 
			
		||||
    + 例子:`BATCH_UPDATE_ENABLED=true`
 | 
			
		||||
    + 如果你遇到了数据库连接数过多的问题,可以尝试启用该选项。
 | 
			
		||||
12. `BATCH_UPDATE_INTERVAL=5`:批量更新聚合的时间间隔,单位为秒,默认为 `5`。
 | 
			
		||||
13. `BATCH_UPDATE_INTERVAL=5`:批量更新聚合的时间间隔,单位为秒,默认为 `5`。
 | 
			
		||||
    + 例子:`BATCH_UPDATE_INTERVAL=5`
 | 
			
		||||
13. 请求频率限制:
 | 
			
		||||
14. 请求频率限制:
 | 
			
		||||
    + `GLOBAL_API_RATE_LIMIT`:全局 API 速率限制(除中继请求外),单 ip 三分钟内的最大请求数,默认为 `180`。
 | 
			
		||||
    + `GLOBAL_WEB_RATE_LIMIT`:全局 Web 速率限制,单 ip 三分钟内的最大请求数,默认为 `60`。
 | 
			
		||||
14. 编码器缓存设置:
 | 
			
		||||
15. 编码器缓存设置:
 | 
			
		||||
    + `TIKTOKEN_CACHE_DIR`:默认程序启动时会联网下载一些通用的词元的编码,如:`gpt-3.5-turbo`,在一些网络环境不稳定,或者离线情况,可能会导致启动有问题,可以配置此目录缓存数据,可迁移到离线环境。
 | 
			
		||||
    + `DATA_GYM_CACHE_DIR`:目前该配置作用与 `TIKTOKEN_CACHE_DIR` 一致,但是优先级没有它高。
 | 
			
		||||
15. `RELAY_TIMEOUT`:中继超时设置,单位为秒,默认不设置超时时间。
 | 
			
		||||
16. `SQLITE_BUSY_TIMEOUT`:SQLite 锁等待超时设置,单位为毫秒,默认 `3000`。
 | 
			
		||||
17. `GEMINI_SAFETY_SETTING`:Gemini 的安全设置,默认 `BLOCK_NONE`。
 | 
			
		||||
18. `THEME`:系统的主题设置,默认为 `default`,具体可选值参考[此处](./web/README.md)。
 | 
			
		||||
19. `ENABLE_METRIC`:是否根据请求成功率禁用渠道,默认不开启,可选值为 `true` 和 `false`。
 | 
			
		||||
20. `METRIC_QUEUE_SIZE`:请求成功率统计队列大小,默认为 `10`。
 | 
			
		||||
21. `METRIC_SUCCESS_RATE_THRESHOLD`:请求成功率阈值,默认为 `0.8`。
 | 
			
		||||
16. `RELAY_TIMEOUT`:中继超时设置,单位为秒,默认不设置超时时间。
 | 
			
		||||
17. `SQLITE_BUSY_TIMEOUT`:SQLite 锁等待超时设置,单位为毫秒,默认 `3000`。
 | 
			
		||||
18. `GEMINI_SAFETY_SETTING`:Gemini 的安全设置,默认 `BLOCK_NONE`。
 | 
			
		||||
19. `THEME`:系统的主题设置,默认为 `default`,具体可选值参考[此处](./web/README.md)。
 | 
			
		||||
20. `ENABLE_METRIC`:是否根据请求成功率禁用渠道,默认不开启,可选值为 `true` 和 `false`。
 | 
			
		||||
21. `METRIC_QUEUE_SIZE`:请求成功率统计队列大小,默认为 `10`。
 | 
			
		||||
22. `METRIC_SUCCESS_RATE_THRESHOLD`:请求成功率阈值,默认为 `0.8`。
 | 
			
		||||
 | 
			
		||||
### 命令行参数
 | 
			
		||||
1. `--port <port_number>`: 指定服务器监听的端口号,默认为 `3000`。
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										16
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								main.go
									
									
									
									
									
								
							@@ -30,11 +30,25 @@ func main() {
 | 
			
		||||
	if config.DebugEnabled {
 | 
			
		||||
		logger.SysLog("running in debug mode")
 | 
			
		||||
	}
 | 
			
		||||
	var err error
 | 
			
		||||
	// Initialize SQL Database
 | 
			
		||||
	err := model.InitDB()
 | 
			
		||||
	model.DB, err = model.InitDB("SQL_DSN")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.FatalLog("failed to initialize database: " + err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	if os.Getenv("LOG_SQL_DSN") != "" {
 | 
			
		||||
		logger.SysLog("using secondary database for table logs")
 | 
			
		||||
		model.LOG_DB, err = model.InitDB("LOG_SQL_DSN")
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			logger.FatalLog("failed to initialize secondary database: " + err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		model.LOG_DB = model.DB
 | 
			
		||||
	}
 | 
			
		||||
	err = model.CreateRootAccountIfNeed()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.FatalLog("database init error: " + err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		err := model.CloseDB()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										24
									
								
								model/log.go
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								model/log.go
									
									
									
									
									
								
							@@ -45,7 +45,7 @@ func RecordLog(userId int, logType int, content string) {
 | 
			
		||||
		Type:      logType,
 | 
			
		||||
		Content:   content,
 | 
			
		||||
	}
 | 
			
		||||
	err := DB.Create(log).Error
 | 
			
		||||
	err := LOG_DB.Create(log).Error
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.SysError("failed to record log: " + err.Error())
 | 
			
		||||
	}
 | 
			
		||||
@@ -69,7 +69,7 @@ func RecordConsumeLog(ctx context.Context, userId int, channelId int, promptToke
 | 
			
		||||
		Quota:            int(quota),
 | 
			
		||||
		ChannelId:        channelId,
 | 
			
		||||
	}
 | 
			
		||||
	err := DB.Create(log).Error
 | 
			
		||||
	err := LOG_DB.Create(log).Error
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Error(ctx, "failed to record log: "+err.Error())
 | 
			
		||||
	}
 | 
			
		||||
@@ -78,9 +78,9 @@ func RecordConsumeLog(ctx context.Context, userId int, channelId int, promptToke
 | 
			
		||||
func GetAllLogs(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string, startIdx int, num int, channel int) (logs []*Log, err error) {
 | 
			
		||||
	var tx *gorm.DB
 | 
			
		||||
	if logType == LogTypeUnknown {
 | 
			
		||||
		tx = DB
 | 
			
		||||
		tx = LOG_DB
 | 
			
		||||
	} else {
 | 
			
		||||
		tx = DB.Where("type = ?", logType)
 | 
			
		||||
		tx = LOG_DB.Where("type = ?", logType)
 | 
			
		||||
	}
 | 
			
		||||
	if modelName != "" {
 | 
			
		||||
		tx = tx.Where("model_name = ?", modelName)
 | 
			
		||||
@@ -107,9 +107,9 @@ func GetAllLogs(logType int, startTimestamp int64, endTimestamp int64, modelName
 | 
			
		||||
func GetUserLogs(userId int, logType int, startTimestamp int64, endTimestamp int64, modelName string, tokenName string, startIdx int, num int) (logs []*Log, err error) {
 | 
			
		||||
	var tx *gorm.DB
 | 
			
		||||
	if logType == LogTypeUnknown {
 | 
			
		||||
		tx = DB.Where("user_id = ?", userId)
 | 
			
		||||
		tx = LOG_DB.Where("user_id = ?", userId)
 | 
			
		||||
	} else {
 | 
			
		||||
		tx = DB.Where("user_id = ? and type = ?", userId, logType)
 | 
			
		||||
		tx = LOG_DB.Where("user_id = ? and type = ?", userId, logType)
 | 
			
		||||
	}
 | 
			
		||||
	if modelName != "" {
 | 
			
		||||
		tx = tx.Where("model_name = ?", modelName)
 | 
			
		||||
@@ -128,17 +128,17 @@ func GetUserLogs(userId int, logType int, startTimestamp int64, endTimestamp int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SearchAllLogs(keyword string) (logs []*Log, err error) {
 | 
			
		||||
	err = DB.Where("type = ? or content LIKE ?", keyword, keyword+"%").Order("id desc").Limit(config.MaxRecentItems).Find(&logs).Error
 | 
			
		||||
	err = LOG_DB.Where("type = ? or content LIKE ?", keyword, keyword+"%").Order("id desc").Limit(config.MaxRecentItems).Find(&logs).Error
 | 
			
		||||
	return logs, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SearchUserLogs(userId int, keyword string) (logs []*Log, err error) {
 | 
			
		||||
	err = DB.Where("user_id = ? and type = ?", userId, keyword).Order("id desc").Limit(config.MaxRecentItems).Omit("id").Find(&logs).Error
 | 
			
		||||
	err = LOG_DB.Where("user_id = ? and type = ?", userId, keyword).Order("id desc").Limit(config.MaxRecentItems).Omit("id").Find(&logs).Error
 | 
			
		||||
	return logs, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SumUsedQuota(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string, channel int) (quota int64) {
 | 
			
		||||
	tx := DB.Table("logs").Select("ifnull(sum(quota),0)")
 | 
			
		||||
	tx := LOG_DB.Table("logs").Select("ifnull(sum(quota),0)")
 | 
			
		||||
	if username != "" {
 | 
			
		||||
		tx = tx.Where("username = ?", username)
 | 
			
		||||
	}
 | 
			
		||||
@@ -162,7 +162,7 @@ func SumUsedQuota(logType int, startTimestamp int64, endTimestamp int64, modelNa
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SumUsedToken(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string) (token int) {
 | 
			
		||||
	tx := DB.Table("logs").Select("ifnull(sum(prompt_tokens),0) + ifnull(sum(completion_tokens),0)")
 | 
			
		||||
	tx := LOG_DB.Table("logs").Select("ifnull(sum(prompt_tokens),0) + ifnull(sum(completion_tokens),0)")
 | 
			
		||||
	if username != "" {
 | 
			
		||||
		tx = tx.Where("username = ?", username)
 | 
			
		||||
	}
 | 
			
		||||
@@ -183,7 +183,7 @@ func SumUsedToken(logType int, startTimestamp int64, endTimestamp int64, modelNa
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DeleteOldLog(targetTimestamp int64) (int64, error) {
 | 
			
		||||
	result := DB.Where("created_at < ?", targetTimestamp).Delete(&Log{})
 | 
			
		||||
	result := LOG_DB.Where("created_at < ?", targetTimestamp).Delete(&Log{})
 | 
			
		||||
	return result.RowsAffected, result.Error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -207,7 +207,7 @@ func SearchLogsByDayAndModel(userId, start, end int) (LogStatistics []*LogStatis
 | 
			
		||||
		groupSelect = "strftime('%Y-%m-%d', datetime(created_at, 'unixepoch')) as day"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = DB.Raw(`
 | 
			
		||||
	err = LOG_DB.Raw(`
 | 
			
		||||
		SELECT `+groupSelect+`,
 | 
			
		||||
		model_name, count(1) as request_count,
 | 
			
		||||
		sum(quota) as quota,
 | 
			
		||||
 
 | 
			
		||||
@@ -17,8 +17,9 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var DB *gorm.DB
 | 
			
		||||
var LOG_DB *gorm.DB
 | 
			
		||||
 | 
			
		||||
func createRootAccountIfNeed() error {
 | 
			
		||||
func CreateRootAccountIfNeed() error {
 | 
			
		||||
	var user User
 | 
			
		||||
	//if user.Status != util.UserStatusEnabled {
 | 
			
		||||
	if err := DB.First(&user).Error; err != nil {
 | 
			
		||||
@@ -41,9 +42,9 @@ func createRootAccountIfNeed() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func chooseDB() (*gorm.DB, error) {
 | 
			
		||||
	if os.Getenv("SQL_DSN") != "" {
 | 
			
		||||
		dsn := os.Getenv("SQL_DSN")
 | 
			
		||||
func chooseDB(envName string) (*gorm.DB, error) {
 | 
			
		||||
	if os.Getenv(envName) != "" {
 | 
			
		||||
		dsn := os.Getenv(envName)
 | 
			
		||||
		if strings.HasPrefix(dsn, "postgres://") {
 | 
			
		||||
			// Use PostgreSQL
 | 
			
		||||
			logger.SysLog("using PostgreSQL as database")
 | 
			
		||||
@@ -71,23 +72,22 @@ func chooseDB() (*gorm.DB, error) {
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func InitDB() (err error) {
 | 
			
		||||
	db, err := chooseDB()
 | 
			
		||||
func InitDB(envName string) (db *gorm.DB, err error) {
 | 
			
		||||
	db, err = chooseDB(envName)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		if config.DebugSQLEnabled {
 | 
			
		||||
			db = db.Debug()
 | 
			
		||||
		}
 | 
			
		||||
		DB = db
 | 
			
		||||
		sqlDB, err := DB.DB()
 | 
			
		||||
		sqlDB, err := db.DB()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		sqlDB.SetMaxIdleConns(env.Int("SQL_MAX_IDLE_CONNS", 100))
 | 
			
		||||
		sqlDB.SetMaxOpenConns(env.Int("SQL_MAX_OPEN_CONNS", 1000))
 | 
			
		||||
		sqlDB.SetConnMaxLifetime(time.Second * time.Duration(env.Int("SQL_MAX_LIFETIME", 60)))
 | 
			
		||||
 | 
			
		||||
		if !config.IsMasterNode {
 | 
			
		||||
			return nil
 | 
			
		||||
			return db, err
 | 
			
		||||
		}
 | 
			
		||||
		if common.UsingMySQL {
 | 
			
		||||
			_, _ = sqlDB.Exec("DROP INDEX idx_channels_key ON channels;") // TODO: delete this line when most users have upgraded
 | 
			
		||||
@@ -95,46 +95,55 @@ func InitDB() (err error) {
 | 
			
		||||
		logger.SysLog("database migration started")
 | 
			
		||||
		err = db.AutoMigrate(&Channel{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = db.AutoMigrate(&Token{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = db.AutoMigrate(&User{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = db.AutoMigrate(&Option{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = db.AutoMigrate(&Redemption{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = db.AutoMigrate(&Ability{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = db.AutoMigrate(&Log{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		logger.SysLog("database migrated")
 | 
			
		||||
		err = createRootAccountIfNeed()
 | 
			
		||||
		return err
 | 
			
		||||
		return db, err
 | 
			
		||||
	} else {
 | 
			
		||||
		logger.FatalLog(err)
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
	return db, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CloseDB() error {
 | 
			
		||||
	sqlDB, err := DB.DB()
 | 
			
		||||
func closeDB(db *gorm.DB) error {
 | 
			
		||||
	sqlDB, err := db.DB()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	err = sqlDB.Close()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CloseDB() error {
 | 
			
		||||
	if LOG_DB != DB {
 | 
			
		||||
		err := closeDB(LOG_DB)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return closeDB(DB)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user