Commit acfcbd1c by mushishixian

fix

parents 2a281977 6d7b4d4a
2021-02-02 11:31:02.644 ERROR (main.go:main.(*RecvPro).Consumer:208) not found
2021-02-02 11:33:00.958 ERROR (main.go:main.(*RecvPro).Consumer:208) not found
2021-02-02 11:38:52.737 ERROR (main.go:main.(*RecvPro).Consumer:208) not found
2021-02-02 11:39:28.825 ERROR (main.go:main.(*RecvPro).Consumer:211) not found
2021-02-02 11:39:30.882 ERROR (main.go:main.(*RecvPro).Consumer:211) not found
2021-02-02 11:39:32.436 ERROR (main.go:main.(*RecvPro).Consumer:211) not found
2021-02-02 11:39:53.796 ERROR (main.go:main.(*RecvPro).Consumer:213) not found
2021-02-02 11:41:37.89 ERROR (main.go:main.(*RecvPro).Consumer:208) not found
...@@ -78,43 +78,70 @@ type RecvPro struct { ...@@ -78,43 +78,70 @@ type RecvPro struct {
} }
func (t *RecvPro) Consumer(dataByte []byte) (err error) { func (t *RecvPro) Consumer(dataByte []byte) (err error) {
var viewData model.ViewData //获取队列数据
//fmt.Println(string(dataByte)) var rbmqData model.ViewData
if err = json.Unmarshal(dataByte, &viewData); err != nil { if err = json.Unmarshal(dataByte, &rbmqData); err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
return return
} }
//先去遍历数据库,全部过一遍 if rbmqData.ActivityId == 0 {
return nil
}
var ( var (
historyData model.HistoryData mongoHistory model.HistoryData
whereMap map[string]interface{} whereMap map[string]interface{}
) )
//先去mongo查找 //先去mongo查找有没有存在的记录,unique_id是由3.0写到cookie的,过期时间为1年
//if viewData.UserId != 0 { whereMap = bson.M{"activity_id": rbmqData.ActivityId, "unique_id": rbmqData.UniqueId}
// whereMap = bson.M{"activity_id": viewData.ActivityId, "user_id": viewData.UserId}
//} else {
whereMap = bson.M{"activity_id": viewData.ActivityId, "unique_id": viewData.UniqueId}
//}
err = mongoConn.DB("ichunt").C("activity_view_history"). err = mongoConn.DB("ichunt").C("activity_view_history").
Find(whereMap).One(&historyData) Find(whereMap).One(&mongoHistory)
if err != nil && err != mgo.ErrNotFound {
//先去获取用户相关信息
var (
mobile string
isNewReg int
adTag string
)
if rbmqData.UserId != 0 {
//找出用户相关的信息
var user model.UserMain
err = db.Reset().Table(&user).Where("user_id", rbmqData.UserId).Select()
if err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
return
} }
//mysql数据操作 mobile = user.Mobile
var result model.ActivityViewStatistics adTag = user.RegRemark
err = specialDb.Reset().Table(&result).Where("activity_id", viewData.ActivityId).Select() //再去判断是否是这个活动注册的用户
var userActivity model.UserActivity
count, err := db.Reset().Table(&userActivity).Where("activity_id", rbmqData.ActivityId).Where("user_id", rbmqData.UserId).Count()
if err != nil { if err != nil {
fmt.Println("110", err) fmt.Println(err)
logger.Error(err.Error())
}
if count != 0 {
isNewReg = 1
}
}
//如果rbmq传过来的数据在mongo里面找不到,先去数据库查找活动是否存在
var statistics model.ActivityViewStatistics
err = specialDb.Reset().Table(&statistics).Where("activity_id", rbmqData.ActivityId).Select()
//活动不存在,则往数据库插入一条新的统计
if statistics.ActivityId == 0 {
var loginNum, regNum int
if rbmqData.UserId != 0 {
loginNum = 1
if isNewReg == 1 {
regNum = 1
}
} }
if result.ActivityId == 0 {
//先去数据库插入
activityViewStatistics := model.ActivityViewStatistics{ activityViewStatistics := model.ActivityViewStatistics{
ActivityId: viewData.ActivityId, ActivityId: rbmqData.ActivityId,
RegNum: 0, RegNum: regNum,
LoginNum: 0, LoginNum: loginNum,
CreateTime: int(time.Now().Unix()), CreateTime: int(time.Now().Unix()),
} }
_, err = specialDb.Reset().Table("lie_activity_view_statistics").Data(activityViewStatistics).Insert() _, err = specialDb.Reset().Table("lie_activity_view_statistics").Data(activityViewStatistics).Insert()
...@@ -124,19 +151,24 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -124,19 +151,24 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
return return
} }
} else { } else {
loginNum := result.LoginNum loginNum := statistics.LoginNum
regNum := result.RegNum regNum := statistics.RegNum
if viewData.UserId != 0 { //如果已经存在的mongo记录里面,已经存在了unique_id,但是user_id为空,同时新传过来的rbmq的数据的user_id不为空,就代表这个用户
//从未登录变成了登陆态,该活动的登陆人数+1,同时去判断该用户是否是这个活动注册的用户
if mongoHistory.UserId == 0 && rbmqData.UserId != 0 {
loginNum++ loginNum++
if isNewReg == 1 {
regNum++
}
} }
//存在则去更新 //存在则去更新
activityViewStatistics := model.ActivityViewStatistics{ activityViewStatistics := model.ActivityViewStatistics{
ActivityId: viewData.ActivityId, ActivityId: rbmqData.ActivityId,
RegNum: regNum, RegNum: regNum,
LoginNum: loginNum, LoginNum: loginNum,
UpdateTime: int(time.Now().Unix()), UpdateTime: int(time.Now().Unix()),
} }
_, err = specialDb.Reset().Table("lie_activity_view_statistics").Where("activity_id", viewData.ActivityId). _, err = specialDb.Reset().Table("lie_activity_view_statistics").Where("activity_id", rbmqData.ActivityId).
Data(activityViewStatistics).Update() Data(activityViewStatistics).Update()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
...@@ -144,68 +176,41 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -144,68 +176,41 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
return return
} }
} }
//如果已经存在的mongo记录里面,已经存在了unique_id,但是user_id为空,同时新传过来的rbmq的数据的user_id不为空
//如果mongo存在,更新mongo数据 //此时只需要去更新mongo对应这条记录的user_id,mobile等信息
if historyData.UserId != 0 { if mongoHistory.UserId != 0 {
historyData.LastVisitTime = int(time.Now().Unix()) mongoHistory.LastVisitTime = int(time.Now().Unix())
historyData.LastVisitTime = int(time.Now().Unix()) selector := bson.M{"unique_id": mongoHistory.UniqueId}
selector := bson.M{"unique_id": historyData.UniqueId} err = mongoConn.DB("ichunt").C("activity_view_history").Update(selector, &mongoHistory)
err = mongoConn.DB("ichunt").C("activity_view_history").Update(selector, &historyData)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
} }
} else { } else {
var (
mobile string newData := model.HistoryData{
isNewReg int UniqueId: rbmqData.UniqueId,
adTag string ActivityId: rbmqData.ActivityId,
)
if viewData.UserId != 0 {
//找出用户相关的信息
var user model.UserMain
err = db.Reset().Table(&user).Where("user_id", viewData.UserId).Select()
if err != nil {
fmt.Println("用户相关的信息为空 : ", err)
logger.Error(err.Error())
}
mobile = user.Mobile
adTag = user.RegRemark
//再去判断是否是这个活动注册的用户
var userActivity model.UserActivity
count, err := db.Reset().Table(&userActivity).Where("activity_id", viewData.ActivityId).Where("user_id", viewData.UserId).Count()
if err != nil {
fmt.Println(err)
logger.Error(err.Error())
}
if count != 0 {
isNewReg = 1
}
}
historyData := model.HistoryData{
UniqueId: viewData.UniqueId,
ActivityId: viewData.ActivityId,
Mobile: mobile, Mobile: mobile,
UserId: viewData.UserId, UserId: rbmqData.UserId,
IsNewReg: isNewReg, IsNewReg: isNewReg,
AdTag: adTag, AdTag: adTag,
Source: viewData.Source, Source: rbmqData.Source,
VisitTime: viewData.VisitTime, VisitTime: rbmqData.VisitTime,
LastVisitTime: 0, LastVisitTime: int(time.Now().Unix()),
Pf: viewData.Pf, Pf: rbmqData.Pf,
} }
if historyData.UniqueId == "" { if mongoHistory.UniqueId == "" {
err = mongoConn.DB("ichunt").C("activity_view_history").Insert(&historyData) err = mongoConn.DB("ichunt").C("activity_view_history").Insert(&newData)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
} }
} else { } else {
historyData.LastVisitTime = int(time.Now().Unix()) selector := bson.M{"unique_id": mongoHistory.UniqueId}
selector := bson.M{"unique_id": historyData.UniqueId} err = mongoConn.DB("ichunt").C("activity_view_history").Update(selector, &newData)
err = mongoConn.DB("ichunt").C("activity_view_history").Update(selector, &historyData)
if err != nil { if err != nil {
fmt.Println("205", err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment