Commit cdc7b402 by wang

库存上传回调队列消费脚本

parent 6b1436d0
......@@ -33,6 +33,12 @@ func BuildDatabaseList() (DatabaseList map[string]BaseDatabase) {
MaxIdleCons: lib.Instance("db").GetInt("supplier.max_idle_conn"),
MaxOpenCons: lib.Instance("db").GetInt("supplier.max_open_conn"),
},
"spu": {
DataSourceName: lib.Instance("db").GetString("spu.data_source_name"),
Prefix: lib.Instance("db").GetString("spu.table_prefix"),
MaxIdleCons: lib.Instance("db").GetInt("spu.max_idle_conn"),
MaxOpenCons: lib.Instance("db").GetInt("spu.max_open_conn"),
},
}
}
......
......@@ -5,8 +5,8 @@ import (
"github.com/gin-gonic/gin"
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
common "go_supplier_sever/app/common/function"
"go_supplier_sever/app/service"
common "go_upload_sync/app/common/function"
"go_upload_sync/app/service"
)
func Ping(ctx *gin.Context) {
......
......@@ -4,7 +4,7 @@ import (
"github.com/go-redis/redis/v7"
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
"go_supplier_sever/app/common/config"
"go_upload_sync/app/common/config"
"time"
"xorm.io/xorm"
......
package dao
import (
"fmt"
"github.com/go-redis/redis/v7"
"time"
)
/**
写的比较急,各个方法的参数有待优化
*/
func GetLogItem() (res []map[string]string) {
//_, err = Dao.GetDb("supplier").Table("lie_sku_upload_item_0").Get()
res,err:=Dao.GetDb("spu").QueryString("select * from lie_sku_upload_item_0 where log_id=?",17)
if(err!=nil){
fmt.Println(err.Error())
}
return res;
}
type UpdateItemStruct struct {
ErrorMsg string `json:"error_msg"`
SyncStatus int64 `json:"sync_status"`
SpuId int64 `json:"spu_id"`
SkuId int64 `json:"sku_id"`
SkuStatus int64 `json:"sku_status"`
SpuStatus int64 `json:"spu_status"`
UpdateTime int64 `json:"update_time" xorm:"updated"`
}
//修改item
func UpdateLogItem(upSn string,logId int64,updateItemStruct UpdateItemStruct) error{
tableName:=ResolveUpsn(upSn)
_,err:=Dao.GetDb("spu").Table(tableName).Where("id =?",logId).Update(&updateItemStruct)
if(err!=nil){
return err
}
return nil
}
//修改主表
type UpdatesUploadStruct struct {
UpSn string `json:"up_sn"`
SupplierCode string `json:"supplier_code"`
Status int64 `json:"status"`
UpdateTime int64 `json:"update_time" xorm:"updated"`
ActionTime int64 `json:"action_time" `
EndTime int64 `json:"end_time" `
}
//更新主日志单为完成
func UpdatesUploadLogSucess(upSn string,status int64) (error) {
updatesUploadStruct:=UpdatesUploadStruct{
Status: status,
EndTime:time.Now().Unix(),
}
_,err:=Dao.GetDb("spu").Table("lie_sku_upload_log").Where("up_sn =?",upSn).Update(&updatesUploadStruct)
if(err!=nil){
return err
}
return nil
}
func GetUploadLogInfo(upSn string) (*UpdatesUploadStruct,error) {
updatesUploadStruct:=&UpdatesUploadStruct{}
_,err:=Dao.GetDb("spu").Table("lie_sku_upload_log").Where("up_sn=?",upSn).Get(updatesUploadStruct)
if(err!=nil){
return updatesUploadStruct,err
}
return updatesUploadStruct ,nil
}
func ResolveUpsn(upSn string) (table string) {
var tableCode string; //表0-9
tableCode=upSn[len(upSn)-1:len(upSn)]
tableCode="lie_sku_upload_item_"+tableCode
return tableCode
}
//获取已经回调的队列数量 (sync_queue_upload)
func GetSyncQueueCount(upSn string) (int,error) {
value,err:=Dao.GetRedisDbGroup("api").HGet("sync_queue_upload",upSn).Int()
if(err!=nil &&err!=redis.Nil){
return value,err
}
return value,nil
}
//同步加1
func IncrSyncQueue(upSn string) (error){
count,err:=GetSyncQueueCount(upSn)
if(err!=nil){
return err
}
count=count+1
err=Dao.GetRedisDbGroup("api").HSet("sync_queue_upload",upSn,count).Err()
if(err!=nil){
return err
}
return nil
}
//获取应该发送的队列数量 (should_send_upload)
func GetShouldSendQueueCount(upSn string) (int,error) {
value,err:= Dao.GetRedisDbGroup("api").HGet("should_send_upload",upSn).Int()
if(err!=nil &&err!=redis.Nil){
return value,err
}
return value,nil
}
package dao
import (
"go_supplier_sever/app/model"
"go_upload_sync/app/model"
"time"
)
......
......@@ -18,4 +18,6 @@ func GetUser(){
fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef").String())
}
......@@ -4,8 +4,8 @@ import (
"fmt"
"github.com/ichunt2019/cfg/lib"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/tidwall/gjson"
"go_supplier_sever/app/service"
"go_upload_sync/app/service"
"log"
)
type RecvPro struct {
......@@ -26,15 +26,9 @@ func init() {
}
func (t *RecvPro) Consumer(dataByte []byte) (err error) {
message := string(dataByte)
fmt.Println(message)
supplierCode := gjson.Get(message, "PTID").String()
erpSupplierCode := gjson.Get(message, "supplierNumber").String()
erpSupplierName := gjson.Get(message, "supplierName").String()
err = service.AddSupplierSync(supplierCode, erpSupplierCode, erpSupplierName)
if err != nil {
fmt.Println(err)
}
fmt.Println("来了123")
log.Println(string(dataByte))
service.NewUploadObj(string(dataByte)).Sync()
return nil
}
......@@ -43,10 +37,12 @@ func (t *RecvPro) FailAction(dataByte []byte) error {
return nil
}
//data_manage
//sku_upload_sync
func Run() {
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"supplier_erp",
"data_manager_data_callback",
"",
"",
"",
......
package service
import "go_supplier_sever/app/dao"
import "go_upload_sync/app/dao"
func GetData(){
dao.GetUser()
......
package service
import "go_supplier_sever/app/dao"
import "go_upload_sync/app/dao"
func AddSupplierSync(supplierCode, erpSupplierCode, erpSupplierName string) (err error) {
return dao.AddSupplierSync(supplierCode, erpSupplierCode, erpSupplierName)
......
package service
import (
"encoding/json"
"fmt"
"github.com/ichunt2019/cfg/lib"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/tidwall/gjson"
"go_upload_sync/app/dao"
ls "go_upload_sync/util/lib"
)
/**
写的比较急,很多没封装
*/
type UploadSkuSync struct {
SkuReStr string
SkuRes *savaSkuRes
}
type savaSkuRes struct {
Errcode int64 `json:"errcode"`
GoodsId int64 `json:"goods_id"`
AddSku int64 `json:"add_sku"`
AddSpu int64 `json:"add_spu"`
SpuId int64 `json:"spu_id"`
SkuId int64 `json:"sku_id"`
ErrMsg string `json:"errmsg"`
UploadLogId int64 `json:"upload_log_id"`
UpSn string `json:"up_sn"`
}
func parseRes(r gjson.Result) (resObJ *savaSkuRes){
resObj:=&savaSkuRes{}
resObj.Errcode=r.Get("errcode").Int()
resObj.GoodsId=r.Get("goods_id").Int()
resObj.AddSku=r.Get("add_sku").Int()
resObj.AddSpu=r.Get("add_spu").Int()
resObj.SpuId=r.Get("spu_id").Int()
resObj.UploadLogId=r.Get("upload_log_id").Int()
resObj.ErrMsg=r.Get("errmsg").String()
resObj.UpSn=r.Get("up_sn").String()
return resObj
}
func NewUploadObj(SkuReStr string) *UploadSkuSync {
return &UploadSkuSync{SkuReStr:SkuReStr}
}
/**
@res string 接收 poolsku返回的结果
@res 格式如下
{
"errcode": 0,
"errmsg": "ok",
"goods_id": 1161889823935244337,
"add_sku": 0,
"add_spu": 0,
"spu_id": 2161889823940624400,
"upload_log_id": 123
}
@return error
*/
func (this *UploadSkuSync)Sync() error{
/*Str:=`
{
"errcode": 10086,
"errmsg": "这是错误信息",
"goods_id": 1161916576790978473,
"add_sku": 0,
"add_spu": 0,
"spu_id": 2161916576646508808,
"upload_log_id": 235,
"up_sn": "u161916721743338200"
}
`
this.SkuReStr=Str*/
r:=gjson.Parse(this.SkuReStr)
if(r.Exists() && r.IsObject()){
this.SkuRes=parseRes(r)
this.handle()
}else{
this.dingdingSucessPush("格式出错");
return nil
}
return nil
}
//开始处理
func (this *UploadSkuSync) handle() {
dao.IncrSyncQueue(this.SkuRes.UpSn)//redis 完成同步+1
if(this.SkuRes.UpSn=="" || this.SkuRes.UploadLogId==0){
msg:=fmt.Sprintf("回调的消息UploadLogId 和 UpSn 为空或者为0%s",this.SkuReStr)
this.dingdingSucessPush(msg);
return
}
error_msg:=""
sync_status:=int64(2)
if(this.SkuRes.Errcode!=0){
error_msg=this.SkuRes.ErrMsg
this.dingdingSucessPush(error_msg);
}
updateItemStruct:=dao.UpdateItemStruct{}
updateItemStruct.SkuId=this.SkuRes.GoodsId
updateItemStruct.SpuId=this.SkuRes.SpuId
updateItemStruct.SpuStatus=this.SkuRes.AddSpu
updateItemStruct.SkuStatus=this.SkuRes.AddSku
updateItemStruct.ErrorMsg=error_msg
updateItemStruct.SyncStatus=sync_status
//修改UpdateLogItem 表
err:=dao.UpdateLogItem(this.SkuRes.UpSn,this.SkuRes.UploadLogId,updateItemStruct)
if(err!=nil){
msg:=fmt.Sprintf("修改详情表出错:"+err.Error())
fmt.Println(msg)
this.dingdingSucessPush(msg);
}
//是否已经处理完成判断
isSucess,err:=this.IshandleSucess()
if(err!=nil){
msg:=fmt.Sprintf("判断是否处理完成时报错,err:%s",err.Error())
fmt.Println(msg)
this.dingdingSucessPush(msg);
}
if(isSucess){//如果已经处理完了 1.将主表的状态设置成已完成 2.发送到发送到下架处理脚本队列(开始时间和结束时间之外的sku下架)
err:=dao.UpdatesUploadLogSucess(this.SkuRes.UpSn,3)
if(err!=nil){
msg:=fmt.Sprintf("修改主表日志log错误")
fmt.Println(msg)
this.dingdingSucessPush(msg);
}
//发送队列到 supplier_sku_callback (处理脚本队列)
uploadInfo,err:=dao.GetUploadLogInfo(this.SkuRes.UpSn)
if(err!=nil){
msg:=fmt.Sprintf("查询主日志表错误:err:%s,up_sn:%s",err.Error(),this.SkuRes.UpSn)
fmt.Println(msg)
this.dingdingSucessPush(msg);
return
}
this.PushStatus3HandleList(uploadInfo)
//uploadInfo.ActionTime
}
}
//push到下架商品的队列
func (this *UploadSkuSync) PushStatus3HandleList(uploadInfo *dao.UpdatesUploadStruct) {
data:=make(map[string]interface{})
data["supplier_code"]=uploadInfo.SupplierCode
data["action_time"]=uploadInfo.ActionTime
data["end_time"]=uploadInfo.EndTime
bytes,err:=json.Marshal(data)
if(err!=nil){
msg:=fmt.Sprintf("推送下架队列,解析主日志为json报错")
fmt.Println(msg)
this.dingdingSucessPush(msg);
return
}
listMsg:=string(bytes)
fmt.Printf("单号:%s, 已完成;发送 supplier_sku_callback 队列\n,data:%s",uploadInfo.UpSn,listMsg)
listName:="supplier_sku_callback"
queueExchange := rabbitmq.QueueExchange{
listName,
listName,
"",
"direct",
lib.Instance("config").GetString("rabbit_mq_default.url"),
}
rabbitmq.Send(queueExchange, listMsg)
}
/**
应该发送队列 与 已完成队列对比
*/
func (this *UploadSkuSync) IshandleSucess() (bool,error) {
/**
sync_status=2(同步成功)
如果是错误 error_msg 填上
处理完成判断
已回来队列数 跟应该发送队列数是否相等(应该发送队列数是 状态为已审核+待审核)
*/
countSync,err:=dao.GetSyncQueueCount(this.SkuRes.UpSn)
fmt.Printf("up_sn:%s:的已接收数量为:%d\n",this.SkuRes.UpSn,countSync)
if(err!=nil){
return false,err
}
countSend,err:=dao.GetShouldSendQueueCount(this.SkuRes.UpSn)
fmt.Printf("up_sn:%s:应该发送的数量为:%d\n",this.SkuRes.UpSn,countSend)
if(countSync==countSend){
return true,nil
}
return false,nil
}
//钉钉发送成功
func (this *UploadSkuSync) dingdingSucessPush(msg string) {
msgTtile:="[同步回调队列][错误]\n"
msgS:=fmt.Sprintf("错误信息:%s",msg)
msgT:=fmt.Sprintf("返回数据:%s",this.SkuReStr)
zmsg:=msgTtile+msgS+msgT
res,err:=ls.DingDingPush(zmsg)
if(err!=nil){
fmt.Printf("发送钉钉错误:"+string(res.Errcode)+"msg:"+res.Errmsg+"\n")
}
}
......@@ -3,7 +3,7 @@ package boot
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
"go_supplier_sever/app/dao"
"go_upload_sync/app/dao"
)
func Init(configPath string, logPath string) (err error) {
......
......@@ -2,8 +2,8 @@ package main
import (
"flag"
"go_supplier_sever/app/queue"
"go_supplier_sever/boot"
"go_upload_sync/app/queue"
"go_upload_sync/boot"
)
var (
......@@ -12,11 +12,14 @@ var (
)
func main() {
//监听 sku_upload_sync
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
boot.Init(configPath, logPath)
//service.NewUploadObj("123").Sync()
queue.Run()
}
......@@ -36,4 +36,7 @@ data = [["gamma", "delta"],[1, 2]]
9 = [47778,4589,12369]
[rabbit_mq_default]
url = "amqp://guest:guest@192.168.2.232:5672/"
\ No newline at end of file
url = "amqp://huntadmin:jy2y2900@192.168.1.237:5672/"
[DINGDING]
SEARCH_API_MONITOR = "92917a6e090a8a39832c4843a579d6c6f9dfecc46fa275f8753ddee2b4399045"
\ No newline at end of file
......@@ -29,3 +29,9 @@ ShowSQL = false
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
[spu]
data_source_name = "spu:spu@tcp(192.168.1.235:3306)/liexin_spu?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
\ No newline at end of file
#服务注册名称
micro_service_name = "go_supplier_sever"
micro_service_name = "go_upload_sync"
#etcd配置信息 etcd服务的ip端口用户密码
[etcd_config]
......
module go_supplier_sever
module go_upload_sync
go 1.14
......@@ -7,11 +7,13 @@ require (
github.com/gin-gonic/gin v1.6.3
github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gogf/gf v1.15.6
github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717
github.com/ichunt2019/go-rabbitmq v1.0.1
github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5
github.com/ichunt2019/ichunt-micro-registry v1.0.1
github.com/ichunt2019/lxLog v0.0.0-20210226024426-781becb3c042
github.com/imroc/req v0.3.0
github.com/mattn/go-sqlite3 v1.14.6 // indirect
github.com/spf13/viper v1.7.1
github.com/tidwall/gjson v1.6.8
......
......@@ -4,8 +4,8 @@ import (
"errors"
"fmt"
"github.com/gin-gonic/gin"
common "go_supplier_sever/app/common/function"
"go_supplier_sever/util"
common "go_upload_sync/app/common/function"
"go_upload_sync/util"
"github.com/ichunt2019/cfg/lib"
"runtime/debug"
)
......
......@@ -3,8 +3,8 @@ package middleware
import (
"bytes"
"github.com/gin-gonic/gin"
"go_supplier_sever/util"
"go_supplier_sever/util/lib"
"go_upload_sync/util"
"go_upload_sync/util/lib"
"io/ioutil"
"time"
)
......
......@@ -3,7 +3,7 @@ package middleware
import (
"encoding/json"
"fmt"
"go_supplier_sever/util/lib"
"go_upload_sync/util/lib"
"github.com/gin-gonic/gin"
"strings"
)
......
......@@ -4,7 +4,7 @@ import (
"context"
"github.com/gin-gonic/gin"
cfg "github.com/ichunt2019/cfg/lib"
"go_supplier_sever/middleware"
"go_upload_sync/middleware"
"log"
"net/http"
"time"
......
......@@ -2,7 +2,7 @@ package router
import (
"github.com/gin-gonic/gin"
"go_supplier_sever/app/controller"
"go_upload_sync/app/controller"
)
func InitRouter(middlewares ...gin.HandlerFunc) *gin.Engine {
......
package lib
import (
"encoding/json"
"github.com/ichunt2019/cfg/lib"
"github.com/imroc/req"
"github.com/tidwall/gjson"
)
//发送钉钉消息的包
type DingDingRequest struct {
MsgType string `json:"msgtype"`
Text map[string]string `json:"text"`
IsAtAll bool `json:"isAtAll"`
}
type DingDingResponse struct {
Errcode int `json:"errcode"`
Errmsg string `json:"errmsg"`
}
func DingDingPush(content string) (result DingDingResponse, err error) {
accessToken:=lib.Instance("config").GetString("DINGDING.SEARCH_API_MONITOR")
webhook := "https://oapi.dingtalk.com/robot/send?access_token=" + accessToken
data := make(map[string]interface{})
data["msgtype"] = "text"
data["text"] = map[string]string{
"content": content,
}
req.Debug = false
dataStrByte, _ := json.Marshal(data)
dataStr := string(dataStrByte)
//dataStr = strings.Replace(dataStr, "\\", "\\\\", -1)
params := req.BodyJSON(dataStr)
resp, err := req.Post(webhook, params, req.Header{
"Content-Type": "application/json",
"charset": "UTF-8",
})
if resp == nil {
return
}
result.Errcode = int(gjson.Get(resp.String(), "errcode").Int())
result.Errmsg = gjson.Get(resp.String(), "errmsg").String()
return
}
......@@ -3,7 +3,7 @@ package lib
import (
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
"go_supplier_sever/app/common/config"
"go_upload_sync/app/common/config"
"xorm.io/xorm"
)
......
......@@ -3,7 +3,7 @@ package util
import (
"context"
"go_supplier_sever/util/lib"
"go_upload_sync/util/lib"
"github.com/gin-gonic/gin"
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment