Commit 3a6792ae by hcy

并发问题

parent 32b298ac
......@@ -3,10 +3,13 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"github.com/tidwall/gjson"
"go_sku_server/pkg/common"
"go_sku_server/pkg/gredis"
"go_sku_server/pkg/logger"
"go_sku_server/pkg/mongo"
"go_sku_server/service"
"gopkg.in/mgo.v2/bson"
"sync"
"time"
......@@ -41,14 +44,11 @@ func CommonController(ctx *gin.Context) map[string]interface{} {
// 1. 提前提取所有请求参数(在主协程中,避免在子协程中访问 gin.Context)
requestParams := service.ExtractRequestParams(ctx)
// 2. 创建带超时的 context,用于控制所有协程的生命周期
// 5秒超时后,所有子协程都会收到取消信号
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // 确保函数返回时取消所有协程,释放资源
//抽取自营 或者联营 goods_id
zyService := service.ZiyingService{} //实例化自营查询
lyService := service.LyService{} //实例化联营查询
SpuService := service.SpuService{}
var goodsIdArr []string
if GoodsIdStr == "" {
goodsIdMap := ctx.PostFormMap("goods_id")
......@@ -63,11 +63,64 @@ func CommonController(ctx *gin.Context) map[string]interface{} {
return nil
}
//////////初始化公共变量 以及 批量查询///////////
zyGoodsId := make([]string, 0, goodsSliceCount)
lyGoodsId := make([]string, 0, goodsSliceCount)
for _, goodsId := range goodsIdArr {
if len(goodsId) < 19 { //自营
zyGoodsId = append(zyGoodsId, goodsId)
} else { //联营
lyGoodsId = append(lyGoodsId, goodsId)
}
}
//批量查询redis-sku信息
redisLySkuArr := gredis.Hmget("default_r", "sku", lyGoodsId)
//批量查询归档的sku
preSkuIds := make([]int64, 0)
for _, goodsId := range goodsIdArr {
if redisLySkuArr[goodsId] == "" {
preSkuIds = append(preSkuIds, gconv.Int64(goodsId))
}
}
if len(preSkuIds) > 0 {
var prevSkuArr []bson.M
prevSkuMongo := mongo.Conn("pre_sku")
query := bson.M{
"sku_id": bson.M{
"$in": preSkuIds,
},
}
err := prevSkuMongo.DB("ichunt").C("prev_sku").Find(query).All(&prevSkuArr)
if err != nil {
fmt.Println("查询失败: %v", err)
}
for _, preSkuOne := range prevSkuArr {
jsonBytes, err := json.Marshal(preSkuOne)
if err != nil {
continue
}
preSkuOneStr := string(jsonBytes)
skuId := gjson.Get(preSkuOneStr, "sku_id").String()
redisLySkuArr[skuId] = preSkuOneStr
}
}
//查询spu信息
redisLySpuArr := SpuService.GetSpuList(redisLySkuArr)
SpuService.SetInit() //设置查询公共变量
/////////////////多线程///////////////////////////
var wg sync.WaitGroup
ch := make(chan sync.Map, 50) //管道
wgMax := 3 //线程数量最多多少
wgcount := 0 //当前线程数
zyGoodsId := make([]string, 0, goodsSliceCount)
lyGoodsId := make([]string, 0, goodsSliceCount)
temp := make(map[string]interface{}) //最后输出计算结果
for _, goodsId := range goodsIdArr {
//if len(goodsIdArr) > 100 {
// common.Output(ctx, 1001, "查询型号ID不得超过100个", "")
......@@ -90,87 +143,49 @@ func CommonController(ctx *gin.Context) map[string]interface{} {
go func(ctx context.Context, params service.RequestParams, goodsIds []string, ch chan sync.Map) {
defer wg.Done()
zyService.ZyGoodsDetail(ctx, params, goodsIds, ch)
}(ctxWithTimeout, requestParams, idsToProcess, ch)
}(ctx, requestParams, idsToProcess, ch)
zyGoodsId = zyGoodsId[:0]
}
} else { //联营
lyGoodsId = append(lyGoodsId, goodsId)
if len(lyGoodsId) >= goodsSliceCount {
common.PrintDebugHtml(ctx, "ly增加协程1002:")
common.PrintDebugHtml(ctx, lyGoodsId)
wg.Add(1)
idsToProcess := make([]string, len(lyGoodsId))
copy(idsToProcess, lyGoodsId)
common.PrintDebugHtml(ctx, "ly增加协程:"+goodsId)
// 启动协程,传递独立的 context 和参数,而不是 gin.Context
go func(ctx context.Context, params service.RequestParams, goodsIds []string, ch chan sync.Map) {
defer wg.Done()
lyService.LyGoodsDetail(ctx, params, goodsIds, ch)
}(ctxWithTimeout, requestParams, idsToProcess, ch)
lyGoodsId = lyGoodsId[:0]
}
//单个sku详情
if _, ok := redisLySkuArr[goodsId]; !ok {
temp[goodsId] = false
continue
}
skuStr := redisLySkuArr[goodsId]
spuId := gjson.Get(skuStr, "spu_id").String()
if _, ok := redisLySpuArr[spuId]; !ok {
temp[goodsId] = false
continue
}
if len(zyGoodsId) > 0 {
common.PrintDebugHtml(ctx, "zy增加协程1003:")
common.PrintDebugHtml(ctx, zyGoodsId)
wg.Add(1) //协程计数一
idsToProcess := make([]string, len(zyGoodsId))
copy(idsToProcess, zyGoodsId)
// 启动协程,传递独立的 context 和参数,而不是 gin.Context
go func(ctx context.Context, params service.RequestParams, goodsIds []string, ch chan sync.Map) {
go func(ctx context.Context, params service.RequestParams, skuStr, goodsId, spuStr string, ch chan sync.Map) {
defer wg.Done()
zyService.ZyGoodsDetail(ctx, params, goodsIds, ch)
}(ctxWithTimeout, requestParams, idsToProcess, ch)
}
lyService.LyGoodsDetail(ctx, params, skuStr, goodsId, spuStr, ch)
}(ctx, requestParams, skuStr, goodsId, redisLySpuArr[spuId], ch)
if len(lyGoodsId) > 0 {
common.PrintDebugHtml(ctx, "ly增加协程1004:")
common.PrintDebugHtml(ctx, lyGoodsId)
wg.Add(1)
wgcount += 1
idsToProcess := make([]string, len(lyGoodsId))
copy(idsToProcess, lyGoodsId)
// 启动协程,传递独立的 context 和参数,而不是 gin.Context
go func(ctx context.Context, params service.RequestParams, goodsIds []string, ch chan sync.Map) {
defer wg.Done()
lyService.LyGoodsDetail(ctx, params, goodsIds, ch)
}(ctxWithTimeout, requestParams, idsToProcess, ch)
}
// 开启一个协程,等待所有任务完成,然后关闭channel
// 使用 context 来控制这个等待协程的生命周期
go func() {
// 创建一个 done channel 用于通知 wg.Wait() 完成
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
if wgcount >= wgMax {
println("线程数量:" + gconv.String(wgcount))
wg.Wait() // 等待所有登记的goroutine都结束
wgcount = 0
}
}
}
// 等待 wg.Wait() 完成或 context 取消
select {
case <-done:
// 所有协程都正常完成
close(ch)
case <-ctxWithTimeout.Done():
// context 超时,不再等待剩余协程
// 等待一小段时间让协程有机会退出
time.Sleep(300 * time.Millisecond)
close(ch)
logger.Log("等待协程完成超时,强制关闭channel", "sku", 1)
if wgcount > 0 {
//println("等待结束,现有运行线程数:" + gconv.String(wgcount))
//wg.Wait()
}
}()
close(ch)
//异步map最后转成map
temp := make(map[string]interface{})
for {
select {
case GoodsRes, ok := <-ch:
......@@ -182,36 +197,12 @@ func CommonController(ctx *gin.Context) map[string]interface{} {
temp[s] = v
return true
})
case <-ctxWithTimeout.Done():
// context 超时或被取消
logger.Log("协程整体处理超时,所有子协程已收到取消信号", "sku", 1)
// 继续从 channel 读取已经发送的数据,避免协程阻塞
// 等待 channel 被关闭(由上面的 wg.Wait() 协程关闭)
cleanupTimeout := time.After(500 * time.Millisecond)
for {
select {
case GoodsRes, ok := <-ch:
if !ok {
// channel 已关闭,所有协程都已退出
logger.Log("channel已关闭,返回结果", "sku", 1)
case <-time.After(20 * time.Second):
println("协程超时20秒")
return temp
}
// 收集已经发送的数据
GoodsRes.Range(func(k, v interface{}) bool {
s, _ := k.(string)
temp[s] = v
return true
})
case <-cleanupTimeout:
// 清理超时,强制返回
logger.Log("清理超时,强制返回", "sku", 1)
return temp
}
}
}
}
}
func Synchronization(ctx *gin.Context) {
......
......@@ -7,5 +7,5 @@ type PrevSku struct {
ID bson.ObjectId `bson:"_id"`
SkuId int64 `bson:"sku_id"`
SpuId int64 `bson:"spu_id"`
SupplierId int64 `bson:"supplier_id"`
SupplierId int32 `bson:"supplier_id"`
}
......@@ -9,7 +9,6 @@ import (
"go_sku_server/pkg/mongo"
"go_sku_server/service/sorter"
"sort"
"strconv"
"sync"
"gopkg.in/mgo.v2"
......@@ -37,14 +36,7 @@ type Power struct {
联营数据详情
使用 context.Context 控制协程生命周期,避免 gin.Context 在协程中的并发问题
*/
func (ls *LyService) LyGoodsDetail(ctx context.Context, params RequestParams, goodsIds []string, ch chan sync.Map) {
// 检查 context 是否已取消(超时或主动取消)
select {
case <-ctx.Done():
logger.Log("LyGoodsDetail: context已取消,直接返回", "sku", 1)
return
default:
}
func (ls *LyService) LyGoodsDetail(ctx context.Context, params RequestParams, skuStr, goodsId, spuStr string, ch chan sync.Map) {
redisConn := gredis.Conn("search_r")
redisConnSpu := gredis.Conn("spu")
......@@ -68,55 +60,11 @@ func (ls *LyService) LyGoodsDetail(ctx context.Context, params RequestParams, go
//是否展示spu额外信息
showSpuExtra := params.ShowSpuExtra
//批量获取商品详情
skuArr := gredis.Hmget("default_r", "sku", goodsIds)
//为了性能着想,这边也先去批量获取spu的信息
var spuService SpuService
spuList := spuService.getSpuList(skuArr)
GoodsRes := sync.Map{}
for goodsId, skuStr := range skuArr {
// 在处理每个商品前检查 context 是否已取消
select {
case <-ctx.Done():
logger.Log("LyGoodsDetail: 处理过程中context被取消", "sku", 1)
// 不发送不完整的数据,直接返回让协程尽快退出
return
default:
}
//初始化有序map,拼接data数据,就是从redis取出初始数据
sku := model.InitSkuData(skuStr)
var spu string
if skuStr == "" {
// 如果redis中找不到sku数据,尝试从prev_sku MongoDB中查找
skuId, err := strconv.ParseInt(goodsId, 10, 64)
if err != nil {
GoodsRes.Store(goodsId, false)
continue
}
var prevSku model.PrevSku
err = prevSkuMongo.DB("ichunt").C("prev_sku").Find(bson.M{"sku_id": skuId}).One(&prevSku)
if err != nil {
// 如果在prev_sku中也找不到,则保持默认值
GoodsRes.Store(goodsId, false)
continue
}
// 根据找到的spu_id去spu的redis中查找
spuIdStr := strconv.FormatInt(prevSku.SpuId, 10)
spuStr, _ := redis.String(redisConnSpu.Do("HGET", "spu", spuIdStr))
if spuStr == "" {
// 如果spu缓存也没有,保持默认值
GoodsRes.Store(goodsId, false)
continue
}
sku.SupplierId = prevSku.SupplierId
sku.SpuId = spuIdStr
spu = spuStr
} else {
spu = spuList[sku.SpuId]
}
spu := spuStr
sku.GoodsId = goodsId
//读取包装字段的缓存(分别是DGK,avnet,mro)
......@@ -323,15 +271,8 @@ func (ls *LyService) LyGoodsDetail(ctx context.Context, params RequestParams, go
GoodsRes.Store(goodsId, sku)
//(*goodsRes)[goodsId] = A
}
// 发送结果时也要检查 context,避免在超时后阻塞
select {
case <-ctx.Done():
logger.Log("LyGoodsDetail: 发送结果前context已取消,直接返回", "sku", 1)
return
case ch <- GoodsRes:
// 成功发送
}
//退出通道
ch <- GoodsRes
}
// 获取活动
......
......@@ -83,7 +83,7 @@ func (ss *SampleService) GetGoods(ctx *gin.Context, goodsIds []string) map[strin
//抽取自营 或者联营 goods_id
zyService := ZiyingService{} //实例化自营查询
lyService := LyService{} //实例化联营查询
//lyService := LyService{} //实例化联营查询
ch := make(chan sync.Map) //管道
p := 0 //总共协程
zyGoodsId := make([]string, 0)
......@@ -102,7 +102,7 @@ func (ss *SampleService) GetGoods(ctx *gin.Context, goodsIds []string) map[strin
} else { //联营
lyGoodsId = append(lyGoodsId, goodsId)
if len(lyGoodsId) >= 10 {
go lyService.LyGoodsDetail(ctxWithTimeout, params, lyGoodsId, ch)
//go lyService.LyGoodsDetail(ctxWithTimeout, params, lyGoodsId, ch)
lyGoodsId = lyGoodsId[:0:0]
p++
}
......@@ -113,7 +113,7 @@ func (ss *SampleService) GetGoods(ctx *gin.Context, goodsIds []string) map[strin
p++
}
if len(lyGoodsId) > 0 {
go lyService.LyGoodsDetail(ctxWithTimeout, params, lyGoodsId, ch)
//go lyService.LyGoodsDetail(ctxWithTimeout, params, lyGoodsId, ch)
p++
}
//异步map最后转成map
......
package service
import (
"github.com/gomodule/redigo/redis"
"github.com/tidwall/gjson"
"go_sku_server/pkg/gredis"
)
type SpuService struct {
var UsdRate float64 //美金匯率
type SpuService struct {
}
func (ss *SpuService) getSpuList(skuArr map[string]string) (spuList map[string]string) {
func (ss *SpuService) GetSpuList(skuArr map[string]string) (spuList map[string]string) {
var spuIds []string
for _, skuStr := range skuArr {
spuId := gjson.Get(skuStr, "spu_id").String()
......@@ -19,3 +21,10 @@ func (ss *SpuService) getSpuList(skuArr map[string]string) (spuList map[string]s
spuList = gredis.Hmget("spu", "spu", spuIds)
return
}
// 设置查询公共变量
func (ss *SpuService) SetInit() {
redisCon := gredis.Conn("default_r")
usdRatio, _ := redis.Float64(redisCon.Do("HGET", "erp_rate", 2))
UsdRate = usdRatio
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment