Commit 46873e98 by mushishixian

修改统计表

parent 6cb2647f
......@@ -20,6 +20,6 @@ require (
github.com/stretchr/testify v1.4.0 // indirect
github.com/syyongx/php2go v0.9.4
github.com/tidwall/gjson v1.6.0
golang.org/x/text v0.3.2 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)
2021-01-15 13:53:13.783 ERROR (main.go:main.main:77) sdsd
2021-01-15 13:54:25.879 ERROR (main.go:main.main:75) sadisaiod
package main
import (
"encoding/json"
"flag"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/gohouse/gorose/v2"
"github.com/ichunt2019/logger"
"github.com/tidwall/gjson"
"go-queue-server/special/activity_statistics/model"
"go-queue-server/util"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"time"
)
......@@ -15,10 +19,12 @@ var LogDir string
var err error
var engine *gorose.Engin
var specialEngine *gorose.Engin
var mongoConn *mgo.Session
func init() {
initArgs()
util.Init(ConfigDir)
initLog()
initDB()
}
......@@ -30,6 +36,17 @@ func initArgs() {
flag.Parse()
}
func initLog() {
logConfig := make(map[string]string)
logConfig["log_path"] = LogDir + "special/activity_statistics/logs"
logConfig["log_chan_size"] = "100"
err := logger.InitLogger("file", logConfig)
if err != nil {
panic(err)
}
logger.Init()
}
//初始化数据库
func initDB() {
dsn := util.Configs.Liexin_databases.Dns
......@@ -57,19 +74,23 @@ func main() {
//先去遍历数据库,全部过一遍
db := DB()
specialDb := SpecialDB()
mongoConn, err = model.GetMongoConn(util.Configs.Mongo_config)
defer func() {
db.Close()
specialDb.Close()
mongoConn.Close()
}()
nowTime := time.Now().Unix()
//先去取出所有的正在进行的活动列表
var giftActivityList []model.GiftActivity
err = specialDb.Table(&giftActivityList).Fields("id,start_time,end_time").Where("end_time", ">", nowTime).Select()
if err != nil {
fmt.Println(err)
logger.Error(err.Error())
}
//统计满赠活动******************
for _, activity := range giftActivityList {
//一个个去统计
orderNum, _ := db.Table("lie_order_gift").Where("activity_id", activity.Id).Count()
newUserNum, _ := db.Reset().Table("lie_order_gift").LeftJoin("lie_user_main on lie_order_gift.user_id = lie_user_main.user_id").
Where("lie_order_gift.activity_id", activity.Id).Where("lie_user_main.is_new", 1).Count()
......@@ -83,26 +104,32 @@ func main() {
data["activity_id"] = activity.Id
data["type"] = 2
if count > 0 {
specialDb.Reset().Table("lie_activity_statistics").Where("activity_id", activity.Id).Data(data).Update()
_, err = specialDb.Reset().Table("lie_activity_statistics").Where("activity_id", activity.Id).Data(data).Update()
if err != nil {
logger.Error(err.Error())
}
} else {
data["create_time"] = time.Now().Unix()
specialDb.Reset().Table("lie_activity_statistics").Data(data).Insert()
_, err = specialDb.Reset().Table("lie_activity_statistics").Data(data).Insert()
if err != nil {
logger.Error(err.Error())
}
}
}
//先去取出所有的正在进行的活动列表
//价格促销活动*****************
var priceActivityList []model.PriceActivity
err = specialDb.Reset().Table(&priceActivityList).Fields("id,start_time,end_time").Where("end_time", ">", nowTime).Select()
if err != nil {
fmt.Println(err)
logger.Error(err.Error())
}
fmt.Println(priceActivityList)
for _, activity := range priceActivityList {
//一个个去统计(type=4代表是活动)
orderNum, _ := db.Reset().Table("lie_order_items_price").Where("activity_id", activity.Id).GroupBy("order_id").Count("id")
newUserNum, _ := db.Reset().Table("lie_order_items_price").LeftJoin("lie_user_main on lie_order_items_price.user_id = lie_user_main.user_id").
Where("lie_order_items_price.activity_id", activity.Id).Where("lie_user_main.is_new", 1).
Where("lie_order_items_price.type", 4).GroupBy("order_id").Count("id")
////先查询统计是否存在
orderNum, _ := db.Reset().Table("lie_order_activity_count").Where("activity_id", activity.Id).Count("order_id")
newUserNum, _ := db.Reset().Table("lie_order_activity_count").LeftJoin("lie_user_main on lie_order_activity_count.user_id = lie_user_main.user_id").
Where("lie_order_activity_count.activity_id", activity.Id).Where("lie_user_main.is_new", 1).
Count("lie_user_main.user_id")
//先查询统计是否存在
count, _ := specialDb.Reset().Table("lie_activity_statistics").Where("activity_id", activity.Id).
Where("type", 1).Count()
data := make(map[string]interface{})
......@@ -112,10 +139,49 @@ func main() {
data["activity_id"] = activity.Id
data["type"] = 1
if count > 0 {
specialDb.Reset().Table("lie_activity_statistics").Where("activity_id", activity.Id).Data(data).Update()
_,err = specialDb.Reset().Table("lie_activity_statistics").Where("activity_id", activity.Id).Data(data).Update()
if err != nil {
logger.Error(err.Error())
}
} else {
data["create_time"] = time.Now().Unix()
specialDb.Reset().Table("lie_activity_statistics").Data(data).Insert()
_,err = specialDb.Reset().Table("lie_activity_statistics").Data(data).Insert()
if err != nil {
logger.Error(err.Error())
}
}
items, err := db.Reset().Fields("*,count(amount) as total_amount").Table("lie_order_items_price").
LeftJoin("lie_user_main on lie_order_items_price.user_id = lie_user_main.user_id").
Where("lie_order_items_price.activity_id", activity.Id).Where("lie_order_items_price.type", 4).
GroupBy("order_id").Get()
if err != nil {
logger.Error(err.Error())
}
for _, item := range items {
resultByte, err := json.Marshal(item)
result := string(resultByte)
//把订单列表写到mongo里面
//先去检查这个订单是否在里面
var priceActivityMongo model.PriceActivityMongo
err = mongoConn.DB("ichunt").C("price_activity_statistics").
Find(bson.M{"order_id": gjson.Get(result, "order_id").Int()}).One(&priceActivityMongo)
if err != nil && err != mgo.ErrNotFound {
logger.Error(err.Error())
}
//存在跳过,不存在才插入
if priceActivityMongo.OrderId == 0 {
priceActivityMongo.ActivityId = gjson.Get(result, "activity_id").Int()
priceActivityMongo.OrderId = gjson.Get(result, "order_id").Int()
priceActivityMongo.UserId = gjson.Get(result, "user_id").Int()
priceActivityMongo.Mobile = gjson.Get(result, "mobile").String()
priceActivityMongo.OrderAmount = gjson.Get(result, "total_amount").Float()
priceActivityMongo.Pf = gjson.Get(result, "create_device").Int()
err = mongoConn.DB("ichunt").C("price_activity_statistics").Insert(&priceActivityMongo)
if err != nil {
logger.Error(err.Error())
}
}
}
}
}
package model
import (
"go-queue-server/util"
"gopkg.in/mgo.v2"
)
var session *mgo.Session
var err error
func GetMongoConn(mongoConfig *util.MongoConn) (*mgo.Session, error) {
url := mongoConfig.Host
session, err = mgo.Dial(url)
if err != nil {
return nil, err
}
//fmt.Println("url",url)
//fmt.Println("maxPoolSizeInt",maxPoolSizeInt)
session.SetMode(mgo.Monotonic, true)
myDB := session.DB("ichunt")
err = myDB.Login(mongoConfig.Username, mongoConfig.Password)
if err != nil {
return nil, err
}
return session, nil
}
package model
type OrderItemsPrice struct {
}
func (otp *OrderItemsPrice) TableName() string {
return "lie_order_items_price"
}
......@@ -11,6 +11,15 @@ type PriceActivity struct {
EndTime int `gorose:"end_time"`
}
type PriceActivityMongo struct {
ActivityId int64 `bson:"activity_id"`
UserId int64 `bson:"user_id"`
OrderId int64 `bson:"order_id"`
Mobile string `bson:"mobile"`
Pf int64 `bson:"pf"`
OrderAmount float64 `bson:"order_amount"`
}
func (ga *PriceActivity) TableName() string {
return "lie_price_activity"
}
......
package main
import (
"encoding/json"
"flag"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/gohouse/gorose/v2"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"go-queue-server/special/activity_view_statistics/model"
"go-queue-server/util"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"time"
)
var (
ConfigDir string
LogDir string
err error
engine *gorose.Engin
specialEngine *gorose.Engin
mongoConn *mgo.Session
db gorose.IOrm
specialDb gorose.IOrm
)
func init() {
initArgs()
util.Init(ConfigDir)
initDB()
}
// 解析命令行参数
func initArgs() {
//go run main.go -configDir xxx
flag.StringVar(&ConfigDir, "configDir", "", "配置文件")
flag.StringVar(&LogDir, "logDir", "", "配置文件")
flag.Parse()
}
//初始化数据库
func initDB() {
dsn := util.Configs.Liexin_databases.Dns
specialDsn := util.Configs.Special_databases.Dns
engine, err = gorose.Open(&gorose.Config{Driver: "mysql", Dsn: dsn})
if err != nil {
panic(err)
}
specialEngine, err = gorose.Open(&gorose.Config{Driver: "mysql", Dsn: specialDsn})
if err != nil {
panic(err)
}
}
func DB() gorose.IOrm {
return engine.NewOrm()
}
func SpecialDB() gorose.IOrm {
return specialEngine.NewOrm()
}
type RecvPro struct {
}
func (t *RecvPro) Consumer(dataByte []byte) (err error) {
var viewData model.ViewData
if err = json.Unmarshal(dataByte, &viewData); err != nil {
fmt.Println(err)
return
}
//先去遍历数据库,全部过一遍
var (
historyData model.HistoryData
whereMap map[string]interface{}
)
//先去mongo查找
if viewData.UserId != 0 {
whereMap = bson.M{"activity_id": viewData.ActivityId, "user_id": viewData.UserId}
} else {
whereMap = bson.M{"activity_id": viewData.ActivityId, "unique_id": viewData.UniqueId}
}
err = mongoConn.DB("ichunt").C("activity_view_history").
Find(whereMap).One(&historyData)
if err != nil && err != mgo.ErrNotFound {
fmt.Println(err)
return
}
//mysql数据操作
var result model.ActivityViewStatistics
err = specialDb.Table(&result).Where("activity_id", viewData.ActivityId).Select()
if result.ActivityId == 0 {
//先去数据库插入
activityViewStatistics := model.ActivityViewStatistics{
ActivityId: viewData.ActivityId,
RegNum: 0,
LoginNum: 0,
CreateTime: int(time.Now().Unix()),
}
_, err = specialDb.Reset().Table("lie_activity_view_statistics").Data(activityViewStatistics).Insert()
if err != nil {
fmt.Println(err)
return
}
} else {
loginNum := result.LoginNum
regNum := result.RegNum
if viewData.UserId != 0 {
loginNum++
}
//存在则去更新
activityViewStatistics := model.ActivityViewStatistics{
ActivityId: viewData.ActivityId,
RegNum: regNum,
LoginNum: loginNum,
UpdateTime: int(time.Now().Unix()),
}
_, err = specialDb.Reset().Table("lie_activity_view_statistics").Where("activity_id", viewData.ActivityId).
Data(activityViewStatistics).Update()
if err != nil {
fmt.Println(err)
return
}
}
//如果mongo存在,更新mongo数据
if historyData.ActivityId != 0 {
historyData.LastVisitTime = int(time.Now().Unix())
} else {
var (
mobile string
isNewReg int
adTag string
)
if viewData.UserId != 0 {
//找出用户相关的信息
var user model.UserMain
err = db.Reset().Table(&user).Where("user_id", viewData.UserId).Select()
if err != nil {
fmt.Println(err)
}
mobile = user.Mobile
adTag = user.RegRemark
//再去判断是否是这个活动注册的用户
var userActivity model.UserActivity
count, err := db.Reset().Table(&userActivity).Where("activity_id", viewData.ActivityId).Where("user_id", viewData.UserId).Count()
if err != nil {
fmt.Println(err)
}
if count != 0 {
isNewReg = 1
}
}
historyData := model.HistoryData{
UniqueId: viewData.UniqueId,
ActivityId: viewData.ActivityId,
Mobile: mobile,
UserId: viewData.UserId,
IsNewReg: isNewReg,
AdTag: adTag,
Source: viewData.Source,
VisitTime: viewData.VisitTime,
LastVisitTime: 0,
Pf: viewData.Pf,
}
err = mongoConn.DB("ichunt").C("activity_view_history").Insert(&historyData)
if err != nil {
fmt.Println(err)
}
}
return nil
}
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
func main() {
db = DB()
specialDb = SpecialDB()
mongoConn, err = model.GetMongoConn(util.Configs.Mongo_config)
defer func() {
db.Close()
specialDb.Close()
mongoConn.Close()
}()
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"activity_view_statistics",
"activity_view_statistics",
"CREDITHC_CS",
"direct",
util.Configs.Rabbitmq_ichunt.Dns,
}, t, 1)
}
func init() {
//queueExchange := rabbitmq.QueueExchange{
// "activity_view_statistics",
// "activity_view_statistics",
// "CREDITHC_CS",
// "direct",
// util.Configs.Rabbitmq_ichunt.Dns,
//}
//
////str := `{"bom_id":666,"delivery_type":1,"sort":1}`
//str := `{"activity_id":142,"unique_id":"111","user_id":1,"source":"activity","referer":"test","visit_time":1610527567,"pf":1}`
//rabbitmq.Send(queueExchange, str)
}
package model
type ActivityViewStatistics struct {
ActivityId int `gorose:"activity_id"`
RegNum int `gorose:"reg_num"`
LoginNum int `gorose:"login_num"`
CreateTime int `gorose:"create_time"`
UpdateTime int `gorose:"update_time"`
}
func (avs *ActivityViewStatistics) TableName() string {
return "lie_activity_view_statistics"
}
package model
type HistoryData struct {
UniqueId string `bson:"unique_id"`
ActivityId int `bson:"activity_id"`
Mobile string `bson:"mobile"`
UserId int `bson:"user_id"`
IsNewReg int `bson:"is_new_reg"`
AdTag string `bson:"ad_tag"`
Source string `bson:"source"`
VisitTime int `bson:"visit_time"`
LastVisitTime int `bson:"last_visit_time"`
Pf int `bson:"pf"`
}
package model
import (
"go-queue-server/util"
"gopkg.in/mgo.v2"
)
var session *mgo.Session
var err error
func GetMongoConn(mongoConfig *util.MongoConn) (*mgo.Session, error) {
url := mongoConfig.Host
session, err = mgo.Dial(url)
if err != nil {
return nil, err
}
//fmt.Println("url",url)
//fmt.Println("maxPoolSizeInt",maxPoolSizeInt)
session.SetMode(mgo.Monotonic, true)
myDB := session.DB("ichunt")
err = myDB.Login(mongoConfig.Username, mongoConfig.Password)
if err != nil {
return nil, err
}
return session, nil
}
package model
type UserActivity struct {
UserId int `gorose:"user_id"`
ActivityId int `gorose:"activity_id"`
}
func (ua *UserActivity) TableName() string {
return "lie_user_activity"
}
package model
type UserMain struct {
UserId int `gorose:"user_id"`
Mobile string `gorose:"mobile"`
RegRemark string `gorose:"reg_remark"`
}
func (um *UserMain) TableName() string {
return "lie_user_main"
}
package model
type ViewData struct {
ActivityId int `json:"activity_id"`
UniqueId string `json:"unique_id"`
UserId int `json:"user_id"`
Source string `json:"source"`
Referer string `json:"referer"`
VisitTime int `json:"visit_time"`
Pf int `json:"pf"`
}
......@@ -35,3 +35,4 @@ func SendMessage(userId int, content string) bool {
}
return true
}
......@@ -10,18 +10,19 @@ type Config struct {
LiexinLabel_databases *LiexinLabelMysqlConfig
Liexincms_databases *LiexinCmsMysqlConfig
Bom_databases *BomDatabasesMysqlConfig
Special_databases *SpecialDatabasesMysqlConfig
Special_databases *SpecialDatabasesMysqlConfig
Mongo_config *MongoConn
Rabbitmq_ichunt *RabbitmqIchunt
Rabbitmq_erp_label *RabbitmqErpLabel
Rabbitmq_order_push_stock *RabbitmqOrderPushStock
Rabbitmq_yaohaoyou *RabbitmqYaoHaoYou
Rabbitmq_bomcreateorder *RabbitmqBomCreateOrder
Crm_domain *SendMail
Cms_user_domain *SendUserLoginMail
Ding_msg *Ding
Api_domain *ApiDomain
Redis_config *RedisConn
Fengkong_domain *FengkongDomain
Rabbitmq_yaohaoyou *RabbitmqYaoHaoYou
Rabbitmq_bomcreateorder *RabbitmqBomCreateOrder
Crm_domain *SendMail
Cms_user_domain *SendUserLoginMail
Ding_msg *Ding
Api_domain *ApiDomain
Redis_config *RedisConn
Fengkong_domain *FengkongDomain
}
type LiexinMysqlConfig struct {
......@@ -80,7 +81,7 @@ type SendMail struct {
SendMailUrl string `toml:"send_mail"`
}
type SendUserLoginMail struct{
type SendUserLoginMail struct {
SendUserLoginMailUrl string `toml:"send_user_login_mail"`
}
......@@ -107,6 +108,13 @@ type RedisConn struct {
Port string `toml:"port"`
}
type MongoConn struct {
Host string `toml:"host"`
Username string `toml:"username"`
Password string `toml:"password"`
Port string `toml:"port"`
}
type FengkongDomain struct {
GoUrl string `toml:"go_url"`
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment