Commit 4f689a67 by mushishixian

开始出入库

parent d62fbed9
<?xml version="1.0" encoding="UTF-8"?><wsdl:definitions xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" xmlns:ax2179="http://host.com/xsd" xmlns:ns1="http://org.apache.axis2/xsd" xmlns:ns="http://pdf.host.com" xmlns:wsaw="http://www.w3.org/2006/05/addressing/wsdl" xmlns:http="http://schemas.xmlsoap.org/wsdl/http/" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:mime="http://schemas.xmlsoap.org/wsdl/mime/" xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" xmlns:ax2178="http://pdf.host.com/xsd" xmlns:soap12="http://schemas.xmlsoap.org/wsdl/soap12/" targetNamespace="http://pdf.host.com">
<wsdl:documentation>DataEndpoint</wsdl:documentation>
<wsdl:types>
<xs:schema xmlns:ax2181="http://pdf.host.com/xsd" attributeFormDefault="qualified" elementFormDefault="qualified" targetNamespace="http://pdf.host.com">
<xs:import namespace="http://pdf.host.com/xsd"/>
<xs:element name="getData">
<xs:complexType>
<xs:sequence>
<xs:element minOccurs="0" name="request" nillable="true" type="ax2178:DataGenerationReq"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="getDataResp">
<xs:complexType>
<xs:sequence>
<xs:element minOccurs="0" name="return" nillable="true" type="ax2178:DataGenerationResp"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
<xs:schema attributeFormDefault="qualified" elementFormDefault="qualified" targetNamespace="http://host.com/xsd">
<xs:complexType name="BaseReq">
<xs:sequence>
<xs:element minOccurs="0" name="clientIdentification" nillable="true" type="ax2179:ClientIdentification"/>
</xs:sequence>
<xs:attribute name="TestAttr" type="xs:string"></xs:attribute>
</xs:complexType>
<xs:complexType name="BaseResp">
<xs:sequence>
<xs:element minOccurs="0" name="errorDetails" nillable="true" type="ax2179:ErrorDetails"/>
<xs:element minOccurs="0" name="success" type="xs:boolean"/>
</xs:sequence>
</xs:complexType>
</xs:schema>
<xs:schema xmlns:ax2180="http://host.com/xsd" attributeFormDefault="qualified" elementFormDefault="qualified" targetNamespace="http://pdf.host.com/xsd">
<xs:import namespace="http://host.com/xsd"/>
<xs:complexType name="DataGenerationReq">
<xs:complexContent>
<xs:extension base="ax2180:BaseReq">
<xs:sequence>
<xs:element minOccurs="0" name="customerAccountNumber" nillable="true" type="xs:string"/>
<xs:element minOccurs="0" name="pdfGenerationReqType" type="xs:int"/>
<xs:element minOccurs="0" name="withCreditTranferForm" type="xs:boolean"/>
</xs:sequence>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:complexType name="DataGenerationResp">
<xs:complexContent>
<xs:extension base="ax2180:BaseResp">
<xs:sequence>
<xs:element minOccurs="0" name="pdf" nillable="true" type="xs:base64Binary"/>
<xs:element minOccurs="0" name="url" nillable="true" type="xs:string"/>
</xs:sequence>
</xs:extension>
</xs:complexContent>
</xs:complexType>
</xs:schema>
</wsdl:types>
<wsdl:message name="getDataReq">
<wsdl:part name="parameters" element="ns:getData"/>
</wsdl:message>
<wsdl:message name="getDataResp">
<wsdl:part name="parameters" element="ns:getDataResp"/>
</wsdl:message>
<wsdl:portType name="DataEndpointPortType">
<wsdl:operation name="getData">
<wsdl:input message="ns:getDataReq" wsaw:Action="urn:getData"/>
<wsdl:output message="ns:getDataResp" wsaw:Action="urn:getDataResp"/>
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="DataEndpointSoap11Binding" type="ns:DataEndpointPortType">
<soap:binding transport="http://schemas.xmlsoap.org/soap/http" style="document"/>
<wsdl:operation name="getData">
<soap:operation soapAction="urn:getData" style="document"/>
<wsdl:input>
<soap:body use="literal"/>
</wsdl:input>
<wsdl:output>
<soap:body use="literal"/>
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:binding name="DataEndpointSoap12Binding" type="ns:DataEndpointPortType">
<soap12:binding transport="http://schemas.xmlsoap.org/soap/http" style="document"/>
<wsdl:operation name="getData">
<soap12:operation soapAction="urn:getData" style="document"/>
<wsdl:input>
<soap12:body use="literal"/>
</wsdl:input>
<wsdl:output>
<soap12:body use="literal"/>
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:binding name="DataEndpointHttpBinding" type="ns:DataEndpointPortType">
<http:binding verb="POST"/>
<wsdl:operation name="getData">
<http:operation location="DataEndpoint/getData"/>
<wsdl:input>
<mime:content type="text/xml" part="getData"/>
</wsdl:input>
<wsdl:output>
<mime:content type="text/xml" part="getData"/>
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="DataEndpoint">
<wsdl:port name="DataEndpointHttpSoap11Endpoint" binding="ns:DataEndpointSoap11Binding">
<soap:address location="https://apitest.host.com/services/DataEndpoint.DataEndpointHttpSoap11Endpoint/"/>
</wsdl:port>
<wsdl:port name="DataEndpointHttpSoap12Endpoint" binding="ns:DataEndpointSoap12Binding">
<soap12:address location="https://apitest.host.com/services/DataEndpoint.DataEndpointHttpSoap12Endpoint/"/>
</wsdl:port>
<wsdl:port name="DataEndpointHttpEndpoint" binding="ns:DataEndpointHttpBinding">
<http:address location="https://apitest.host.com/services/DataEndpoint.DataEndpointHttpEndpoint/"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
\ No newline at end of file
package main package main
import ( import (
"fmt" "encoding/xml"
"github.com/tiaguinho/gosoap" "github.com/hooklift/gowsdl/example/gen"
"github.com/hooklift/gowsdl/soap"
"log" "log"
) )
func main() { // GetIPLocationResponse will hold the Soap response
type GetIPLocationResponse struct {
GetIPLocationResult string `xml:"GetIpLocationResult"`
}
soap, err := gosoap.SoapClient("http://192.168.2.253:6888/ormrpc/services/EASLogin?wsdl") // GetIPLocationResult will
//soap, err := gosoap.SoapClient("http://192.168.2.253:6888/ormrpc/services/WSInventoryManagementFacade?wsdl") type GetIPLocationResult struct {
if err != nil { XMLName xml.Name `xml:"GeoIP"`
log.Fatalf("SoapClient error: %s", err) Country string `xml:"Country"`
} State string `xml:"State"`
}
//(userName, password, slnName, dcName, language, dbType, authPattern) var (
//("WBYH", "123456", "eas", "demo", "L2", 1, "BaseDB") r GetIPLocationResponse
params := gosoap.Params{ )
"userName": "WBYH",
"password": "123456",
"slnName": "eas",
"dcName": "demo",
"language": "L2",
"dbType": 1,
"authPattern": "BaseDB",
}
//params:=gosoap.Params{
// "json":`{"FType":"material", "FID":"ET9WHFzYagYZf0="}`,
//}
//err = soap.Call("synRewriteErpBaseDataStatus", params) func main() {
err = soap.Call("login", params) client := soap.NewClient("http://192.168.2.253:6888/ormrpc/services/EASLogin?wsdl")
service := gen.NewStockQuotePortType(client)
reply, err := service.GetLastTradePrice(&gen.TradePriceRequest{})
if err != nil { if err != nil {
log.Fatalf("Call error: %s", err) log.Fatalf("could't get trade prices: %v", err)
} }
fmt.Println(string(soap.Body)) log.Println(reply)
//
//soap, err := gosoap.SoapClient("http://192.168.2.253:6888/ormrpc/services/EASLogin?wsdl")
//if err != nil {
// log.Fatalf("SoapClient error: %s", err)
//}
////
//params := gosoap.Params{
// "userName": "WBYH",
// "password": "123456",
// "slnName": "eas",
// "dcName": "demo",
// "language": "L2",
// "authPattern": "BaseDB",
// "dbType": 1,
//}
//
//
////params := gosoap.Params{
//// "json": `{"FType":"material", "FID":"ET9WHFzYagYZf0="}`,
////}
//
//res, err := soap.Call("login", params)
//if err != nil {
// log.Fatalf("Call error: %s", err)
//}
//fmt.Println(string(res.Body))
//return
//res.Unmarshal(&r)
//
//// GetIpLocationResult will be a string. We need to parse it to XML
//result := GetIPLocationResult{}
//err = xml.Unmarshal([]byte(r.GetIPLocationResult), &result)
//if err != nil {
// log.Fatalf("xml.Unmarshal error: %s", err)
//}
//
//if result.Country != "US" {
// log.Fatalf("error: %+v", r)
//}
//
//log.Println("Country: ", result.Country)
//log.Println("State: ", result.State)
} }
package main
import (
"encoding/json"
"errors"
"fmt"
"github.com/go-kratos/kratos/pkg/log"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/imroc/req"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
"scm_server/internal/service"
"time"
)
type RecvPro struct {
}
type OutStoreMessage struct {
Type string
Data struct {
FWarehouseNo string //
FEntrustBillNo string
FErpPurInWorehouseNo string
FSourceBillID string //原始单据id
CFIsInsp bool
FPrincipalNo string
FBizType string
FEntrys FEntrys
}
}
type FEntrys struct {
FQty int
FIsRecordLotAndDC bool
FOriginCountry string
FIsPrint bool
CFIsInsp bool
FMaterialID string
FBrand string
FSourceBillEntryID string
FModel string
FGoods string
FUnit string
}
func init() {
log.Init(nil)
}
func (t *RecvPro) Consumer(dataByte []byte) error {
fmt.Println(string(dataByte))
return errors.New("test")
var (
message SupplierQueueMessage
err error
supplier model.Supplier
operateType string
syncLog model.SyncLog
)
//先去转换队列消息的json,如果失败,记录起来
if err = json.Unmarshal(dataByte, &message); err != nil {
goto ERR
}
//转换成supplier数据
supplier.ErpId = message.Data.FID
supplier.ErpSupplierCode = message.Data.FNUMBER
supplier.Name = message.Data.CFNAME
//判断操作类型
switch message.Type {
case "save":
//先去查询是否存在,不存在才去插入,已经存在即是修改
if logic.CheckSupplierExist(supplier.ErpId) {
operateType = "update"
if err = logic.UpdateSupplier(supplier); err != nil {
goto ERR
}
} else {
operateType = "insert"
if err = logic.InsertSupplier(supplier); err != nil {
goto ERR
}
}
case "delete":
operateType = "delete"
supplier.Status = 0
if logic.CheckSupplierExist(supplier.ErpId) {
//如果存在,才进行删除
if err = logic.DeleteSupplier(supplier); err != nil {
goto ERR
}
} else {
err = errors.New("试图删除不存在的供应商")
goto ERR
}
default:
err = errors.New("同步供应商出现不存在的操作类型")
goto ERR
}
//操作成功后还要去请求后端接口同步数据
if err = SyncSupplierData(operateType, supplier); err != nil {
goto ERR
}
fmt.Println("同步成功")
return nil
ERR:
//不存在的erp_id不去操作对应的数据库
if supplier.ErpId != "" {
logSyncErrorToSupplier(supplier.ErpId, err.Error())
}
//还要存到一个统一错误表
syncLog = model.SyncLog{
AddTime: time.Now().Unix(),
SyncTime: time.Now().Unix(),
QueueMessage: string(dataByte),
UniqueId: supplier.ErpId, //有可能为
SyncError: err.Error(),
SyncName: "supplier",
}
logic.InsertSyncLog(syncLog)
//发送钉钉错误消息
msg, _ := json.Marshal(syncLog)
service.SendMessage(common.ErrorSendPhone, string(msg))
//保存日志
log.Error("%s", string(msg))
return nil
}
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
//同步数据
func SyncSupplierData(operate string, supplier model.Supplier) (err error) {
var (
resp *req.Resp
url string
respData common.Response
)
param := req.Param{
"erp_supplier_sn": supplier.ErpSupplierCode,
"supplier_name": supplier.Name,
"erp_supplier_id": supplier.ErpId,
"admin_name": "系统",
"admin_id": 1,
}
//更新和插入接口不同
if operate == "update" {
url = configs.BasicApiUrl + "/basic/api/ApiUpdateSupplierInfo"
} else if operate == "insert" {
url = configs.BasicApiUrl + "/basic/api/ApiInsertSupplierInfo"
} else {
url = configs.BasicApiUrl + "/basic/api/ApiUpdateSupplierStatus"
}
req.Debug = false
if operate == "update" || operate == "insert" {
resp, err = req.Post(url, param)
if err != nil {
return
}
if err = resp.ToJSON(&respData); err != nil {
return
}
if respData.Errcode != 101100 {
return errors.New(respData.Errmsg)
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncSupplierSuccess(supplier.ErpId); err != nil {
return
}
} else {
param = req.Param{
"erp_supplier_id": supplier.ErpId,
"admin_name": "系统",
"admin_id": 1,
"status": 0,
}
//删除
resp, err = req.Post(url, param)
if err != nil {
return
}
if err = resp.ToJSON(&respData); err != nil {
return
}
if respData.Errcode != 101100 {
return errors.New(respData.Errmsg)
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncCustomerSuccess(supplier.ErpId); err != nil {
return
}
}
return
}
func logSyncErrorToSupplier(erpId, syncError string) {
var err error
//请求失败的话,将原因存起来
if err = logic.WriteSupplierSyncError(erpId, syncError); err != nil {
//数据库错误,发送警告
service.SendMessage(common.ErrorSendPhone, err.Error())
}
}
func main() {
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"store_in",
"store_in",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}, t, 1)
}
package main
import (
"encoding/json"
"fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)
func main() {
message := make(map[string]interface{})
message = map[string]interface{}{
"type": "save",
"data": map[string]interface{}{
"FWarehouseNo": "002",
"FEntrustBillNo": "B05364",
"FErpPurInWorehouseNo": "FKSH202003107482",
"FSourceBillID": "+7fvorsZSAevQCJ7ujsbLMBZJbY=",
"CFIsInsp": false,
"FPrincipalNo": "WT00268",
"FBizType": "执行采购",
"FEntrys": []map[string]interface{}{
{
"FQty": 9,
"FIsRecordLotAndDC": false,
"FOriginCountry": "502",
"FIsPrint": false,
"CFIsInsp": false,
"FMaterialID": "9l++jtj+RNS2VpdFhzcRE0QJ5/A=",
"FBrand": "ON",
"FSourceBillEntryID": "gHw/y6EQRBmCpLGDFkdJCO0oFFw=",
"FModel": "1abaaba",
"FGoods": "集成电路",
"FUnit": "个",
},
},
},
}
data, err := json.Marshal(message)
if err != nil {
fmt.Println(err)
}
body := string(data)
queueExchange := rabbitmq.QueueExchange{
"store_in",
"store_in",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}
rabbitmq.Send(queueExchange, body)
}
package main
import (
"encoding/json"
"errors"
"fmt"
"github.com/go-kratos/kratos/pkg/log"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/imroc/req"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
"scm_server/internal/service"
"time"
)
type RecvPro struct {
}
type OutStoreMessage struct {
Type string
Data struct {
FWarehouseNo string //
FErpPostRequistionBillNo string
FIsCustoms bool
FSourceBillID string //原始单据id
FRecPerson string
FRecPersonContact string
FRecPersonAddress string
FEntrys OutStoreFEntrys
}
}
type OutStoreFEntrys struct {
FQty int
FMaterialID string
FBrand string
FPrice float64
FSourceBillEntryID string
FModel string
FGoods string
FUnit string
}
func init() {
log.Init(nil)
}
func (t *RecvPro) Consumer(dataByte []byte) error {
fmt.Println(string(dataByte))
var (
message OutStoreMessage
err error
outStore model.OutStore
operateType string
syncLog model.SyncLog
)
//先去转换队列消息的json,如果失败,记录起来
if err = json.Unmarshal(dataByte, &message); err != nil {
goto ERR
}
//判断操作类型
switch message.Type {
case "save":
//先去查询是否存在,不存在才去插入,已经存在即是修改
if logic.CheckSupplierExist(supplier.ErpId) {
err = errors.New("试图新增已存在的出货单,单号为" + message.Data.FSourceBillID)
} else {
operateType = "insert"
if err = logic.InsertSupplier(supplier); err != nil {
goto ERR
}
}
case "delete":
operateType = "delete"
supplier.Status = 0
if logic.CheckSupplierExist(supplier.ErpId) {
//如果存在,才进行删除
if err = logic.DeleteSupplier(supplier); err != nil {
goto ERR
}
} else {
err = errors.New("试图删除不存在的供应商")
goto ERR
}
default:
err = errors.New("同步供应商出现不存在的操作类型")
goto ERR
}
//操作成功后还要去请求后端接口同步数据
if err = SyncSupplierData(operateType, supplier); err != nil {
goto ERR
}
fmt.Println("同步成功")
return nil
ERR:
//不存在的erp_id不去操作对应的数据库
if supplier.ErpId != "" {
logSyncErrorToSupplier(supplier.ErpId, err.Error())
}
//还要存到一个统一错误表
syncLog = model.SyncLog{
AddTime: time.Now().Unix(),
SyncTime: time.Now().Unix(),
QueueMessage: string(dataByte),
UniqueId: supplier.ErpId, //有可能为
SyncError: err.Error(),
SyncName: "supplier",
}
logic.InsertSyncLog(syncLog)
//发送钉钉错误消息
msg, _ := json.Marshal(syncLog)
service.SendMessage(common.ErrorSendPhone, string(msg))
//保存日志
log.Error("%s", string(msg))
return nil
}
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
//同步数据
func SyncSupplierData(operate string, supplier model.Supplier) (err error) {
var (
resp *req.Resp
url string
respData common.Response
)
param := req.Param{
"erp_supplier_sn": supplier.ErpSupplierCode,
"supplier_name": supplier.Name,
"erp_supplier_id": supplier.ErpId,
"admin_name": "系统",
"admin_id": 1,
}
//更新和插入接口不同
if operate == "update" {
url = configs.BasicApiUrl + "/basic/api/ApiUpdateSupplierInfo"
} else if operate == "insert" {
url = configs.BasicApiUrl + "/basic/api/ApiInsertSupplierInfo"
} else {
url = configs.BasicApiUrl + "/basic/api/ApiUpdateSupplierStatus"
}
req.Debug = false
if operate == "update" || operate == "insert" {
resp, err = req.Post(url, param)
if err != nil {
return
}
if err = resp.ToJSON(&respData); err != nil {
return
}
if respData.Errcode != 101100 {
return errors.New(respData.Errmsg)
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncSupplierSuccess(supplier.ErpId); err != nil {
return
}
} else {
param = req.Param{
"erp_supplier_id": supplier.ErpId,
"admin_name": "系统",
"admin_id": 1,
"status": 0,
}
//删除
resp, err = req.Post(url, param)
if err != nil {
return
}
if err = resp.ToJSON(&respData); err != nil {
return
}
if respData.Errcode != 101100 {
return errors.New(respData.Errmsg)
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncCustomerSuccess(supplier.ErpId); err != nil {
return
}
}
return
}
func logSyncErrorToSupplier(erpId, syncError string) {
var err error
//请求失败的话,将原因存起来
if err = logic.WriteSupplierSyncError(erpId, syncError); err != nil {
//数据库错误,发送警告
service.SendMessage(common.ErrorSendPhone, err.Error())
}
}
func main() {
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"store_out",
"store_out",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}, t, 1)
}
package main
import (
"encoding/json"
"fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)
func main() {
message := make(map[string]interface{})
message = map[string]interface{}{
"type": "save",
"data": map[string]interface{}{
"FWarehouseNo": "002",
"FEntrustBillNo": "B05364",
"FErpPurInWorehouseNo": "FKSH202003107482",
"FSourceBillID": "+7fvorsZSAevQCJ7ujsbLMBZJbY=",
"CFIsInsp": false,
"FPrincipalNo": "WT00268",
"FBizType": "执行采购",
"FEntrys": []map[string]interface{}{
{
"FQty": 9,
"FIsRecordLotAndDC": false,
"FOriginCountry": "502",
"FIsPrint": false,
"CFIsInsp": false,
"FMaterialID": "9l++jtj+RNS2VpdFhzcRE0QJ5/A=",
"FBrand": "ON",
"FSourceBillEntryID": "gHw/y6EQRBmCpLGDFkdJCO0oFFw=",
"FModel": "1abaaba",
"FGoods": "集成电路",
"FUnit": "个",
},
},
},
}
data, err := json.Marshal(message)
if err != nil {
fmt.Println(err)
}
body := string(data)
queueExchange := rabbitmq.QueueExchange{
"store_out",
"store_out",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}
rabbitmq.Send(queueExchange, body)
}
package main
import (
"scm_server/cmd/queue/user_add/user_add_queue_logic"
"scm_server/configs"
"scm_server/internal/dao"
"scm_server/internal/service"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
"time"
)
type WaitUpdateUserSale struct {
User_Id int `json:"user_id"`
Sale_Id int `json:"sale_id"`
}
func main() {
//定义队列类型和错误类型
service.ProGramErrType = "crm_update_user_sales"
conn, err := amqp.Dial("amqp://"+configs.RABBITMQDSN+"/")
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to connect to RabbitMQ")
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to open a channel")
}
defer ch.Close()
q, err := ch.QueueDeclare(
service.ProGramErrType, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
service.WriteErrDetail(err.Error()+"Failed to declare a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
service.WriteErrDetail(err.Error() +"Failed to register a consumer")
}
forever := make(chan bool)
var updateUserSale WaitUpdateUserSale
go func() {
for d := range msgs {
fmt.Println(string(d.Body))
json.Unmarshal(d.Body,&updateUserSale)
handle(updateUserSale.User_Id,updateUserSale.Sale_Id)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
//修改用户的客服
func handle(userId,saleId int)(result bool) {
//查找出crm中的用户
var crmUserId int
_ = dao.GetDb().QueryRowx("select user_id from lie_user where outter_uid = ?",userId).Scan(&crmUserId)
if crmUserId == 0 {
//不存在推一个队列,然后停止2秒,再检查
user_add_queue_logic.PushUserQueue(userId)
time.Sleep(2*time.Second)
_ = dao.GetDb().QueryRowx("select user_id from lie_user where outter_uid = ?",userId).Scan(&crmUserId)
if crmUserId == 0{
service.WriteErrDetail(strconv.Itoa(crmUserId)+"不存在")
return false
}
}
var timeNow = time.Now().Unix()
_,err := dao.GetDb().Exec("update lie_salesman set assign_time = ?,update_time = ?,sale_id= ? where user_id = ?",timeNow,timeNow,saleId,crmUserId)
fmt.Println(err)
_,err = dao.GetDb().Exec("insert into lie_action_log(`user_id`,`operator_id`,`create_time`,`remark`,`admin`,`event`)value(?,?,?,'转为已下单用户','admin','用户下单')",crmUserId,saleId,timeNow)
fmt.Println(err)
return true
}
package main
import (
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
"scm_server/internal/service"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
)
var(
InsertData model.MemberAddUserData
)
type WaitAddUser struct {
User_Id int `json:"user_id"`
}
//监听用户添加队列
func main(){
//定义队列类型和错误类型
service.ProGramErrType = "member_user_add"
//设置所有城市
common.SetCityName()
conn, err := amqp.Dial("amqp://"+configs.RABBITMQDSN+"/")
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to connect to RabbitMQ")
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
service.WriteErrDetail(err.Error() + "Failed to open a channel")
}
defer ch.Close()
q, err := ch.QueueDeclare(
service.ProGramErrType, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
service.WriteErrDetail(err.Error()+"Failed to declare a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
service.WriteErrDetail(err.Error() +"Failed to register a consumer")
}
forever := make(chan bool)
var user WaitAddUser
go func() {
for d := range msgs {
json.Unmarshal(d.Body,&user)
fmt.Println(user)
handle(user.User_Id)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func handle(memberId int)(result bool) {
//获取当前会员的基本数据
InsertData = logic.GetMemberUserInfo(memberId)
//检测用户是否存在
if InsertData.User_Id == 0 {
service.WriteErrDetail(strconv.Itoa(memberId) + "会员系统中用户信息不存在")
return false
}
if logic.CheckMemberIdIsHave(InsertData.User_Id) != 0 {
service.WriteErrDetail(strconv.Itoa(memberId) + "crm系统中用户已经存在")
return false
}
//处理得到的用户数据
InsertData = logic.HandleData(InsertData)
//插入数据
if !logic.InsertMemberUser(InsertData) {
errSourcedata, _ := json.Marshal(InsertData)
service.WriteErrDetail("插入用户数据错误:" + string(errSourcedata))
return false
}
return true
}
\ No newline at end of file
package user_add_queue_logic
import (
"scm_server/configs"
"scm_server/internal/service"
"encoding/json"
"github.com/streadway/amqp"
)
type WaitAddUser struct {
User_Id int `json:"user_id"`
}
//推送一个用户数据
func PushUserQueue(memberId int) {
var user WaitAddUser
user.User_Id = memberId
//链接mq
conn, err := amqp.Dial("amqp://"+configs.RABBITMQDSN+"/")
defer conn.Close()
service.WriteErr(err)
//通道
ch, err := conn.Channel()
defer ch.Close()
service.WriteErr(err)
//设置数据
q, err := ch.QueueDeclare(
"member_user_add", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
service.WriteErr(err)
sendBody,err := json.Marshal(user)
ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: sendBody,
})
service.WriteErr(err)
}
\ No newline at end of file
...@@ -4,12 +4,10 @@ go 1.13 ...@@ -4,12 +4,10 @@ go 1.13
require ( require (
github.com/go-kratos/kratos v0.4.2 github.com/go-kratos/kratos v0.4.2
github.com/hooklift/gowsdl v0.3.1 github.com/hooklift/gowsdl v0.3.2-0.20200216020636-7a3e6bce010b
github.com/ichunt2019/go-rabbitmq v1.0.1 github.com/ichunt2019/go-rabbitmq v1.0.1
github.com/imroc/req v0.3.0 github.com/imroc/req v0.3.0
github.com/jmoiron/sqlx v1.2.0 github.com/jmoiron/sqlx v1.2.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
github.com/tealeg/xlsx v1.0.5 github.com/tealeg/xlsx v1.0.5
github.com/tiaguinho/gosoap v1.2.0
) )
...@@ -101,8 +101,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf ...@@ -101,8 +101,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hooklift/gowsdl v0.3.1 h1:tpc8hTwY3HjyucyB2W1qfQRcfMiStWQJE/CNQSqHM9c= github.com/hooklift/gowsdl v0.3.2-0.20200216020636-7a3e6bce010b h1:rNDZgrP4P3Fm48SLvy6n1+G9l8rfNxGZuJho+0sg4mM=
github.com/hooklift/gowsdl v0.3.1/go.mod h1:TYmt7jpe3F5zLlMtKGetjHLwUBIAF5JCd+NYq+mQ/Zk= github.com/hooklift/gowsdl v0.3.2-0.20200216020636-7a3e6bce010b/go.mod h1:TYmt7jpe3F5zLlMtKGetjHLwUBIAF5JCd+NYq+mQ/Zk=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ichunt2019/go-rabbitmq v1.0.1 h1:qHhpGm9v7jnhSBo3f3viX+BSky9yugp9lCSV03eYsF4= github.com/ichunt2019/go-rabbitmq v1.0.1 h1:qHhpGm9v7jnhSBo3f3viX+BSky9yugp9lCSV03eYsF4=
github.com/ichunt2019/go-rabbitmq v1.0.1/go.mod h1:TQsZ1XWULyvm4UwpYHwNPtOXYbuVvLLI0GM7g/BRy68= github.com/ichunt2019/go-rabbitmq v1.0.1/go.mod h1:TQsZ1XWULyvm4UwpYHwNPtOXYbuVvLLI0GM7g/BRy68=
...@@ -235,8 +235,6 @@ github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE= ...@@ -235,8 +235,6 @@ github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE=
github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM= github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM=
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEvslTq+iOEAMDLSzhVuOt5BRZ05WirO+b09GHQU= github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEvslTq+iOEAMDLSzhVuOt5BRZ05WirO+b09GHQU=
github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4= github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4=
github.com/tiaguinho/gosoap v1.2.0 h1:K3L29Wk6PkgGob/u24bd4G/rDy5QGnsnDxljAGRRuUM=
github.com/tiaguinho/gosoap v1.2.0/go.mod h1:m/W/ocE01n6v6UkXstKa3ltZvsZLEaxJ9BIs5pRCrZA=
github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/wc= github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/wc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
......
...@@ -7,6 +7,7 @@ type Customer struct { ...@@ -7,6 +7,7 @@ type Customer struct {
AddTime int AddTime int
SyncTime int SyncTime int
SyncStatus int SyncStatus int
SyncErpStatus int
Status int Status int
SyncError string SyncError string
} }
package model
//{
// "receive_address": "万科星火online",
// "receive_user_name": "张三",
// "receive_tel": "13555556666",
// "unique": "ce0bdf3c0a3fc32843a20c2ad09f499e",
// "remark": "加糖",
// "store_id": "12",
// "customer_id": "213",
// "warehousing_sn": "1232131231",
// "is_apply_customs": "1",
// "is_insp": "1",
// "out_store_detail": [
// {
// "erp_entry_sn": "123",
// "is_apply_customs": "1",
// "goods_id": 13113,
// "goods_name": 2131,
// "brand_name": 213,
// "batch": "13",
// "dc": "dccc",
// "number": 100,
// "goods_unit": "美元",
// "is_insp": 1
// }
// ]
//}
type OutStore struct {
QueueMessage string
BillId string
AddTime int
SyncTime int
SyncStatus int
SyncErpStatus int
Status int
SyncError string
}
...@@ -7,6 +7,7 @@ type Supplier struct { ...@@ -7,6 +7,7 @@ type Supplier struct {
AddTime int AddTime int
SyncTime int SyncTime int
SyncStatus int SyncStatus int
SyncErpStatus int
SyncError string SyncError string
Status int Status int
} }
package myservice
import (
"encoding/xml"
"github.com/hooklift/gowsdl/soap"
"time"
)
// against "unused imports"
var _ time.Time
var _ xml.Name
type WSContext struct {
SlnName string `xml:"slnName,omitempty"`
Password string `xml:"password,omitempty"`
DbType int32 `xml:"dbType,omitempty"`
UserName string `xml:"userName,omitempty"`
SessionId string `xml:"sessionId,omitempty"`
DcName string `xml:"dcName,omitempty"`
}
type RequestData struct {
SlnName string `xml:"slnName,omitempty"`
Password string `xml:"password,omitempty"`
DbType int32 `xml:"dbType,omitempty"`
UserName string `xml:"userName,omitempty"`
DcName string `xml:"dcName,omitempty"`
Language string `xml:"language,omitempty"`
AuthPattern string `xml:"authPattern,omitempty"`
}
type EASLoginProxy interface {
Login(request *RequestData) (*WSContext, error)
}
type eASLoginProxy struct {
client *soap.Client
}
func NewEASLoginProxy(client *soap.Client) EASLoginProxy {
return &eASLoginProxy{
client: client,
}
}
func (service *eASLoginProxy) Login(request *RequestData) (*WSContext, error) {
response := new(WSContext)
err := service.client.Call("", request, response)
if err != nil {
return nil, err
}
return response, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment