Commit 3a1909fd by wang

fix

parent ce76b3ae
...@@ -6,3 +6,4 @@ ...@@ -6,3 +6,4 @@
*.exe~ *.exe~
gowatch.yml gowatch.yml
/config/prod /config/prod
/cmd/main2.go
...@@ -17,7 +17,7 @@ func init() { ...@@ -17,7 +17,7 @@ func init() {
// "", // "",
// "", // "",
// "", // "",
// "amqp://guest:guest@192.168.2.232:5672/", // "amqp://guest:guest@192.168.1.252:5672/",
//} //}
// //
//str := `{"supplierNumber":"C0000102","PTID":"L0000002","supplierID":"LxYAAAG0CQA3xn38","supplierName":"TestName"}` //str := `{"supplierNumber":"C0000102","PTID":"L0000002","supplierID":"LxYAAAG0CQA3xn38","supplierName":"TestName"}`
...@@ -26,7 +26,6 @@ func init() { ...@@ -26,7 +26,6 @@ func init() {
} }
func (t *RecvPro) Consumer(dataByte []byte) (err error) { func (t *RecvPro) Consumer(dataByte []byte) (err error) {
fmt.Println("来了123")
log.Println(string(dataByte)) log.Println(string(dataByte))
service.NewUploadObj(string(dataByte)).Sync() service.NewUploadObj(string(dataByte)).Sync()
return nil return nil
...@@ -41,6 +40,7 @@ func (t *RecvPro) FailAction(dataByte []byte) error { ...@@ -41,6 +40,7 @@ func (t *RecvPro) FailAction(dataByte []byte) error {
//sku_upload_sync //sku_upload_sync
func Run() { func Run() {
t := &RecvPro{} t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{ rabbitmq.Recv(rabbitmq.QueueExchange{
"data_manager_data_callback", "data_manager_data_callback",
"", "",
......
...@@ -3,6 +3,7 @@ package service ...@@ -3,6 +3,7 @@ package service
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gogf/gf/util/gconv"
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
...@@ -91,7 +92,6 @@ func (this *UploadSkuSync)Sync() error{ ...@@ -91,7 +92,6 @@ func (this *UploadSkuSync)Sync() error{
//开始处理 //开始处理
func (this *UploadSkuSync) handle() { func (this *UploadSkuSync) handle() {
fmt.Printf("加1")
dao.IncrSyncQueue(this.SkuRes.UpSn)//redis 完成同步+1 dao.IncrSyncQueue(this.SkuRes.UpSn)//redis 完成同步+1
if(this.SkuRes.UpSn=="" || this.SkuRes.UploadLogId==0){ if(this.SkuRes.UpSn=="" || this.SkuRes.UploadLogId==0){
msg:=fmt.Sprintf("回调的消息UploadLogId 和 UpSn 为空或者为0%s",this.SkuReStr) msg:=fmt.Sprintf("回调的消息UploadLogId 和 UpSn 为空或者为0%s",this.SkuReStr)
...@@ -112,9 +112,13 @@ func (this *UploadSkuSync) handle() { ...@@ -112,9 +112,13 @@ func (this *UploadSkuSync) handle() {
updateItemStruct.SkuStatus=this.SkuRes.AddSku updateItemStruct.SkuStatus=this.SkuRes.AddSku
updateItemStruct.ErrorMsg=error_msg updateItemStruct.ErrorMsg=error_msg
updateItemStruct.SyncStatus=sync_status updateItemStruct.SyncStatus=sync_status
uploadInfo,err:=dao.GetUploadLogInfo(this.SkuRes.UpSn)
//如果sku是新增,就压入队列 es_sku_add
this.PushInsertSkuMq(uploadInfo,updateItemStruct)
//修改UpdateLogItem 表 //修改UpdateLogItem 表
err:=dao.UpdateLogItem(this.SkuRes.UpSn,this.SkuRes.UploadLogId,updateItemStruct) err=dao.UpdateLogItem(this.SkuRes.UpSn,this.SkuRes.UploadLogId,updateItemStruct)
if(err!=nil){ if(err!=nil){
msg:=fmt.Sprintf("修改详情表出错:"+err.Error()) msg:=fmt.Sprintf("修改详情表出错:"+err.Error())
fmt.Println(msg) fmt.Println(msg)
...@@ -136,18 +140,63 @@ func (this *UploadSkuSync) handle() { ...@@ -136,18 +140,63 @@ func (this *UploadSkuSync) handle() {
this.dingdingSucessPush(msg); this.dingdingSucessPush(msg);
} }
//发送队列到 supplier_sku_callback (处理脚本队列) //发送队列到 supplier_sku_callback (处理脚本队列)
uploadInfo,err:=dao.GetUploadLogInfo(this.SkuRes.UpSn)
if(err!=nil){ if(err!=nil){
msg:=fmt.Sprintf("查询主日志表错误:err:%s,up_sn:%s",err.Error(),this.SkuRes.UpSn) msg:=fmt.Sprintf("查询主日志表错误:err:%s,up_sn:%s",err.Error(),this.SkuRes.UpSn)
fmt.Println(msg) fmt.Println(msg)
this.dingdingSucessPush(msg); this.dingdingSucessPush(msg);
return return
} }
this.PushStatus3HandleList(uploadInfo) this.PushStatus3HandleList(uploadInfo)//处理完推送下架队列
//uploadInfo.ActionTime this.PushWechatMsg(uploadInfo)//处理完 推送到微信通知队列
}
}
//推送到微信通知
func (this *UploadSkuSync)PushWechatMsg(uploadInfo *dao.UpdatesUploadStruct) {
data:=make(map[string]interface{})
data["queue_route_key"]="/notice/skuenable"
data["supplier_code"]=uploadInfo.SupplierCode
bytes,err:=json.Marshal(data)
if(err!=nil){
msg:=fmt.Sprintf("推送微信通知队列数组组装失败")
fmt.Println(msg)
this.dingdingSucessPush(msg);
return
} }
listMsg:=string(bytes)
fmt.Printf("单号:%s ;发送 cloud_common_queue 队列\n,data:%s",uploadInfo.UpSn,listMsg)
listName:="cloud_common_queue"
this.rabbitmqPush(listName,listMsg)
} }
//如果sku是新增,就压入队列 es_sku_add
func (this *UploadSkuSync) PushInsertSkuMq(uploadInfo *dao.UpdatesUploadStruct,updateItemStruct dao.UpdateItemStruct) {
fmt.Printf("单号:%s,skuid:%d;的状态为%d\n",uploadInfo.UpSn,updateItemStruct.SkuId,updateItemStruct.SkuStatus)
if(updateItemStruct.SkuStatus==1){
data:=make(map[string]interface{})
data["goods_id"]=gconv.String(updateItemStruct.SkuId)
data["spu_id"]=gconv.String(updateItemStruct.SpuId)
data["cannal"]=uploadInfo.SupplierCode
bytes,err:=json.Marshal(data)
if(err!=nil){
msg:=fmt.Sprintf("推送新增sku队列,解析为json报错")
fmt.Println(msg)
this.dingdingSucessPush(msg);
return
}
listMsg:=string(bytes)
fmt.Printf("单号:%s,skuid:%d ;发送 es_sku_add 队列\n,data:%s",uploadInfo.UpSn,updateItemStruct.SkuId,listMsg)
listName:="es_sku_add"
this.rabbitmqPush(listName,listMsg)
}
}
//after2 := time.After(2*time.Second)
//push到下架商品的队列 //push到下架商品的队列
func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadStruct) { func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadStruct) {
...@@ -165,6 +214,15 @@ func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadSt ...@@ -165,6 +214,15 @@ func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadSt
listMsg:=string(bytes) listMsg:=string(bytes)
fmt.Printf("单号:%s, 已完成;发送 supplier_sku_callback 队列\n,data:%s",uploadInfo.UpSn,listMsg) fmt.Printf("单号:%s, 已完成;发送 supplier_sku_callback 队列\n,data:%s",uploadInfo.UpSn,listMsg)
listName:="supplier_sku_callback" listName:="supplier_sku_callback"
this.rabbitmqPush(listName,listMsg)
}
func (this *UploadSkuSync) rabbitmqPush(listName string,listMsg string) {
//configString:=lib.Instance("config").GetString("rabbit_mq_default.url")
//fmt.Println(configString)
queueExchange := rabbitmq.QueueExchange{ queueExchange := rabbitmq.QueueExchange{
listName, listName,
listName, listName,
...@@ -173,9 +231,9 @@ func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadSt ...@@ -173,9 +231,9 @@ func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadSt
lib.Instance("config").GetString("rabbit_mq_default.url"), lib.Instance("config").GetString("rabbit_mq_default.url"),
} }
rabbitmq.Send(queueExchange, listMsg) rabbitmq.Send(queueExchange, listMsg)
} }
/** /**
应该发送队列 与 已完成队列对比 应该发送队列 与 已完成队列对比
*/ */
......
...@@ -19,9 +19,12 @@ func main() { ...@@ -19,9 +19,12 @@ func main() {
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录") flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse() flag.Parse()
boot.Init(configPath, logPath) boot.Init(configPath, logPath)
//testrabbitmqPush("es_sku_add111","hello")
//service.NewUploadObj("123").Sync() //service.NewUploadObj("123").Sync()
queue.Run() queue.Run()
} }
...@@ -28,7 +28,7 @@ data = [["gamma", "delta"],[1, 2]] ...@@ -28,7 +28,7 @@ data = [["gamma", "delta"],[1, 2]]
[database] [database]
[database.default] [database.default]
host = "192.168.2.232" host = "192.168.1.252"
[supplier_no_brand] [supplier_no_brand]
3 = [615,757,46596,43172,52,46481,47811,48817] 3 = [615,757,46596,43172,52,46481,47811,48817]
...@@ -36,7 +36,7 @@ data = [["gamma", "delta"],[1, 2]] ...@@ -36,7 +36,7 @@ data = [["gamma", "delta"],[1, 2]]
9 = [47778,4589,12369] 9 = [47778,4589,12369]
[rabbit_mq_default] [rabbit_mq_default]
url = "amqp://huntadmin:jy2y2900@192.168.1.237:5672/" url = "amqp://guest:guest@192.168.1.252:5672/"
[DINGDING] [DINGDING]
SEARCH_API_MONITOR = "92917a6e090a8a39832c4843a579d6c6f9dfecc46fa275f8753ddee2b4399045" SEARCH_API_MONITOR = "92917a6e090a8a39832c4843a579d6c6f9dfecc46fa275f8753ddee2b4399045"
\ No newline at end of file
...@@ -2,14 +2,14 @@ ...@@ -2,14 +2,14 @@
ShowSQL = false ShowSQL = false
[micro] [micro]
data_source_name = "micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing" data_source_name = "micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20 max_open_conn = 20
max_idle_conn = 10 max_idle_conn = 10
table_prefix = "" table_prefix = ""
max_conn_life_time = 100 max_conn_life_time = 100
[supplier] [supplier]
data_source_name = "liexin_ass:liexin_ass#zsyM@tcp(192.168.2.232:3306)/liexin_ass?charset=utf8&parseTime=true&loc=Asia%2FChongqing" data_source_name = "liexin_ass:liexin_ass#zsyM@tcp(192.168.1.252:3306)/liexin_ass?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20 max_open_conn = 20
max_idle_conn = 10 max_idle_conn = 10
table_prefix = "lie_" table_prefix = "lie_"
...@@ -17,14 +17,14 @@ ShowSQL = false ...@@ -17,14 +17,14 @@ ShowSQL = false
[sku] [sku]
[sku.sku_0] [sku.sku_0]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing" dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20 max_open_conn = 20
max_idle_conn = 10 max_idle_conn = 10
table_prefix = "" table_prefix = ""
max_conn_life_time = 100 max_conn_life_time = 100
[sku.sku_1] [sku.sku_1]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing" dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20 max_open_conn = 20
max_idle_conn = 10 max_idle_conn = 10
table_prefix = "" table_prefix = ""
......
...@@ -4,7 +4,7 @@ micro_service_name = "go_upload_sync" ...@@ -4,7 +4,7 @@ micro_service_name = "go_upload_sync"
#etcd配置信息 etcd服务的ip端口用户密码 #etcd配置信息 etcd服务的ip端口用户密码
[etcd_config] [etcd_config]
addrs = [ addrs = [
"192.168.2.232:2379" "192.168.1.252:2379"
] ]
username = "" username = ""
password = "" password = ""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment