Commit 48965560 by 孙龙

up

parent 6f81e9ac
package crm package crm
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
...@@ -12,46 +13,30 @@ import ( ...@@ -12,46 +13,30 @@ import (
cfg "github.com/ichunt2019/cfg/lib" cfg "github.com/ichunt2019/cfg/lib"
) )
type comUser struct { type comUser struct {
ComId int64 `json:com_id` ComId int64 `json:"com_id"`
UserId int64 `json:user_id` UserId int64 `json:"user_id"`
SaleId int64 `json:sale_id` SaleId int64 `json:"sale_id"`
} }
func PushComUserInfoToErp(data string) (err error){ func PushComUserInfoToErp(data string) (err error){
pushData := comUser{} pushData := &comUser{}
table := gjson.Parse(data).Get("table").String()
action_type := gjson.Parse(data).Get("type").String() err = json.Unmarshal([]byte(data),pushData)
//fmt.Println(action_type) if err != nil{
//action_type = "delete" return
if action_type != "update" && action_type != "insert"{
return nil
}
if table == "lie_invoice_company"{
pushData.ComId = gjson.Parse(data).Get("data.id").Int()
}else if(table == "lie_invoice_com_user"){
pushData.ComId = gjson.Parse(data).Get("data.com_id").Int()
pushData.UserId = gjson.Parse(data).Get("data.user_id").Int()
pushData.SaleId = gjson.Parse(data).Get("data.sale_id").Int()
fmt.Println("888")
if pushData.ComId == 0{
return nil
}
}else{
return nil
} }
//fmt.Println("55555555555")
urlParams := url.Values{} urlParams := url.Values{}
urlParams.Add("com_id",strconv.FormatInt(pushData.ComId,10)) urlParams.Add("com_id",strconv.FormatInt(pushData.ComId,10))
urlParams.Add("user_id",strconv.FormatInt(pushData.UserId,10)) urlParams.Add("user_id",strconv.FormatInt(pushData.UserId,10))
urlParams.Add("sale_id",strconv.FormatInt(pushData.SaleId,10)) urlParams.Add("sale_id",strconv.FormatInt(pushData.SaleId,10))
//fmt.Println(urlParams) fmt.Println(urlParams)
header := http.Header{} header := http.Header{}
header.Set("api-key",cfg.Instance("config").GetString("apiKey")) header.Set("api-key",cfg.Instance("config").GetString("apiKey"))
header.Set("Content-Type","application/json") header.Set("Content-Type","application/json")
push_url := cfg.Instance("config").GetString("pushErpDomain") push_url := cfg.Instance("config").GetString("pushErpDomain")
response,returnData,err := lib.HttpPOST(push_url,urlParams,0,header,"") response,returnData,err := lib.HttpPOST(push_url,urlParams,0,header,"")
//fmt.Println(response,string(returnData),err) fmt.Println(response,string(returnData),err)
if err == nil && response.StatusCode == 200{ if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("errcode").Int() err_code := gjson.Parse(string(returnData)).Get("errcode").Int()
msg := gjson.Parse(string(returnData)).Get("errmsg").String() msg := gjson.Parse(string(returnData)).Get("errmsg").String()
......
...@@ -9,7 +9,7 @@ func Init(configPath string,logPath string)(err error){ ...@@ -9,7 +9,7 @@ func Init(configPath string,logPath string)(err error){
if err != nil{ if err != nil{
panic(err) panic(err)
} }
xlog.Init(logPath,"crmuser_to_erp","recv_comuser") xlog.Init(logPath,"crmuser_to_erp")
//初始化数据库 //初始化数据库
//dao.Init() //dao.Init()
......
package boot
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"recv_comuser")
//初始化数据库
//dao.Init()
return
}
...@@ -2,7 +2,7 @@ package main ...@@ -2,7 +2,7 @@ package main
import ( import (
"flag" "flag"
"golang-asynctask/boot/crm_comuser_boot" "golang-asynctask/boot/recv_comuser"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
...@@ -24,7 +24,9 @@ func main(){ ...@@ -24,7 +24,9 @@ func main(){
quit := make(chan os.Signal) quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit <-quit
} }
......
...@@ -15,7 +15,7 @@ type RecvPro struct { ...@@ -15,7 +15,7 @@ type RecvPro struct {
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db //// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error { func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte)) //fmt.Println(string(dataByte))
logger.Instance("crmuser_to_erp").Info(string(dataByte)) logger.Instance("recv_comuser").Info(string(dataByte))
//error := crm.PushComUserInfoToErp(string(dataByte)) //error := crm.PushComUserInfoToErp(string(dataByte))
error := service_crm.DispenseMsg(string(dataByte)) error := service_crm.DispenseMsg(string(dataByte))
return error return error
...@@ -23,10 +23,10 @@ func (t *RecvPro) Consumer(dataByte []byte) error { ...@@ -23,10 +23,10 @@ func (t *RecvPro) Consumer(dataByte []byte) error {
//消息已经消费3次 失败了 请进行处理 //消息已经消费3次 失败了 请进行处理
func (t *RecvPro) FailAction(err error,dataByte []byte) error { func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("crmuser_to_erp").Error("任务处理失败了,我要进入db日志库了") logger.Instance("recv_comuser").Error("任务处理失败了,我要进入db日志库了")
logger.Instance("crmuser_to_erp").Error("任务处理失败了,发送钉钉消息通知主人") logger.Instance("recv_comuser").Error("任务处理失败了,发送钉钉消息通知主人")
logger.Instance("crmuser_to_erp").Error(string(dataByte)) logger.Instance("recv_comuser").Error(string(dataByte))
logger.Instance("crmuser_to_erp").Error("错误原因:%s",err) logger.Instance("recv_comuser").Error("错误原因:%s",err)
return nil return nil
} }
......
...@@ -4,7 +4,7 @@ import ( ...@@ -4,7 +4,7 @@ import (
"flag" "flag"
"golang-asynctask/cmd/crm/sync_comuser_to_erp/service" "golang-asynctask/cmd/crm/sync_comuser_to_erp/service"
"golang-asynctask/boot" "golang-asynctask/boot/crm_comuser_boot"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
...@@ -24,13 +24,6 @@ func main(){ ...@@ -24,13 +24,6 @@ func main(){
service.Listen() service.Listen()
//rabbitmq.Recv(rabbitmq.QueueExchange{
// "a-maxwell",
// "maxwell.lie_invoice_company",
// "",
// "direct",
// "amqp://guest:guest@192.168.2.232:5672/",
//},t,1)
quit := make(chan os.Signal) quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
......
...@@ -3,9 +3,8 @@ package service ...@@ -3,9 +3,8 @@ package service
import ( import (
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
logger "github.com/ichunt2019/lxLog/log" logger "github.com/ichunt2019/lxLog/log"
//"golang-asynctask/app/dao/crm" "golang-asynctask/app/dao/crm"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq" "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
service_crm "golang-asynctask/app/service/crm"
) )
type RecvPro struct { type RecvPro struct {
...@@ -16,8 +15,7 @@ type RecvPro struct { ...@@ -16,8 +15,7 @@ type RecvPro struct {
func (t *RecvPro) Consumer(dataByte []byte) error { func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte)) //fmt.Println(string(dataByte))
logger.Instance("crmuser_to_erp").Info(string(dataByte)) logger.Instance("crmuser_to_erp").Info(string(dataByte))
//error := crm.PushComUserInfoToErp(string(dataByte)) error := crm.PushComUserInfoToErp(string(dataByte))
error := service_crm.DispenseMsg(string(dataByte))
return error return error
} }
...@@ -35,10 +33,10 @@ func Listen(){ ...@@ -35,10 +33,10 @@ func Listen(){
t := &RecvPro{} t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{ rabbitmq.Recv(rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.queue_name"), lib.Instance("config").GetString("rabbitmq_crm_comuser.queue_name"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.routing_key"), lib.Instance("config").GetString("rabbitmq_crm_comuser.routing_key"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.exchange"), lib.Instance("config").GetString("rabbitmq_crm_comuser.exchange"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.type"), lib.Instance("config").GetString("rabbitmq_crm_comuser.type"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.dns"), lib.Instance("config").GetString("rabbitmq_crm_comuser.dns"),
},t,1) },t,1)
} }
...@@ -14,3 +14,9 @@ dns="amqp://WmsQue:EizM9I1TAxoO0tmzoOm@172.18.137.33:5672/" ...@@ -14,3 +14,9 @@ dns="amqp://WmsQue:EizM9I1TAxoO0tmzoOm@172.18.137.33:5672/"
[rabbitmq_crm_comuser]
queue_name="crm_comusertoerp"
routing_key="crm_comusertoerp"
exchange=""
type="direct"
dns="amqp://WmsQue:EizM9I1TAxoO0tmzoOm@172.18.137.33:5672/"
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment