mirror of
				https://github.com/songquanpeng/one-api.git
				synced 2025-10-27 20:03:42 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			79 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			79 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package model
 | |
| 
 | |
| import (
 | |
| 	"github.com/songquanpeng/one-api/common/config"
 | |
| 	"github.com/songquanpeng/one-api/common/logger"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
// Batch update types. Each value is used as an index into batchUpdateStores
// and batchUpdateLocks, and selects the flush routine called by batchUpdate.
const (
	// BatchUpdateTypeUserQuota entries are flushed through increaseUserQuota.
	BatchUpdateTypeUserQuota = iota
	// BatchUpdateTypeTokenQuota entries are flushed through increaseTokenQuota.
	BatchUpdateTypeTokenQuota
	// BatchUpdateTypeUsedQuota entries are flushed through updateUserUsedQuota.
	BatchUpdateTypeUsedQuota
	// BatchUpdateTypeChannelUsedQuota entries are flushed through
	// updateChannelUsedQuota.
	BatchUpdateTypeChannelUsedQuota
	// BatchUpdateTypeRequestCount entries are flushed through
	// updateUserRequestCount.
	BatchUpdateTypeRequestCount
	// BatchUpdateTypeCount is the number of types declared above and must stay
	// last. init sizes the stores and locks from it, so a new type only needs a
	// constant above this line and a matching case in batchUpdate.
	BatchUpdateTypeCount
)

// batchUpdateStores holds, per batch update type, a map from id to the value
// accumulated for that id since the map was last swapped out by batchUpdate.
var batchUpdateStores []map[int]int64

// batchUpdateLocks holds the mutex guarding the store at the same index.
var batchUpdateLocks []sync.Mutex

// init allocates one empty store and one mutex per batch update type.
func init() {
	// Both lengths are known up front, so size the slices once instead of
	// growing them with append; this also keeps the mutexes from being moved
	// by slice reallocation.
	batchUpdateStores = make([]map[int]int64, BatchUpdateTypeCount)
	for i := range batchUpdateStores {
		batchUpdateStores[i] = make(map[int]int64)
	}
	batchUpdateLocks = make([]sync.Mutex, BatchUpdateTypeCount)
}
 | |
| 
 | |
| func InitBatchUpdater() {
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			time.Sleep(time.Duration(config.BatchUpdateInterval) * time.Second)
 | |
| 			batchUpdate()
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| func addNewRecord(type_ int, id int, value int64) {
 | |
| 	batchUpdateLocks[type_].Lock()
 | |
| 	defer batchUpdateLocks[type_].Unlock()
 | |
| 	if _, ok := batchUpdateStores[type_][id]; !ok {
 | |
| 		batchUpdateStores[type_][id] = value
 | |
| 	} else {
 | |
| 		batchUpdateStores[type_][id] += value
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func batchUpdate() {
 | |
| 	logger.SysLog("batch update started")
 | |
| 	for i := 0; i < BatchUpdateTypeCount; i++ {
 | |
| 		batchUpdateLocks[i].Lock()
 | |
| 		store := batchUpdateStores[i]
 | |
| 		batchUpdateStores[i] = make(map[int]int64)
 | |
| 		batchUpdateLocks[i].Unlock()
 | |
| 		// TODO: maybe we can combine updates with same key?
 | |
| 		for key, value := range store {
 | |
| 			switch i {
 | |
| 			case BatchUpdateTypeUserQuota:
 | |
| 				err := increaseUserQuota(key, value)
 | |
| 				if err != nil {
 | |
| 					logger.SysError("failed to batch update user quota: " + err.Error())
 | |
| 				}
 | |
| 			case BatchUpdateTypeTokenQuota:
 | |
| 				err := increaseTokenQuota(key, value)
 | |
| 				if err != nil {
 | |
| 					logger.SysError("failed to batch update token quota: " + err.Error())
 | |
| 				}
 | |
| 			case BatchUpdateTypeUsedQuota:
 | |
| 				updateUserUsedQuota(key, value)
 | |
| 			case BatchUpdateTypeRequestCount:
 | |
| 				updateUserRequestCount(key, int(value))
 | |
| 			case BatchUpdateTypeChannelUsedQuota:
 | |
| 				updateChannelUsedQuota(key, value)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	logger.SysLog("batch update finished")
 | |
| }
 |