Commit 67ad96bf by wang

流量控制

parent c9688c9a
...@@ -2,14 +2,14 @@ ...@@ -2,14 +2,14 @@
[default_redis_read] [default_redis_read]
host = 192.168.1.235:6379 host = 192.168.1.235:6379
password = icDb29mLy2s password = icDb29mLy2s
max_idle = 50 max_idle = 1000
max_active = 5000 max_active = 5000
idle_timeout = 20 idle_timeout = 20
[default_redis_write] [default_redis_write]
host = 192.168.1.235:6379 host = 192.168.1.235:6379
password = icDb29mLy2s password = icDb29mLy2s
max_idle = 50 max_idle = 1000
max_active = 5000 max_active = 5000
idle_timeout = 20 idle_timeout = 20
......
...@@ -21,6 +21,7 @@ func Hbsdata(ctx *gin.Context) { ...@@ -21,6 +21,7 @@ func Hbsdata(ctx *gin.Context) {
用classId获取sku列表 用classId获取sku列表
*/ */
func GetSkuListByClass(ctx *gin.Context) { func GetSkuListByClass(ctx *gin.Context) {
return
req:=&model.QuerySkuCreq{} req:=&model.QuerySkuCreq{}
e.CheckError(model.ValidateBind(ctx,req)) e.CheckError(model.ValidateBind(ctx,req))
rsp,err:=service.NewSkuService().GetSkuListByClass(req) rsp,err:=service.NewSkuService().GetSkuListByClass(req)
......
...@@ -76,7 +76,7 @@ func businessList() { ...@@ -76,7 +76,7 @@ func businessList() {
businessList:=[]string{ businessList:=[]string{
jdToken,//京东 jdToken,//京东
baiduToken,//百度 //baiduToken,//百度
} }
fmt.Println("商家列表:key:"+redisKey) fmt.Println("商家列表:key:"+redisKey)
redisStringSet(redisKey,businessList) redisStringSet(redisKey,businessList)
...@@ -126,8 +126,8 @@ func redisHashSet(key string,values map[string]interface{}) { ...@@ -126,8 +126,8 @@ func redisHashSet(key string,values map[string]interface{}) {
b,_=json.Marshal(string(b)) b,_=json.Marshal(string(b))
hsetStr:="Hset "+key+" "+string(hashk)+" "+string(b) hsetStr:="Hset "+key+" "+string(hashk)+" "+string(b)
fmt.Println(hsetStr) fmt.Println(hsetStr)
fmt.Println("对应计时的min key\nflowUse_"+hashk+"_min") //fmt.Println("对应计时的min key\nflowUse_"+hashk+"_min")
fmt.Println("对应计时的day key\nflowUse_"+hashk+"_day") //fmt.Println("对应计时的day key\nflowUse_"+hashk+"_day")
} }
} }
......
...@@ -18,13 +18,14 @@ const ( ...@@ -18,13 +18,14 @@ const (
WHILTREDISEERR2 = 813404//ip不在白名单里面 WHILTREDISEERR2 = 813404//ip不在白名单里面
WHILTREDISEERR3 = 813406//没有设置白名单 WHILTREDISEERR3 = 813406//没有设置白名单
//白名单配置814 //锁814
FLOWERR1 = 814505//流量控制操作 redis错误 FLOWERR1 = 814505//lua脚本执行出错
FLOWERR5 = 814408//脚本执行超时
FLOWERR3 = 814406//触发分钟级流量限制 FLOWERR3 = 814406//触发分钟级流量限制
FLOWERR4 = 814407//触发天级流量限制 FLOWERR4 = 814407//触发天级流量限制
//lock===========815========= //lock===========815=========
LOCKERR1 =815407 //没读到锁,请重试
//sku接口===================================== 80开头 获取sku错误码 //sku接口===================================== 80开头 获取sku错误码
PARAM1 = 80001 //参数问题(缺失or数据格式不对) PARAM1 = 80001 //参数问题(缺失or数据格式不对)
......
package open package open
import ( import (
logger "github.com/ichunt2019/log" "context"
"fmt"
"github.com/gogf/gf/util/gconv"
"github.com/gomodule/redigo/redis"
"golang_open_platform/pkg/common" "golang_open_platform/pkg/common"
"golang_open_platform/pkg/e" "golang_open_platform/pkg/e"
"golang_open_platform/pkg/gredis" "golang_open_platform/pkg/gredis"
...@@ -21,14 +24,42 @@ type flowmeter struct { ...@@ -21,14 +24,42 @@ type flowmeter struct {
interfaceName string //接口名称 interfaceName string //接口名称
} }
const incrLua=`
local dayNum = tonumber(redis.call('get', KEYS[1]) or 0)
local minNum = tonumber(redis.call('get', KEYS[2]) or 0)
local dayMax = tonumber(ARGV[1])
local minMax = tonumber(ARGV[2])
print("Hello World!")
if (dayNum + 1 > dayMax) then
return 1
else
if (dayNum == 0) then
redis.call('SETEX', KEYS[1], 86400,0)
end
end
if (minNum + 1 > minMax) then
return 2
else
if (minNum == 0) then
redis.call('SETEX', KEYS[2], 60,0)
end
end
redis.call('incrby', KEYS[2], 1)
redis.call('incrby', KEYS[1], 1)
return 0
`
func NewFlow(token string,interfaceName string) *flowmeter{ func NewFlow(token string,interfaceName string) *flowmeter{
return &flowmeter{dao:&Dao{},token:token,interfaceName:interfaceName} return &flowmeter{dao:&Dao{},token:token,interfaceName:interfaceName}
} }
//获取 对应的计数redis key 和过期时间 //获取 对应的计数redis key 和过期时间
func (this *flowmeter) getFlowKeyAndEx() (key string,expireTime int64){ func (this *flowmeter) getFlowKeyAndEx(flowLimitType int) (key string,expireTime int64){
key="flowUse_"+this.token+"_"+this.interfaceName key="flowUse_"+this.token+"_"+this.interfaceName
switch this.flowLimitType { switch flowLimitType {
case INCRTYPEMIN: case INCRTYPEMIN:
key+="_min" key+="_min"
expireTime=int64(time.Minute.Seconds()) expireTime=int64(time.Minute.Seconds())
...@@ -45,144 +76,77 @@ func (this *flowmeter) getFlowKeyAndEx() (key string,expireTime int64){ ...@@ -45,144 +76,77 @@ func (this *flowmeter) getFlowKeyAndEx() (key string,expireTime int64){
return return
} }
//设置流量限制类型 INCRTYPEMIN 分限制 INCRTYPEDAY 天限制
func (this *flowmeter) setFlowKey(flowLimitType int) {
this.flowLimitType=flowLimitType
}
func (this *flowmeter) getNum()(int,error){
key,expireTime:=this.getFlowKeyAndEx()
//获取计数 (读 redis key)
num,err:=this.dao.getRedisFlowNum(key)
//common.PrintStdout().Printf("读取 计数key:%s,num 为: %d",key,num)
if(err!=nil){
return 0,err
}
if(num==0){//计数不存在(redis没有此key),就新建一个带有超时的key
err:=this.dao.setRedisFlowEx(key,expireTime)
if(err!=nil){
return 0,err
}
}
return num,nil
}
/** /**
比较
*/
func (this *flowmeter) compare(maxNum int64) (error,bool){
num,err:=this.getNum()
if(err!=nil){
return err,false
}
if(int64(num)>=maxNum){
common.PrintStdout().Printf("最大流量为: %d < %d",int(maxNum),num)
return nil,false
}
return nil,true
}
/**
验证 验证
*/ */
func (this *flowmeter) checkout(business *business) error{ func (this *flowmeter) checkout(business *business) error{
redisWriteConn := gredis.Conn("search_w")
defer redisWriteConn.Close()
return nil
interfaceConfig,err:=business.GetInterfaceConfig(this.interfaceName)//商家接口配置,包含一小时最大流量,一天最大流量 interfaceConfig,err:=business.GetInterfaceConfig(this.interfaceName)//商家接口配置,包含一小时最大流量,一天最大流量
key,_:=this.getFlowKeyAndEx()
defer this.delOnlyLock(key)
err=this.addOnlyLock(key)//加锁
if(err!=nil){ if(err!=nil){
common.PrintStdout().Printf("加锁失败")
this.dao.incrRedisFlowNum("err_num")
return err return err
} }
//lock.Lock() dayMax:=interfaceConfig.dayMaxNum
//defer lock.Unlock() minMax:=interfaceConfig.minMaxNum
dayKey,_:=this.getFlowKeyAndEx(INCRTYPEDAY)
//验证天流量限制 minKey,_:=this.getFlowKeyAndEx(INCRTYPEMIN)
this.setFlowKey(INCRTYPEDAY) fmt.Println(dayKey)
err,isPass:=this.compare(interfaceConfig.dayMaxNum) fmt.Println(minKey)
script:=redis.NewScript(2,incrLua)
//脚本执行5秒算它卡死,用script kill杀死此次脚本
context,cancel:=context.WithTimeout(context.Background(),time.Second*1)
defer cancel()
errRes:=make(chan error)
intRes:=make(chan int)
go func() {
s,err:=script.Do(redisWriteConn,dayKey,minKey,dayMax,minMax)
if(err!=nil){ if(err!=nil){
return err
}
if(err!=nil){
logger.Error("sku_query","比较失败")
return err
}
if(isPass==false){
return e.NewApiError("触发天级流控",FLOWERR4)
}
//分钟流量验证 errRes<-err
this.setFlowKey(INCRTYPEMIN) return
err,isPass=this.compare(interfaceConfig.minMaxNum)
if(err!=nil){
return err
}
if(isPass==false){
return e.NewApiError("触发分钟级流控",FLOWERR4)
} }
intRes<-gconv.Int(s)
}()
//增加分钟计数 select {
err=this.incr() case <-context.Done():
if(err!=nil){ common.PrintStdout().Printf("脚本执行失败")
common.PrintStdout().Printf("增加fen分计数报错") Kill()
return nil return e.NewApiError("service error",FLOWERR5)
}
//增加天计数
this.setFlowKey(INCRTYPEDAY)
err=this.incr()
if(err!=nil){
common.PrintStdout().Printf("增加天计数报错")
return err
}
return nil
}
//自增 case err:=<-errRes:
func (this *flowmeter)incr() (err error) { common.PrintStdout().Printf("脚本出错:"+err.Error())
key,_:=this.getFlowKeyAndEx() return e.NewApiError("service error",FLOWERR1)
_,err=this.dao.incrRedisFlowNum(key)
if(err!=nil){
return err
} case num:=<-intRes:
//common.PrintStdout().Printf("加1后的值是 %d",num)
return nil
}
if(num==1){
common.PrintStdout().Printf("触发天限制")
return e.NewApiError("触发天限制",FLOWERR4)
}
//如果锁不存在并设置锁 if(num==2){
func (this *flowmeter)addOnlyLock(key string)error{ common.PrintStdout().Printf("触发分钟级限制")
redisWriteConn := gredis.Conn("search_w") return e.NewApiError("触发分钟级限制",FLOWERR3)
defer redisWriteConn.Close()
name:="flow_lock_"+key
//common.PrintStdout().Printf("加锁 key:%s",name)
s, err:= redisWriteConn.Do("SET", name, "1", "EX", "100","NX")//锁两秒没主动删就自动关闭(防止某个流程卡死,没执行到删除锁)
if(err!=nil){
common.PrintStdout().Printf("读取redis 锁 key:%d 报错",key)
} }
if(s!=nil){//读到了 break
//common.PrintStdout().Printf("加锁完成 key :%s",name)
return nil
} }
common.PrintStdout().Printf("没读到") return nil
return e.NewApiError("请重试",LOCKERR1)
} }
//删除锁 func Kill() {
func (this *flowmeter) delOnlyLock(key string){
redisWriteConn := gredis.Conn("search_w") redisWriteConn := gredis.Conn("search_w")
defer redisWriteConn.Close() defer redisWriteConn.Close()
name:="flow_lock_"+key _,err:=redisWriteConn.Do("script","kill")
_, err:= redisWriteConn.Do("DEL", name)
if(err!=nil){ if(err!=nil){
logger.Error("sku_query","删除锁失败") common.PrintStdout().Printf("kill失败"+err.Error())
println(err.Error()) return
} }
//common.PrintStdout().Printf("解锁完成") common.PrintStdout().Printf("kill成功")
} }
\ No newline at end of file
package open
import (
logger "github.com/ichunt2019/log"
"golang_open_platform/pkg/common"
"golang_open_platform/pkg/e"
"golang_open_platform/pkg/gredis"
"sync"
"time"
)
const (
INCRTYPEMIN = iota //
INCRTYPEDAY //
)
var lock sync.Mutex
type flowmeter struct {
dao *Dao
flowLimitType int //INCRTYPEMIN 分限制 INCRTYPEDAY 天限制
token string //token
interfaceName string //接口名称
}
func NewFlow(token string,interfaceName string) *flowmeter{
return &flowmeter{dao:&Dao{},token:token,interfaceName:interfaceName}
}
//获取 对应的计数redis key 和过期时间
func (this *flowmeter) getFlowKeyAndEx() (key string,expireTime int64){
key="flowUse_"+this.token+"_"+this.interfaceName
switch this.flowLimitType {
case INCRTYPEMIN:
key+="_min"
expireTime=int64(time.Minute.Seconds())
break
case INCRTYPEDAY:
key+="_day"
expireTime=int64(time.Hour.Seconds()*24)
break
default:
key+="_min"
expireTime=int64(time.Minute.Seconds())
break
}
return
}
//设置流量限制类型 INCRTYPEMIN 分限制 INCRTYPEDAY 天限制
func (this *flowmeter) setFlowKey(flowLimitType int) {
this.flowLimitType=flowLimitType
}
func (this *flowmeter) getNum()(int,error){
key,expireTime:=this.getFlowKeyAndEx()
//获取计数 ( redis key)
num,err:=this.dao.getRedisFlowNum(key)
//common.PrintStdout().Printf("读取 计数key:%s,num 为: %d",key,num)
if(err!=nil){
return 0,err
}
if(num==0){//计数不存在(redis没有此key),就新建一个带有超时的key
err:=this.dao.setRedisFlowEx(key,expireTime)
if(err!=nil){
return 0,err
}
}
return num,nil
}
/**
比较
*/
func (this *flowmeter) compare(maxNum int64) (error,bool){
num,err:=this.getNum()
if(err!=nil){
return err,false
}
if(int64(num)>=maxNum){
common.PrintStdout().Printf("最大流量为: %d < %d",int(maxNum),num)
return nil,false
}
return nil,true
}
/**
验证
*/
func (this *flowmeter) checkout(business *business) error{
return nil
interfaceConfig,err:=business.GetInterfaceConfig(this.interfaceName)//商家接口配置,包含一小时最大流量,一天最大流量
key,_:=this.getFlowKeyAndEx()
defer this.delOnlyLock(key)
err=this.addOnlyLock(key)//加锁
if(err!=nil){
common.PrintStdout().Printf("加锁失败")
this.dao.incrRedisFlowNum("err_num")
return err
}
//lock.Lock()
//defer lock.Unlock()
//验证天流量限制
this.setFlowKey(INCRTYPEDAY)
err,isPass:=this.compare(interfaceConfig.dayMaxNum)
if(err!=nil){
return err
}
if(err!=nil){
logger.Error("sku_query","比较失败")
return err
}
if(isPass==false){
return e.NewApiError("触发天级流控",FLOWERR4)
}
//分钟流量验证
this.setFlowKey(INCRTYPEMIN)
err,isPass=this.compare(interfaceConfig.minMaxNum)
if(err!=nil){
return err
}
if(isPass==false){
return e.NewApiError("触发分钟级流控",FLOWERR4)
}
//增加分钟计数
err=this.incr()
if(err!=nil){
common.PrintStdout().Printf("增加fen分计数报错")
return nil
}
//增加天计数
this.setFlowKey(INCRTYPEDAY)
err=this.incr()
if(err!=nil){
common.PrintStdout().Printf("增加天计数报错")
return err
}
return nil
}
//自增
func (this *flowmeter)incr() (err error) {
key,_:=this.getFlowKeyAndEx()
_,err=this.dao.incrRedisFlowNum(key)
if(err!=nil){
return err
}
//common.PrintStdout().Printf("加1后的值是 %d",num)
return nil
}
//如果锁不存在并设置锁
func (this *flowmeter)addOnlyLock(key string)error{
redisWriteConn := gredis.Conn("search_w")
defer redisWriteConn.Close()
name:="flow_lock_"+key
//common.PrintStdout().Printf("加锁 key:%s",name)
s, err:= redisWriteConn.Do("SET", name, "1", "EX", "100","NX")//锁两秒没主动删就自动关闭(防止某个流程卡死,没执行到删除锁)
if(err!=nil){
common.PrintStdout().Printf("读取redis 锁 key:%d 报错",key)
}
if(s!=nil){//读到了
//common.PrintStdout().Printf("加锁完成 key :%s",name)
return nil
}
common.PrintStdout().Printf("没读到")
return e.NewApiError("请重试",LOCKERR1)
}
//删除锁
func (this *flowmeter) delOnlyLock(key string){
redisWriteConn := gredis.Conn("search_w")
defer redisWriteConn.Close()
name:="flow_lock_"+key
_, err:= redisWriteConn.Do("DEL", name)
if(err!=nil){
logger.Error("sku_query","删除锁失败")
println(err.Error())
}
//common.PrintStdout().Printf("解锁完成")
}
\ No newline at end of file
...@@ -71,16 +71,6 @@ func (this *openValidate) permissionValidate(interfaceName string) error{ ...@@ -71,16 +71,6 @@ func (this *openValidate) permissionValidate(interfaceName string) error{
return nil return nil
} }
//请求次数限制
func (this *openValidate) requestsNumValidate(token string,interfaceName string) error{
err:=NewFlow(token,interfaceName).checkout(this.business)
if(err!=nil){
return err
}
return nil
}
//check验证 //check验证
func (this *openValidate) Check(ctx *gin.Context,interfaceName string) error { func (this *openValidate) Check(ctx *gin.Context,interfaceName string) error {
...@@ -100,7 +90,7 @@ func (this *openValidate) Check(ctx *gin.Context,interfaceName string) error { ...@@ -100,7 +90,7 @@ func (this *openValidate) Check(ctx *gin.Context,interfaceName string) error {
return err return err
} }
//流量控制验证 //流量控制验证
err=this.requestsNumValidate(tokenStr,interfaceName) err=NewFlow(tokenStr,interfaceName).checkout(this.business)
if(err!=nil){ if(err!=nil){
return err return err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment