Commit e06f08d3 by gongyang

设置超时时间

parent a0cc2e53
...@@ -200,6 +200,8 @@ func RequestTableInfo(requestType string) string { ...@@ -200,6 +200,8 @@ func RequestTableInfo(requestType string) string {
var sub *Sub var sub *Sub
client := rpc.NewHTTPClient("http://119.23.228.186:50005/") client := rpc.NewHTTPClient("http://119.23.228.186:50005/")
outTime := time.Second * 120
client.SetTimeout(outTime)
client.UseService(&sub) client.UseService(&sub)
result, err := sub.Get_materialstatistic(`{"date":"` + strings.Replace(currentDate, "-", "", -1) + `","requestType":"` + requestType + `"}`) result, err := sub.Get_materialstatistic(`{"date":"` + strings.Replace(currentDate, "-", "", -1) + `","requestType":"` + requestType + `"}`)
......
...@@ -3,44 +3,44 @@ package main ...@@ -3,44 +3,44 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/hprose/hprose-golang/rpc"
"github.com/ichunt2019/cfg/lib"
"github.com/tidwall/gjson"
"golang-asynctask/app/dao/scm_data_dao" "golang-asynctask/app/dao/scm_data_dao"
"runtime" "runtime"
"sync" "sync"
"time" "time"
"github.com/hprose/hprose-golang/rpc"
"github.com/ichunt2019/cfg/lib"
"github.com/tidwall/gjson"
"xorm.io/xorm" "xorm.io/xorm"
) )
var ( var (
//需要同步的的表 //需要同步的的表
syncTable = []string{ syncTable = []string{
"Ent_balance_sheet","Ent_cash_flow", "Ent_balance_sheet", "Ent_cash_flow",
"Ent_profit", "Purchase_Order","SYS_OPERATE_LOG", "Ent_profit", "Purchase_Order", "SYS_OPERATE_LOG",
} }
//"Ent_balance_sheet","Ent_cash_flow", //"Ent_balance_sheet","Ent_cash_flow",
//"Ent_profit","Gathering_Account", //"Ent_profit","Gathering_Account",
//"HK_Customs_Declaration","Payment_Account", //"HK_Customs_Declaration","Payment_Account",
//"Purchase_Order","Sales_Order", //"Purchase_Order","Sales_Order",
//"SYS_OPERATE_LOG", //"SYS_OPERATE_LOG",
wg sync.WaitGroup wg sync.WaitGroup
) )
func init(){ func init() {
var ( var (
configPath string configPath string
logPath string logPath string
) )
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件") flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录") flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse() flag.Parse()
err := lib.Init(configPath) err := lib.Init(configPath)
if err != nil{ if err != nil {
panic(err) panic(err)
} }
return return
...@@ -48,8 +48,8 @@ func init(){ ...@@ -48,8 +48,8 @@ func init(){
var currentDatetime int64 var currentDatetime int64
//同步海关信息至中间库 // 同步海关信息至中间库
func main() { func main() {
//UpdateOldGoodsId() //UpdateOldGoodsId()
//panic(1) //panic(1)
...@@ -58,15 +58,15 @@ func main() { ...@@ -58,15 +58,15 @@ func main() {
//获取当天时间 //获取当天时间
var currentDate = time.Now().Format("2006-01-02") var currentDate = time.Now().Format("2006-01-02")
//获取当前时间 //获取当前时间
currentDatetime = getSysLogLastTIme(scm_data_dao.GetScmDataDb()) currentDatetime = getSysLogLastTIme(scm_data_dao.GetScmDataDb())
fmt.Println(currentDatetime) fmt.Println(currentDatetime)
//删除订单 //删除订单
deleteOrder(currentDate) deleteOrder(currentDate)
for _,v := range syncTable { for _, v := range syncTable {
wg.Add(1) wg.Add(1)
go handleCurrentTable(v,currentDate) go handleCurrentTable(v, currentDate)
} }
wg.Wait() wg.Wait()
} }
...@@ -91,10 +91,9 @@ func main() { ...@@ -91,10 +91,9 @@ func main() {
// } // }
//} //}
func deleteOrder(currentDate string) {
func deleteOrder(currentDate string) {
table := "Purchase_delete_Order" table := "Purchase_delete_Order"
jsonStr := requestTableInfo(table,currentDate) jsonStr := requestTableInfo(table, currentDate)
//[{"number": "CG210577669-B79595", "data_update_time": "2021-05-19 11:25:45"}] //[{"number": "CG210577669-B79595", "data_update_time": "2021-05-19 11:25:45"}]
sqlDb := scm_data_dao.GetScmDataDb() sqlDb := scm_data_dao.GetScmDataDb()
...@@ -107,24 +106,21 @@ func deleteOrder(currentDate string) { ...@@ -107,24 +106,21 @@ func deleteOrder(currentDate string) {
}) })
//删除采购订单中当前单号的,like单号的 //删除采购订单中当前单号的,like单号的
_,err :=sqlDb.Exec("delete from Purchase_Order where number = ? and data_update_time < ?",deleteKeyErp["number"],deleteKeyErp["data_update_time"]); _, err := sqlDb.Exec("delete from Purchase_Order where number = ? and data_update_time < ?", deleteKeyErp["number"], deleteKeyErp["data_update_time"])
_,err = sqlDb.Exec("delete from Purchase_Order where number like ? and data_update_time < ?",deleteKeyErp["number"]+"-%",deleteKeyErp["data_update_time"]); _, err = sqlDb.Exec("delete from Purchase_Order where number like ? and data_update_time < ?", deleteKeyErp["number"]+"-%", deleteKeyErp["data_update_time"])
if err != nil{ if err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }
} }
// 处理当前的这个表任务
func handleCurrentTable(tableName, currentDate string) {
//处理当前的这个表任务
func handleCurrentTable(tableName,currentDate string) {
defer wg.Done() defer wg.Done()
fmt.Println(currentDate+"开始同步"+tableName) fmt.Println(currentDate + "开始同步" + tableName)
currentJson := requestTableInfo(tableName,currentDate) currentJson := requestTableInfo(tableName, currentDate)
if currentJson == "" { if currentJson == "" {
return return
...@@ -132,7 +128,7 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -132,7 +128,7 @@ func handleCurrentTable(tableName,currentDate string) {
//var sqlInsert string = "insert into "+ tableName //var sqlInsert string = "insert into "+ tableName
sqlDb := scm_data_dao.GetScmDataDb() sqlDb := scm_data_dao.GetScmDataDb()
var currentKey,valueString,currentNum,dataUpdateTime string var currentKey, valueString, currentNum, dataUpdateTime string
var currentString []interface{} var currentString []interface{}
var timeUnixKey int64 var timeUnixKey int64
for _, res := range gjson.Parse(currentJson).Array() { for _, res := range gjson.Parse(currentJson).Array() {
...@@ -144,7 +140,7 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -144,7 +140,7 @@ func handleCurrentTable(tableName,currentDate string) {
dataUpdateTime = "" dataUpdateTime = ""
timeUnixKey = 0 timeUnixKey = 0
currentString = append(currentString,"") currentString = append(currentString, "")
//sqlkey //sqlkey
res.ForEach(func(key, value gjson.Result) bool { res.ForEach(func(key, value gjson.Result) bool {
...@@ -155,7 +151,7 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -155,7 +151,7 @@ func handleCurrentTable(tableName,currentDate string) {
currentKey += key.String() + "," currentKey += key.String() + ","
valueString += "?," valueString += "?,"
currentString = append(currentString,value.String()) currentString = append(currentString, value.String())
if key.String() == "number" && value.String() != "" { if key.String() == "number" && value.String() != "" {
currentNum = value.String() currentNum = value.String()
...@@ -169,8 +165,8 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -169,8 +165,8 @@ func handleCurrentTable(tableName,currentDate string) {
return true return true
}) })
timeLocal,_ := time.LoadLocation("Local") timeLocal, _ := time.LoadLocation("Local")
timeNow,err := time.ParseInLocation("2006-01-02 15:04:05",dataUpdateTime,timeLocal) timeNow, err := time.ParseInLocation("2006-01-02 15:04:05", dataUpdateTime, timeLocal)
if err != nil { if err != nil {
timeUnixKey = timeNow.Unix() timeUnixKey = timeNow.Unix()
} }
...@@ -183,31 +179,31 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -183,31 +179,31 @@ func handleCurrentTable(tableName,currentDate string) {
//如果当前num 不为空,就删除掉之前的,然后新增 //如果当前num 不为空,就删除掉之前的,然后新增
if currentNum != "" && tableName != "SYS_OPERATE_LOG" { if currentNum != "" && tableName != "SYS_OPERATE_LOG" {
if getRowExsist(sqlDb,tableName,currentNum,dataUpdateTime) { if getRowExsist(sqlDb, tableName, currentNum, dataUpdateTime) {
continue continue
} }
//如果查询到了,也跳过 //如果查询到了,也跳过
sqlDb.Exec("delete from "+tableName+" where number = ?",currentNum) sqlDb.Exec("delete from "+tableName+" where number = ?", currentNum)
} }
currentKey+="data_sync_time," currentKey += "data_sync_time,"
valueString += "?," valueString += "?,"
currentString = append(currentString,time.Now().Format("2006-01-02 15:04:05")) currentString = append(currentString, time.Now().Format("2006-01-02 15:04:05"))
currentString[0] = `INSERT INTO `+tableName+` (`+currentKey[:len(currentKey)-1]+`)VALUES (`+valueString[:len(valueString)-1]+`)` currentString[0] = `INSERT INTO ` + tableName + ` (` + currentKey[:len(currentKey)-1] + `)VALUES (` + valueString[:len(valueString)-1] + `)`
_, err = sqlDb.Exec(currentString...) _, err = sqlDb.Exec(currentString...)
if err != nil { if err != nil {
fmt.Println(currentString) fmt.Println(currentString)
fmt.Println(err.Error()+tableName) fmt.Println(err.Error() + tableName)
} }
} }
} }
//获取当前日志表的最后添加时间 // 获取当前日志表的最后添加时间
func getSysLogLastTIme(db *xorm.Engine)int64 { func getSysLogLastTIme(db *xorm.Engine) int64 {
type LastTime struct { type LastTime struct {
DataUpdateTime time.Time `xorm:"DATETIME"` DataUpdateTime time.Time `xorm:"DATETIME"`
} }
...@@ -216,75 +212,68 @@ func getSysLogLastTIme(db *xorm.Engine)int64 { ...@@ -216,75 +212,68 @@ func getSysLogLastTIme(db *xorm.Engine)int64 {
return lasttime.DataUpdateTime.Unix() return lasttime.DataUpdateTime.Unix()
} }
//查询这个表的当前行是否存在 // 查询这个表的当前行是否存在
func getRowExsist(db *xorm.Engine,table,num,datetime string)bool { func getRowExsist(db *xorm.Engine, table, num, datetime string) bool {
var( var (
number string number string
) )
_,err:=db.Table(table).Where("number=?",num).Where("data_update_time=?",datetime).Cols("number").Get(&number) _, err := db.Table(table).Where("number=?", num).Where("data_update_time=?", datetime).Cols("number").Get(&number)
if err != nil || number=="" { if err != nil || number == "" {
return false return false
} }
return true return true
} }
// 20210812 修改旧的商品ID
//20210812 修改旧的商品ID func UpdateOldGoodsId() {
func UpdateOldGoodsId() {
type Sub struct { type Sub struct {
GetCustomsDockingData func(string) (string, error) GetCustomsDockingData func(string) (string, error)
} }
var ( var (
sub *Sub sub *Sub
numArr []string numArr []string
) )
client := rpc.NewHTTPClient("http://172.18.137.19:50005/") client := rpc.NewHTTPClient("http://172.18.137.19:50005/")
outTime := time.Second * 120
client.SetTimeout(outTime)
client.UseService(&sub) client.UseService(&sub)
sqlDb := scm_data_dao.GetScmDataDb() sqlDb := scm_data_dao.GetScmDataDb()
sqlDb.Table("Purchase_Order").Cols("number").Find(&numArr) sqlDb.Table("Purchase_Order").Cols("number").Find(&numArr)
for _, v := range numArr {
for _,v:=range numArr{
startTime := time.Now() startTime := time.Now()
result,_ := sub.GetCustomsDockingData(`{"tableName":"Purchase_Order2","bizDate":"`+v+`"}`) result, _ := sub.GetCustomsDockingData(`{"tableName":"Purchase_Order2","bizDate":"` + v + `"}`)
fmt.Println("正在修改"+v) fmt.Println("正在修改" + v)
fmt.Println("获取数据时间",time.Since(startTime)) fmt.Println("获取数据时间", time.Since(startTime))
if gjson.Get(result,"0.goods_id").String() != "" { if gjson.Get(result, "0.goods_id").String() != "" {
sqlDb.Exec("update Purchase_Order set goods_id = '"+gjson.Get(result,"0.goods_id").String()+"' where number ='"+v+"'") sqlDb.Exec("update Purchase_Order set goods_id = '" + gjson.Get(result, "0.goods_id").String() + "' where number ='" + v + "'")
} }
fmt.Println("修改时间",time.Since(startTime)) fmt.Println("修改时间", time.Since(startTime))
} }
} }
// 获取表信息,返回byte
// request example //http://119.23.228.186:50005/ getCustomsDockingData {"tableName":"SYS_OPERATE_LOG","bizDate":"2021-05-07"}
func requestTableInfo(tableName, currentDate string) string {
//获取表信息,返回byte
//request example //http://119.23.228.186:50005/ getCustomsDockingData {"tableName":"SYS_OPERATE_LOG","bizDate":"2021-05-07"}
func requestTableInfo(tableName,currentDate string)string {
type Sub struct { type Sub struct {
GetCustomsDockingData func(string) (string, error) GetCustomsDockingData func(string) (string, error)
} }
var sub *Sub var sub *Sub
client := rpc.NewHTTPClient("http://119.23.228.186:50005/") client := rpc.NewHTTPClient("http://119.23.228.186:50005/")
time := time.Second * 120 outTime := time.Second * 120
client.SetTimeout(time) client.SetTimeout(outTime)
client.UseService(&sub) client.UseService(&sub)
result,err := sub.GetCustomsDockingData(`{"tableName":"`+tableName+`","bizDate":"`+currentDate+`"}`) result, err := sub.GetCustomsDockingData(`{"tableName":"` + tableName + `","bizDate":"` + currentDate + `"}`)
if err != nil { if err != nil {
fmt.Println(tableName+"请求服务器出错"+err.Error()) fmt.Println(tableName + "请求服务器出错" + err.Error())
return "" return ""
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment