Commit 31097b3a by Joneq

增加释放字段

parent 489ccb9f
......@@ -20,5 +20,6 @@ func SetFree() {
fmt.Println("添加记录失败",k)
}
dao.GetDb().Exec("delete from lie_assign_active where user_id = ? and turn_in_id = ?",k,v)
dao.GetDb().Exec("update lie_user set is_free = 1 where user_id",k)
}
}
\ No newline at end of file
package main
import (
"crm-server/cmd/queue/user_add/user_add_queue_logic"
"crm-server/configs"
"crm-server/internal/dao"
"crm-server/internal/service"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
"time"
)
type WaitUpdateUserSale struct {
User_Id int `json:"user_id"`
Sale_Id int `json:"sale_id"`
}
func main() {
//定义队列类型和错误类型
service.ProGramErrType = "crm_update_user_sales"
conn, err := amqp.Dial("amqp://"+configs.RABBITMQDSN+"/")
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to connect to RabbitMQ")
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to open a channel")
}
defer ch.Close()
q, err := ch.QueueDeclare(
service.ProGramErrType, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
service.WriteErrDetail(err.Error()+"Failed to declare a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
service.WriteErrDetail(err.Error() +"Failed to register a consumer")
}
forever := make(chan bool)
var updateUserSale WaitUpdateUserSale
go func() {
for d := range msgs {
fmt.Println(string(d.Body))
json.Unmarshal(d.Body,&updateUserSale)
handle(updateUserSale.User_Id,updateUserSale.Sale_Id)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
//修改用户的客服
func handle(userId,saleId int)(result bool) {
//查找出crm中的用户
var crmUserId int
_ = dao.GetDb().QueryRowx("select user_id from lie_user where outter_uid = ?",userId).Scan(&crmUserId)
if crmUserId == 0 {
//不存在推一个队列,然后停止2秒,再检查
user_add_queue_logic.PushUserQueue(userId)
time.Sleep(2*time.Second)
_ = dao.GetDb().QueryRowx("select user_id from lie_user where outter_uid = ?",userId).Scan(&crmUserId)
if crmUserId == 0{
service.WriteErrDetail(strconv.Itoa(crmUserId)+"不存在")
return false
}
}
var timeNow = time.Now().Unix()
_,err := dao.GetDb().Exec("update lie_salesman set assign_time = ?,update_time = ?,sale_id= ? where user_id = ?",timeNow,timeNow,saleId,crmUserId)
fmt.Println(err)
_,err = dao.GetDb().Exec("insert into lie_action_log(`user_id`,`operator_id`,`create_time`,`remark`,`admin`,`event`)value(?,?,?,'转为已下单用户','admin','用户下单')",crmUserId,saleId,timeNow)
fmt.Println(err)
return true
}
package main
import (
"crm-server/configs"
"crm-server/internal/common"
"crm-server/internal/logic"
"crm-server/internal/model"
"crm-server/internal/service"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
)
var(
MemberId int
InsertData model.MemberAddUserData
)
type WaitAddUser struct {
User_Id int `json:"user_id"`
}
//监听用户添加队列
func main(){
//定义错误类型
service.ProGramErrType = "member_user_add:"
//定义队列类型和错误类型
service.ProGramErrType = "member_user_add"
//设置所有城市
common.SetCityName()
MemberId = 1
conn, err := amqp.Dial("amqp://"+configs.RABBITMQDSN+"/")
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to connect to RabbitMQ")
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to open a channel")
}
defer ch.Close()
q, err := ch.QueueDeclare(
service.ProGramErrType, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
service.WriteErrDetail(err.Error()+"Failed to declare a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
service.WriteErrDetail(err.Error() +"Failed to register a consumer")
}
forever := make(chan bool)
var user WaitAddUser
go func() {
for d := range msgs {
json.Unmarshal(d.Body,&user)
fmt.Println(user)
handle(user.User_Id)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func handle(memberId int)(result bool) {
//获取当前会员的基本数据
InsertData = logic.GetMemberUserInfo(MemberId)
InsertData = logic.GetMemberUserInfo(memberId)
//检测用户是否存在
if InsertData.User_Id == 0{
service.WriteErrDetail(strconv.Itoa(MemberId)+"会员系统中用户信息不存在")
if InsertData.User_Id == 0 {
service.WriteErrDetail(strconv.Itoa(memberId) + "会员系统中用户信息不存在")
return false
}
if logic.CheckMemberIdIsHave(InsertData.User_Id) != 0 {
service.WriteErrDetail(strconv.Itoa(MemberId)+"crm系统中用户已经存在")
service.WriteErrDetail(strconv.Itoa(memberId) + "crm系统中用户已经存在")
return false
}
//处理得到的用户数据
InsertData = logic.HandleData(InsertData)
//插入数据
if !logic.InsertMemberUser(InsertData) {
errSourcedata,_ := json.Marshal(InsertData)
service.WriteErrDetail("插入用户数据错误:"+string(errSourcedata))
errSourcedata, _ := json.Marshal(InsertData)
service.WriteErrDetail("插入用户数据错误:" + string(errSourcedata))
return false
}
return true
}
\ No newline at end of file
package user_add_queue_logic
import (
"crm-server/configs"
"crm-server/internal/service"
"encoding/json"
"github.com/streadway/amqp"
)
type WaitAddUser struct {
User_Id int `json:"user_id"`
}
//推送一个用户数据
func PushUserQueue(memberId int) {
var user WaitAddUser
user.User_Id = memberId
//链接mq
conn, err := amqp.Dial("amqp://"+configs.RABBITMQDSN+"/")
defer conn.Close()
service.WriteErr(err)
//通道
ch, err := conn.Channel()
defer ch.Close()
service.WriteErr(err)
//设置数据
q, err := ch.QueueDeclare(
"member_user_add", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
service.WriteErr(err)
sendBody,err := json.Marshal(user)
ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: sendBody,
})
service.WriteErr(err)
}
\ No newline at end of file
......@@ -9,6 +9,7 @@ require (
github.com/jmoiron/sqlx v1.2.0
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
golang.org/x/net v0.0.0-20191011234655-491137f69257
google.golang.org/grpc v1.24.0
)
......@@ -160,6 +160,8 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71 h1:2MR0pKUzlP3SGgj5NYJe/zRYDwOu9ku6YHy+Iw7l5DM=
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment