Commit 4ee7b2e6 by 朱继来

添加bom_crm队列消费

parent 71bf7802
...@@ -141,7 +141,7 @@ func consume(bom BomNoResult) { ...@@ -141,7 +141,7 @@ func consume(bom BomNoResult) {
if sale_id != 0 { if sale_id != 0 {
var sale_uid int64 // 客服前台ID var sale_uid int64 // 客服前台ID
dao.GetCmsDb().Get(&sale_uid, "select user_id from lie_intracode where admin_id = ?", 1357) dao.GetCmsDb().Get(&sale_uid, "select user_id from lie_intracode where admin_id = ?", sale_id)
if sale_uid == 0 { if sale_uid == 0 {
sendMsg("Bom推送到CRM,内部用户ID("+strconv.Itoa(sale_id)+")未绑定前台账号") sendMsg("Bom推送到CRM,内部用户ID("+strconv.Itoa(sale_id)+")未绑定前台账号")
......
package main
import (
"crm-server/configs"
"crm-server/internal/dao"
"crm-server/internal/service"
"encoding/json"
"flag"
"fmt"
"github.com/ichunt2019/logger"
"github.com/streadway/amqp"
"strconv"
"time"
)
// bom下单后推送到crm,获取用户关联客服
// 日志目录
var LogDir string
// 解析命令行参数
func initArgs() {
// worker -config ./worker.json
// worker -h
flag.StringVar(&LogDir, "logDir", "", "日志目录")
flag.Parse()
}
type BomOrder struct {
UserId int `json:"user_id"`
BomId int `json:"bom_id"`
BomSn string `json:"bom_sn"`
}
// 钉钉告警
func sendMsg(msg_text string) {
// 默认参数
var mobile []string = make([]string, 0)
var isAtAll bool = false
service.DingSend(configs.Ding_crm_msg, "队列任务", msg_text, mobile, isAtAll)
}
// 用户账号
type Account struct {
Mobile string
Email string
}
func main() {
initArgs()
logConfig := make(map[string]string)
logConfig["log_path"] = LogDir+"/bom_order"
logConfig["log_chan_size"] = "10"
logger.InitLogger("file",logConfig)
logger.Init()
conn, err := amqp.Dial("amqp://"+configs.RABBITMQBOM+"/")
if err != nil {
logger.Info("Failed to connect to RabbitMQ ", err.Error())
sendMsg("bom_crm队列任务,连接MQ失败,原因:"+err.Error())
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
logger.Info("Failed to open a channel ", err.Error())
sendMsg("bom_crm队列任务,打开channel失败,原因:"+err.Error())
}
defer ch.Close()
// 定义交换
//err = ch.ExchangeDeclare("bom", "direct", true, false, false, false, nil)
//if nil != err {
// logger.Info("Failed to define a exchange ", err.Error())
// sendMsg("bom_crm队列任务,定义exchange失败,原因:"+err.Error())
//}
q, err := ch.QueueDeclare(
"bom_crm", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logger.Info("Failed to declare a queue ", err.Error())
sendMsg("bom_crm队列任务,声明queue失败,原因:"+err.Error())
}
err = ch.QueueBind(q.Name, "bom_crm", "bom", false, nil)
if nil != err {
logger.Info("Failed to bind a queue and exchange ", err.Error())
sendMsg("bom_crm队列任务,队列绑定失败,原因:"+err.Error())
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logger.Info("Failed to register a consumer ", err.Error())
sendMsg("bom_crm队列任务,注册消费者失败,原因:"+err.Error())
}
forever := make(chan bool)
var bom_order BomOrder
go func() {
for d := range msgs {
fmt.Println(string(d.Body))
sendMsg("获取bom_crm队列:"+string(d.Body))
json.Unmarshal(d.Body, &bom_order)
consume(bom_order)
}
}()
<-forever
}
// 处理
func consume(bom BomOrder) {
// 获取crm的用户ID
var crmUserId int
err := dao.GetDb().Get(&crmUserId, "select user_id from lie_user where outter_uid = ?", bom.UserId)
if err != nil {
logger.Info("Failed to connect crm db ", err.Error())
sendMsg("bom_crm队列任务,连接CRM数据库失败,原因:"+err.Error())
return
}
if crmUserId == 0 { // 不存在推送告警
sendMsg("bom_crm队列任务,用户ID("+strconv.Itoa(bom.UserId)+")CRM不存在")
return
}
// 获取绑定客服
var sale_id int // 客服ID
var sale_name string // 客服名称
dao.GetDb().Get(&sale_id, "select sale_id from lie_salesman where user_id = ?", crmUserId)
if sale_id != 0 { // 若存在,则获取客服名称
dao.GetCmsDb().Get(&sale_name, "select name from user_info where userId = ?", sale_id)
}
// 获取用户账户
var account Account
var client_account string
dao.GetLiexinDb().Get(&account,"select mobile, email from lie_user_main where user_id = ?", bom.UserId)
if account.Mobile != "" {
client_account = account.Mobile
} else {
client_account = account.Email
}
// 添加到lie_bom_extend表
curr_time := time.Now().Unix()
_, err = dao.GetLiexinBomDb().Exec("insert into lie_bom_extend (kefu_id, kefu_name, bom_id, bom_sn, user_id, user_name, create_time, update_time) values (?,?,?,?,?,?,?,?)",
sale_id, sale_name, bom.BomId, bom.BomSn, bom.UserId, client_account, curr_time, curr_time)
if err != nil {
logger.Info("Failed to insert into lie_bom_extend db ", err.Error())
sendMsg("bom_crm队列任务,添加到lie_bom_extend表失败,原因:"+err.Error())
return
}
sendMsg("bom_crm队列任务,添加到lie_bom_extend表成功")
}
...@@ -11,10 +11,12 @@ import ( ...@@ -11,10 +11,12 @@ import (
var instance * sqlx.DB var instance * sqlx.DB
var instanceCms * sqlx.DB var instanceCms * sqlx.DB
var instanceLiexin * sqlx.DB var instanceLiexin * sqlx.DB
var instanceLiexinBom * sqlx.DB
var once sync.Once var once sync.Once
var onceCms sync.Once var onceCms sync.Once
var onceLiexin sync.Once var onceLiexin sync.Once
var onceLiexinBom sync.Once
func GetDb()(*sqlx.DB) { func GetDb()(*sqlx.DB) {
...@@ -54,3 +56,15 @@ func GetLiexinDb()(*sqlx.DB) { ...@@ -54,3 +56,15 @@ func GetLiexinDb()(*sqlx.DB) {
return instanceLiexin return instanceLiexin
} }
func GetLiexinBomDb()(*sqlx.DB) {
onceLiexinBom.Do(func() {
dbConfig := configs.GetDBLiexinBom()
db, err := sqlx.Open(dbConfig.Engine, fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", dbConfig.User, dbConfig.Pass, dbConfig.Ip, dbConfig.Port, dbConfig.Table))
if err != nil {
log.Fatalln(err)
}
instanceLiexinBom = db
})
return instanceLiexinBom
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment