Commit 4f4bc2c7 by mushishixian

添加审核状态队列

parent 20db06bd
package main
import (
"encoding/json"
"fmt"
"github.com/beevik/etree"
"github.com/go-kratos/kratos/pkg/log"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/mushishixian/gosoap"
"github.com/pkg/errors"
"scm_server/cmd/queue/sync_audit_status/product"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
"scm_server/internal/service"
"time"
)
type RecvPro struct {
}
type QueueAuditStatus struct {
SyncName string
Status int
ErpId string
}
func init() {
product.SyncErpStatus("in_store", "+7fvorsZSAevQCJ7ujsbLMBZJbY=")
log.Init(nil)
}
func (t *RecvPro) Consumer(dataByte []byte) (err error) {
fmt.Println(string(dataByte))
var (
message QueueAuditStatus
auditStatusLog model.AuditStatusLog
)
//先去转换队列消息的json
if err = json.Unmarshal(dataByte, &message); err != nil {
goto ERR
}
//得到message以后,去请求金蝶的接口
fmt.Println("请求金蝶接口")
if err = SyncErpAuditStatus(message.SyncName, message.ErpId); err != nil {
goto ERR
}
return nil
ERR:
auditStatusLog.SyncName = message.SyncName
auditStatusLog.UniqueId = message.ErpId
auditStatusLog.SyncError = err.Error()
auditStatusLog.AddTime = time.Now().Unix()
//如果失败了,记录到表,并且发送钉钉消息
if err = logic.InsertAuditStatusLog(auditStatusLog); err != nil {
log.Error(err.Error())
service.SendMessage(common.ErrorSendPhone, err.Error())
}
service.SendMessage(common.ErrorSendPhone, auditStatusLog.SyncError)
return nil
}
func LoginErp() (err error) {
soap, err := gosoap.SoapClient(configs.ErpLoginStatusApi)
if err != nil {
return
}
params := gosoap.ArrayParams{
{"userName", "WBYH"},
{"password", "123456"},
{"slnName", "eas"},
{"dcName", "demo"},
{"language", "L2"},
{"dbType", "1"},
{"authPattern", "BaseDB"},
}
res, err := soap.Call("login", params)
if err != nil {
return
}
doc := etree.NewDocument()
if err := doc.ReadFromBytes(res.Body); err != nil {
return err
}
root := doc.SelectElement("multiRef")
sessionId := root.SelectElement("sessionId").Text()
if sessionId != "" {
return
}
return errors.New(string(res.Body))
}
func SyncErpAuditStatus(syncName, uniqueId string) (err error) {
if err = LoginErp(); err != nil {
return
}
var (
soap *gosoap.Client
params gosoap.ArrayParams
res *gosoap.Response
apiUrl string
apiMethod string
)
switch syncName {
case "in_store":
apiUrl = configs.ErpSyncBillStatusApi
apiMethod = "synInWarehousAudit"
break
case "out_store":
apiUrl = configs.ErpSyncBillStatusApi
apiMethod = "synOutWarehousAudit"
break
default:
return errors.New("同步金蝶审核状态出现非法的同步类型" + syncName)
}
soap, err = gosoap.SoapClient(apiUrl)
if err != nil {
return
}
params = gosoap.ArrayParams{
{"json", fmt.Sprintf(`{"FSourceBillID":"%s"}`, uniqueId)},
}
res, err = soap.Call(apiMethod, params)
if err != nil {
return
}
return CheckBillDataRequest(res)
}
//检查出入库单申请同步请求的情况
func CheckBillDataRequest(res *gosoap.Response) (err error) {
doc := etree.NewDocument()
fmt.Println(string(res.Body))
if err = doc.ReadFromBytes(res.Body); err != nil {
return
}
//没有这个xml节点代表金蝶报错了
root := doc.SelectElement("ns1:synInWarehousAuditResponse")
if root == nil {
return errors.New(string(res.Body))
}
result := root.SelectElement("synInWarehousAuditReturn")
if result != nil {
var responseData map[string]string
json.Unmarshal([]byte(result.Text()), &responseData)
for key, value := range responseData {
//金蝶判断成功的标志
if key == "0000" {
return
}
//金额判断失败的标志
if key == "4444" {
return errors.New("金蝶返回失败,失败信息为:" + value)
}
}
}
return errors.New("金蝶返回响应格式无法识别")
}
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
func main() {
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"store_audit_status",
"store_audit_status",
"store",
"direct",
configs.RABBITMQDSN,
}, t, 1)
}
package main package product
import ( import (
"encoding/json" "encoding/json"
...@@ -6,13 +6,12 @@ import ( ...@@ -6,13 +6,12 @@ import (
"scm_server/configs" "scm_server/configs"
) )
func main(){ func SyncErpStatus(syncName string, ErpId string) (err error) {
//{"FType":"out_store", "FID":"+7fvorsZSAevQCJ7ujsbLMBZJbY=121"}
message := make(map[string]interface{}) message := make(map[string]interface{})
message = map[string]interface{}{ message = map[string]interface{}{
"SyncName": "out_store", "SyncName": syncName,
"Status": 1, "Status": 1,
"ErpId": "+7fvorsZSAevQCJ7ujsbLMBZJbY=121", "ErpId": ErpId,
} }
data, err := json.Marshal(message) data, err := json.Marshal(message)
if err != nil { if err != nil {
...@@ -20,12 +19,13 @@ func main(){ ...@@ -20,12 +19,13 @@ func main(){
} }
body := string(data) body := string(data)
queueExchange := rabbitmq.QueueExchange{ queueExchange := rabbitmq.QueueExchange{
"store_sync_status", "store_audit_status",
"store_sync_status", "store_audit_status",
"store", "store",
"direct", "direct",
configs.RABBITMQDSN, configs.RABBITMQDSN,
} }
rabbitmq.Send(queueExchange, body) rabbitmq.Send(queueExchange, body)
} return
\ No newline at end of file }
package logic
import (
"scm_server/internal/dao"
"scm_server/internal/model"
)
//插入金蝶同步状态错误信息
const InsertAuditStatusLogSql = "INSERT INTO `lie_audit_status_log` ( `sync_name` , `unique_id` , `sync_error` , `add_time` ) VALUE (?,?,?,?)"
func InsertAuditStatusLog(auditStatusLog model.AuditStatusLog) (err error) {
_, err = dao.GetDb().Exec(InsertAuditStatusLogSql, auditStatusLog.SyncName, auditStatusLog.UniqueId, auditStatusLog.SyncError, auditStatusLog.AddTime)
return
}
package model
type AuditStatusLog struct {
SyncName string
SyncError string
UniqueId string
AddTime int64
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment