Commit ed352464 by mushishixian

导入初始数据改成投递队列

parent 506f7794
package customer
import (
"fmt"
"github.com/imroc/req"
"github.com/tealeg/xlsx"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
)
//导入委托方的信息(excel导入)
func Imports() {
var (
excelFileName string
xlFile *xlsx.File
err error
customer model.Customer
)
excelFileName = "./cmd/source/data/customer.xlsx"
xlFile, err = xlsx.OpenFile(excelFileName)
if err != nil {
fmt.Printf("open failed: %s\n", err)
}
//循环遍历数据
for _, sheet := range xlFile.Sheets {
for key, row := range sheet.Rows {
if key == 0 {
continue
}
customer.ErpId = row.Cells[0].String()
customer.ErpClientCode = row.Cells[1].String()
customer.Name = row.Cells[2].String()
//存在的记录跳过
if logic.CheckCustomerExist(customer.ErpId) {
continue
}
if err = logic.InsertCustomer(customer); err != nil {
fmt.Println(err)
break
}
//插入成功后还要去请求后端接口同步数据
SyncCustomerData(customer)
}
}
}
//同步数据
func SyncCustomerData(customer model.Customer) bool {
var (
resp *req.Resp
url string
respData common.Response
err error
)
param := req.Param{
"erp_client_sn": customer.ErpClientCode,
"customer_name": customer.Name,
"erp_customer_id": customer.ErpId,
"admin_name": "admin@ichunt.com",
"admin_id": 1000,
}
url = configs.BasicApiUrl + "/basic/api/ApiInsertCustomerInfo"
req.Debug = true
resp, err = req.Post(url, param)
if err != nil {
logSyncErrorToDataBase(customer.ErpId, err.Error())
return false
}
if err = resp.ToJSON(&respData); err != nil {
logSyncErrorToDataBase(customer.ErpId, err.Error())
return false
}
if respData.Errcode != 101100 {
logSyncErrorToDataBase(customer.ErpId, respData.Errmsg)
return false
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncCustomerSuccess(customer.ErpId); err != nil {
fmt.Println(err)
}
}
func logSyncErrorToDataBase(erpId, syncError string) {
var err error
//请求失败的话,将原因存起来
if err = logic.WriteCustomerSyncError(erpId, syncError); err != nil {
//写数据失败,记录到日志
fmt.Println(err)
}
}
package supplier
import (
"fmt"
"github.com/imroc/req"
"github.com/tealeg/xlsx"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
)
//导入委托方的信息(excel导入)
func Import() {
var (
excelFileName string
xlFile *xlsx.File
err error
supplier model.Supplier
)
excelFileName = "./cmd/source/data/supplier.xlsx"
xlFile, err = xlsx.OpenFile(excelFileName)
if err != nil {
fmt.Printf("open failed: %s\n", err)
}
for _, sheet := range xlFile.Sheets {
for key, row := range sheet.Rows {
if key > 0 {
supplier.ErpId = row.Cells[0].String()
supplier.ErpSupplierCode = row.Cells[1].String()
supplier.Name = row.Cells[2].String()
//先去查询是否存在,不存在才去插入
if logic.CheckSupplierExist(supplier.ErpId) {
continue
}
if err = logic.InsertSupplier(supplier); err != nil {
fmt.Println(err)
continue
}
//插入成功后还要去请求后端接口同步数据
if !SyncSupplierData(supplier) {
continue
}
}
}
}
}
//同步数据
func SyncSupplierData(supplier model.Supplier) bool {
var (
resp *req.Resp
url string
respData common.Response
err error
)
param := req.Param{
"erp_supplier_sn": supplier.ErpSupplierCode,
"supplier_name": supplier.Name,
"erp_supplier_id": supplier.ErpId,
"admin_name": "admin@ichunt.com",
"admin_id": 1000,
}
url = configs.BasicApiUrl + "/basic/api/ApiInsertSupplierInfo"
req.Debug = true
resp, err = req.Post(url, param)
if err != nil {
logSyncErrorToDataBase(supplier.ErpId, err.Error())
return false
}
if err = resp.ToJSON(&respData); err != nil {
logSyncErrorToDataBase(supplier.ErpId, err.Error())
return false
}
if respData.Errcode != 101100 {
logSyncErrorToDataBase(supplier.ErpId, respData.Errmsg)
return false
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncSupplierSuccess(supplier.ErpId); err != nil {
fmt.Println(err)
}
return true
}
func logSyncErrorToDataBase(erpId, syncError string) {
var err error
//请求失败的话,将原因存起来
if err = logic.WriteSupplierSyncError(erpId, syncError); err != nil {
//写数据失败,记录到日志
fmt.Println(err)
}
}
package main
import (
"encoding/json"
"errors"
"fmt"
"github.com/go-kratos/kratos/pkg/log"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/imroc/req"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic"
"scm_server/internal/model"
"scm_server/internal/service"
"time"
)
type RecvPro struct {
}
type SupplierQueueMessage struct {
Type string
Data struct {
FID string
FNUMBER string
CFNAME string
}
}
func init() {
log.Init(nil)
}
func (t *RecvPro) Consumer(dataByte []byte) error {
var (
message SupplierQueueMessage
err error
supplier model.Supplier
operateType string
syncLog model.SyncLog
)
//先去转换队列消息的json,如果失败,记录起来
if err = json.Unmarshal(dataByte, &message); err != nil {
goto ERR
}
//转换成supplier数据
supplier.ErpId = message.Data.FID
supplier.ErpSupplierCode = message.Data.FNUMBER
supplier.Name = message.Data.CFNAME
//判断操作类型
switch message.Type {
case "save":
//先去查询是否存在,不存在才去插入,已经存在即是修改
if logic.CheckSupplierExist(supplier.ErpId) {
operateType = "update"
if err = logic.UpdateSupplier(supplier); err != nil {
goto ERR
}
} else {
operateType = "insert"
if err = logic.InsertSupplier(supplier); err != nil {
goto ERR
}
}
case "delete":
operateType = "delete"
if logic.CheckSupplierExist(supplier.ErpId) {
//如果存在,才进行删除
if err = logic.DeleteSupplier(supplier.ErpId); err != nil {
goto ERR
}
} else {
err = errors.New("试图删除不存在的供应商")
goto ERR
}
default:
err = errors.New("同步供应商出现不存在的操作类型")
goto ERR
}
//操作成功后还要去请求后端接口同步数据
if err = SyncSupplierData(operateType, supplier); err != nil {
goto ERR
}
fmt.Println("同步成功")
return nil
ERR:
//不存在的erp_id不去操作对应的数据库
if supplier.ErpId != "" {
logSyncErrorToSupplier(supplier.ErpId, err.Error())
}
//还要存到一个统一错误表
syncLog = model.SyncLog{
AddTime: time.Now().Unix(),
SyncTime: time.Now().Unix(),
QueueMessage: string(dataByte),
UniqueId: supplier.ErpId, //有可能为
SyncError: err.Error(),
SyncName: "supplier",
}
logic.InsertSyncLog(syncLog)
//发送钉钉错误消息
msg, _ := json.Marshal(syncLog)
service.SendMessage(common.ErrorSendPhone, string(msg))
//保存日志
log.Error("%s", string(msg))
return nil
}
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
//同步数据
func SyncSupplierData(operate string, supplier model.Supplier) (err error) {
var (
resp *req.Resp
url string
respData common.Response
)
param := req.Param{
"erp_supplier_sn": supplier.ErpSupplierCode,
"supplier_name": supplier.Name,
"erp_supplier_id": supplier.ErpId,
"admin_name": "admin@ichunt.com",
"admin_id": 1000,
}
//更新和插入接口不同
if operate == "update" {
url = configs.BasicApiUrl + "/basic/api/ApiUpdateSupplierInfo"
} else if operate == "insert" {
url = configs.BasicApiUrl + "/basic/api/ApiInsertSupplierInfo"
} else {
url = configs.BasicApiUrl + "/basic/api/ApiDeleteSupplierInfo"
}
req.Debug = false
if operate == "update" || operate == "insert" {
resp, err = req.Post(url, param)
if err != nil {
return
}
if err = resp.ToJSON(&respData); err != nil {
return
}
if respData.Errcode != 101100 {
return errors.New(respData.Errmsg)
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncSupplierSuccess(supplier.ErpId); err != nil {
return
}
} else {
//删除
}
return
}
func logSyncErrorToSupplier(erpId, syncError string) {
var err error
//请求失败的话,将原因存起来
if err = logic.WriteSupplierSyncError(erpId, syncError); err != nil {
//数据库错误,发送警告
service.SendMessage(common.ErrorSendPhone, err.Error())
}
}
func main() {
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"store_supplier",
"store_supplier",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}, t, 3)
}
package main
import (
"encoding/json"
"fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)
func main() {
message := make(map[string]interface{})
message = map[string]interface{}{
"type": "save",
"data": map[string]string{
"FID": "Z8eVSzSLRXKa7ET9WHFzYagYZf0171=",
"FNUMBER": "WT0050333",
"CFNAME": "深圳市鼎驰达电子有限公司",
},
}
data,err := json.Marshal(message)
if err!=nil {
fmt.Println(err)
}
body := string(data)
queueExchange := rabbitmq.QueueExchange{
"store_supplier",
"store_supplier",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}
rabbitmq.Send(queueExchange, body)
}
...@@ -100,7 +100,6 @@ ERR: ...@@ -100,7 +100,6 @@ ERR:
SyncName: "supplier", SyncName: "supplier",
} }
logic.InsertSyncLog(syncLog) logic.InsertSyncLog(syncLog)
//发送钉钉错误消息 //发送钉钉错误消息
msg, _ := json.Marshal(syncLog) msg, _ := json.Marshal(syncLog)
service.SendMessage(common.ErrorSendPhone, string(msg)) service.SendMessage(common.ErrorSendPhone, string(msg))
......
package customer package customer
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/imroc/req" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/tealeg/xlsx" "github.com/tealeg/xlsx"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic" "scm_server/internal/logic"
"scm_server/internal/model" "scm_server/internal/model"
) )
var Exchange rabbitmq.QueueExchange
//导入委托方的信息(excel导入) //导入委托方的信息(excel导入)
func init() {
Exchange = rabbitmq.QueueExchange{
"store_customer",
"store_customer",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}
}
func Import() { func Import() {
var ( var (
excelFileName string excelFileName string
...@@ -23,70 +33,41 @@ func Import() { ...@@ -23,70 +33,41 @@ func Import() {
if err != nil { if err != nil {
fmt.Printf("open failed: %s\n", err) fmt.Printf("open failed: %s\n", err)
} }
i:=0
//循环遍历数据 //循环遍历数据
for _, sheet := range xlFile.Sheets { for _, sheet := range xlFile.Sheets {
for key, row := range sheet.Rows { for key, row := range sheet.Rows {
//跳过表头
if key == 0 { if key == 0 {
continue continue
} }
customer.ErpId = row.Cells[0].String() customer.ErpId = row.Cells[0].String()
customer.ErpClientCode = row.Cells[1].String() customer.ErpClientCode = row.Cells[1].String()
customer.Name = row.Cells[2].String() customer.Name = row.Cells[2].String()
//存在的记录跳过 //去数据库检测是否已经同步过了
if logic.CheckCustomerExist(customer.ErpId) { if exist := logic.CheckCustomerExist(customer.ErpId); exist {
i++
fmt.Println("已存在,跳过")
fmt.Println(i)
continue continue
} }
if err = logic.InsertCustomer(customer); err != nil { message := make(map[string]interface{})
fmt.Println(err) message = map[string]interface{}{
break "type": "save",
} "data": map[string]string{
//插入成功后还要去请求后端接口同步数据 "FID": customer.ErpId,
SyncCustomerData(customer) "FNUMBER": customer.ErpClientCode,
} "CFNAME": customer.Name,
} },
}
//同步数据
func SyncCustomerData(customer model.Customer) bool {
var (
resp *req.Resp
url string
respData common.Response
err error
)
param := req.Param{
"erp_client_sn": customer.ErpClientCode,
"customer_name": customer.Name,
"erp_customer_id": customer.ErpId,
"admin_name": "admin@ichunt.com",
"admin_id": 1000,
} }
url = configs.BasicApiUrl + "/basic/api/ApiInsertCustomerInfo" data, err := json.Marshal(message)
req.Debug = true
resp, err = req.Post(url, param)
if err != nil { if err != nil {
logSyncErrorToDataBase(customer.ErpId, err.Error())
return false
}
if err = resp.ToJSON(&respData); err != nil {
logSyncErrorToDataBase(customer.ErpId, err.Error())
return false
}
if respData.Errcode != 101100 {
logSyncErrorToDataBase(customer.ErpId, respData.Errmsg)
return false
}
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncCustomerSuccess(customer.ErpId); err != nil {
fmt.Println(err) fmt.Println(err)
} }
} body := string(data)
fmt.Println(message)
func logSyncErrorToDataBase(erpId, syncError string) { rabbitmq.Send(Exchange, body)
var err error }
//请求失败的话,将原因存起来
if err = logic.WriteCustomerSyncError(erpId, syncError); err != nil {
//写数据失败,记录到日志
fmt.Println(err)
} }
} }
package main package main
import "scm_server/cmd/source/supplier"
func main() { func main() {
//forever := make(chan bool) //forever := make(chan bool)
//go supplier.Import() //go supplier.Import()
supplier.Import()
//<-forever //<-forever
//customer.Import() //customer.Import()
//goods.Import() //goods.Import()
......
package supplier package supplier
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/imroc/req" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/tealeg/xlsx" "github.com/tealeg/xlsx"
"scm_server/configs"
"scm_server/internal/common"
"scm_server/internal/logic" "scm_server/internal/logic"
"scm_server/internal/model" "scm_server/internal/model"
) )
var Exchange rabbitmq.QueueExchange
//导入委托方的信息(excel导入)
func init() {
Exchange = rabbitmq.QueueExchange{
"store_customer",
"store_customer",
"store",
"direct",
"amqp://huntadmin:jy2y2900@192.168.1.237:5672/",
}
}
//导入委托方的信息(excel导入) //导入委托方的信息(excel导入)
func Import() { func Import() {
var ( var (
...@@ -23,71 +35,37 @@ func Import() { ...@@ -23,71 +35,37 @@ func Import() {
if err != nil { if err != nil {
fmt.Printf("open failed: %s\n", err) fmt.Printf("open failed: %s\n", err)
} }
i := 0
for _, sheet := range xlFile.Sheets { for _, sheet := range xlFile.Sheets {
for key, row := range sheet.Rows { for key, row := range sheet.Rows {
if key > 0 { if key > 0 {
supplier.ErpId = row.Cells[0].String() supplier.ErpId = row.Cells[0].String()
supplier.ErpSupplierCode = row.Cells[1].String() supplier.ErpSupplierCode = row.Cells[1].String()
supplier.Name = row.Cells[2].String() supplier.Name = row.Cells[2].String()
//先去查询是否存在,不存在才去插入 //去数据库检测是否已经同步过了
if logic.CheckSupplierExist(supplier.ErpId) { if exist := logic.CheckSupplierExist(supplier.ErpId); exist {
continue i++
} fmt.Println("已存在,跳过")
if err = logic.InsertSupplier(supplier); err != nil { fmt.Println(i)
fmt.Println(err)
continue
}
//插入成功后还要去请求后端接口同步数据
if !SyncSupplierData(supplier) {
continue continue
} }
message := make(map[string]interface{})
message = map[string]interface{}{
"type": "save",
"data": map[string]string{
"FID": supplier.ErpId,
"FNUMBER": supplier.ErpSupplierCode,
"CFNAME": supplier.Name,
},
} }
} data, err := json.Marshal(message)
}
}
//同步数据
func SyncSupplierData(supplier model.Supplier) bool {
var (
resp *req.Resp
url string
respData common.Response
err error
)
param := req.Param{
"erp_supplier_sn": supplier.ErpSupplierCode,
"supplier_name": supplier.Name,
"erp_supplier_id": supplier.ErpId,
"admin_name": "admin@ichunt.com",
"admin_id": 1000,
}
url = configs.BasicApiUrl + "/basic/api/ApiInsertSupplierInfo"
req.Debug = true
resp, err = req.Post(url, param)
if err != nil { if err != nil {
logSyncErrorToDataBase(supplier.ErpId, err.Error()) fmt.Println(err)
return false
}
if err = resp.ToJSON(&respData); err != nil {
logSyncErrorToDataBase(supplier.ErpId, err.Error())
return false
} }
if respData.Errcode != 101100 { body := string(data)
logSyncErrorToDataBase(supplier.ErpId, respData.Errmsg) fmt.Println(message)
return false rabbitmq.Send(Exchange, body)
} }
//都没问题,代表后端那边已经成功修改,修改同步表的状态
if err = logic.SyncSupplierSuccess(supplier.ErpId); err != nil {
fmt.Println(err)
} }
return true
}
func logSyncErrorToDataBase(erpId, syncError string) {
var err error
//请求失败的话,将原因存起来
if err = logic.WriteSupplierSyncError(erpId, syncError); err != nil {
//写数据失败,记录到日志
fmt.Println(err)
} }
} }
...@@ -10,8 +10,5 @@ const InsertSyncLogSql = "INSERT INTO `lie_sync_log` (`sync_name`,`sync_time`,`s ...@@ -10,8 +10,5 @@ const InsertSyncLogSql = "INSERT INTO `lie_sync_log` (`sync_name`,`sync_time`,`s
//插入错误日志 //插入错误日志
func InsertSyncLog(syncLog model.SyncLog) (err error) { func InsertSyncLog(syncLog model.SyncLog) (err error) {
_, err = dao.GetDb().Exec(InsertSyncLogSql, syncLog.SyncName, syncLog.SyncTime, syncLog.SyncError, syncLog.UniqueId, syncLog.QueueMessage, syncLog.AddTime) _, err = dao.GetDb().Exec(InsertSyncLogSql, syncLog.SyncName, syncLog.SyncTime, syncLog.SyncError, syncLog.UniqueId, syncLog.QueueMessage, syncLog.AddTime)
if err != nil { return err
return
}
return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment