Commit 21b69c3e by mushishixian

修改统计到redis

parent acfcbd1c
2021-02-03 16:17:04.719 ERROR (main.go:main.(*RecvPro).Consumer:149) unexpected end of JSON input
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"github.com/garyburd/redigo/redis"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/gohouse/gorose/v2" "github.com/gohouse/gorose/v2"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
...@@ -24,6 +25,7 @@ var ( ...@@ -24,6 +25,7 @@ var (
mongoConn *mgo.Session mongoConn *mgo.Session
db gorose.IOrm db gorose.IOrm
specialDb gorose.IOrm specialDb gorose.IOrm
RedisConn redis.Conn
) )
func init() { func init() {
...@@ -64,6 +66,17 @@ func initDB() { ...@@ -64,6 +66,17 @@ func initDB() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
RedisConn = connectRedis(util.Configs)
}
func connectRedis(config *util.Config) redis.Conn {
redisDns := config.Redis_config.Host + ":" + config.Redis_config.Port
option := redis.DialPassword(config.Redis_config.Password)
conn, err := redis.Dial("tcp", redisDns, option)
if err != nil {
panic(err)
}
return conn
} }
func DB() gorose.IOrm { func DB() gorose.IOrm {
...@@ -83,10 +96,10 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -83,10 +96,10 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
if err = json.Unmarshal(dataByte, &rbmqData); err != nil { if err = json.Unmarshal(dataByte, &rbmqData); err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
return return nil
} }
if rbmqData.ActivityId == 0 { if rbmqData.ActivityId == 0 {
return nil return
} }
var ( var (
...@@ -104,8 +117,9 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -104,8 +117,9 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
isNewReg int isNewReg int
adTag string adTag string
) )
if rbmqData.UserId != 0 {
//找出用户相关的信息 //找出用户相关的信息
if rbmqData.UserId != 0 {
var user model.UserMain var user model.UserMain
err = db.Reset().Table(&user).Where("user_id", rbmqData.UserId).Select() err = db.Reset().Table(&user).Where("user_id", rbmqData.UserId).Select()
if err != nil { if err != nil {
...@@ -126,10 +140,20 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -126,10 +140,20 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
} }
} }
//如果rbmq传过来的数据在mongo里面找不到,先去数据库查找活动是否存在 //如果rbmq传过来的数据在mongo里面找不到,先去redis查找活动是否存在
var statistics model.ActivityViewStatistics var statistics model.ActivityViewStatistics
err = specialDb.Reset().Table(&statistics).Where("activity_id", rbmqData.ActivityId).Select() redisData, err := redis.String(RedisConn.Do("HGET", "activity_view_statistics", rbmqData.ActivityId))
if redisData != "" {
err = json.Unmarshal([]byte(redisData), &statistics)
if err != nil {
fmt.Println(err)
logger.Error(err.Error())
return nil
}
}
//err = specialDb.Reset().Table(&statistics).Where("activity_id", rbmqData.ActivityId).Select()
//活动不存在,则往数据库插入一条新的统计 //活动不存在,则往数据库插入一条新的统计
var activityViewStatistics model.ActivityViewStatistics
if statistics.ActivityId == 0 { if statistics.ActivityId == 0 {
var loginNum, regNum int var loginNum, regNum int
if rbmqData.UserId != 0 { if rbmqData.UserId != 0 {
...@@ -138,18 +162,22 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -138,18 +162,22 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
regNum = 1 regNum = 1
} }
} }
activityViewStatistics := model.ActivityViewStatistics{ activityViewStatistics = model.ActivityViewStatistics{
ActivityId: rbmqData.ActivityId, ActivityId: rbmqData.ActivityId,
RegNum: regNum, RegNum: regNum,
Uv: 1,
Pv: 1,
LoginNum: loginNum, LoginNum: loginNum,
CreateTime: int(time.Now().Unix()), CreateTime: int(time.Now().Unix()),
UpdateTime: 0,
} }
_, err = specialDb.Reset().Table("lie_activity_view_statistics").Data(activityViewStatistics).Insert() //_, err := specialDb.Reset().Table("lie_activity_view_statistics").Data(activityViewStatistics).Insert()
if err != nil { //_, err := RedisConn.Do("HSET", "activity_view_statistics", rbmqData.ActivityId, activityViewStatistics)
fmt.Println(err) //if err != nil {
logger.Error(err.Error()) // fmt.Println(err)
return // logger.Error(err.Error())
} // return
//}
} else { } else {
loginNum := statistics.LoginNum loginNum := statistics.LoginNum
regNum := statistics.RegNum regNum := statistics.RegNum
...@@ -161,21 +189,39 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -161,21 +189,39 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
regNum++ regNum++
} }
} }
if mongoHistory.UniqueId == "" {
statistics.Uv += 1
}
//存在则去更新 //存在则去更新
activityViewStatistics := model.ActivityViewStatistics{ activityViewStatistics = model.ActivityViewStatistics{
ActivityId: rbmqData.ActivityId, ActivityId: rbmqData.ActivityId,
Pv: statistics.Pv + 1,
Uv: statistics.Uv,
RegNum: regNum, RegNum: regNum,
LoginNum: loginNum, LoginNum: loginNum,
UpdateTime: int(time.Now().Unix()), UpdateTime: int(time.Now().Unix()),
} }
_, err = specialDb.Reset().Table("lie_activity_view_statistics").Where("activity_id", rbmqData.ActivityId). //_, err = specialDb.Reset().Table("lie_activity_view_statistics").Where("activity_id", rbmqData.ActivityId).
Data(activityViewStatistics).Update() // Data(activityViewStatistics).Update()
//if err != nil {
// fmt.Println(err)
// logger.Error(err.Error())
// return
//}
}
result, err := json.Marshal(activityViewStatistics)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
return return nil
} }
_, err = RedisConn.Do("HSET", "activity_view_statistics", rbmqData.ActivityId, string(result))
if err != nil {
fmt.Println(err)
logger.Error(err.Error())
return nil
} }
//如果已经存在的mongo记录里面,已经存在了unique_id,但是user_id为空,同时新传过来的rbmq的数据的user_id不为空 //如果已经存在的mongo记录里面,已经存在了unique_id,但是user_id为空,同时新传过来的rbmq的数据的user_id不为空
//此时只需要去更新mongo对应这条记录的user_id,mobile等信息 //此时只需要去更新mongo对应这条记录的user_id,mobile等信息
if mongoHistory.UserId != 0 { if mongoHistory.UserId != 0 {
...@@ -185,9 +231,9 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -185,9 +231,9 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
logger.Error(err.Error()) logger.Error(err.Error())
return
} }
} else { } else {
newData := model.HistoryData{ newData := model.HistoryData{
UniqueId: rbmqData.UniqueId, UniqueId: rbmqData.UniqueId,
ActivityId: rbmqData.ActivityId, ActivityId: rbmqData.ActivityId,
...@@ -214,9 +260,8 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) { ...@@ -214,9 +260,8 @@ func (t *RecvPro) Consumer(dataByte []byte) (err error) {
logger.Error(err.Error()) logger.Error(err.Error())
} }
} }
} }
return nil return
} }
func (t *RecvPro) FailAction(dataByte []byte) error { func (t *RecvPro) FailAction(dataByte []byte) error {
...@@ -232,6 +277,7 @@ func main() { ...@@ -232,6 +277,7 @@ func main() {
db.Close() db.Close()
specialDb.Close() specialDb.Close()
mongoConn.Close() mongoConn.Close()
RedisConn.Close()
}() }()
t := &RecvPro{} t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{ rabbitmq.Recv(rabbitmq.QueueExchange{
......
package model package model
//type ActivityViewStatistics struct {
// ActivityId int `gorose:"activity_id"`
// Uv int `gorose:"uv"`
// Pv int `gorose:"pv"`
// RegNum int `gorose:"reg_num"`
// LoginNum int `gorose:"login_num"`
// CreateTime int `gorose:"create_time"`
// UpdateTime int `gorose:"update_time"`
//}
type ActivityViewStatistics struct { type ActivityViewStatistics struct {
ActivityId int `gorose:"activity_id"` ActivityId int `json:"activity_id"`
RegNum int `gorose:"reg_num"` Uv int `json:"uv"`
LoginNum int `gorose:"login_num"` Pv int `json:"pv"`
CreateTime int `gorose:"create_time"` RegNum int `json:"reg_num"`
UpdateTime int `gorose:"update_time"` LoginNum int `json:"login_num"`
CreateTime int `json:"create_time"`
UpdateTime int `json:"update_time"`
} }
func (avs *ActivityViewStatistics) TableName() string { func (avs *ActivityViewStatistics) TableName() string {
return "lie_activity_view_statistics" return "lie_activity_view_statistics"
} }
type HistoryData struct { type HistoryData struct {
UniqueId string `bson:"unique_id"` UniqueId string `bson:"unique_id"`
ActivityId int `bson:"activity_id"` ActivityId int `bson:"activity_id"`
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment