Commit 70d2087d by Joneq

完成开发

parent 56751931
Showing with 69 additions and 23 deletions
...@@ -3,12 +3,14 @@ package main ...@@ -3,12 +3,14 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/hprose/hprose-golang/rpc"
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
"github.com/tidwall/gjson"
"golang-asynctask/app/dao/scm_data_dao" "golang-asynctask/app/dao/scm_data_dao"
"runtime"
"sync" "sync"
"time" "time"
"github.com/hprose/hprose-golang/rpc" "xorm.io/xorm"
"github.com/tidwall/gjson"
) )
var ( var (
...@@ -44,28 +46,22 @@ func init(){ ...@@ -44,28 +46,22 @@ func init(){
return return
} }
var currentDatetime int64
//同步海关信息至中间库 //同步海关信息至中间库
func main() { func main() {
runtime.GOMAXPROCS(5)
//获取当天时间 //获取当天时间
//var currentDate = time.Now().Format("2006-01-02") var currentDate = time.Now().Format("2006-01-02")
//获取当前时间
currentDatetime = getSysLogLastTIme(scm_data_dao.GetScmDataDb())
fmt.Println(currentDatetime)
//循环获取表名,然后获取表内当天信息,并且存入mysql,协程处理
for _,v := range syncTable {
var startTime int64 = 1577808000 wg.Add(1)
var nowtime = time.Now().Unix() handleCurrentTable(v,currentDate)
for startTime < nowtime {
currentDate := time.Unix(startTime,0).Format("2006-01-02")
for _,v := range syncTable {
//wg.Add(1)
handleCurrentTable(v,currentDate)
}
startTime+=86400
} }
//wg.Wait() wg.Wait()
} }
...@@ -74,9 +70,9 @@ func main() { ...@@ -74,9 +70,9 @@ func main() {
//处理当前的这个表任务 //处理当前的这个表任务
func handleCurrentTable(tableName,currentDate string) { func handleCurrentTable(tableName,currentDate string) {
//defer wg.Done() defer wg.Done()
//fmt.Println(currentDate+"开始同步"+tableName) fmt.Println(currentDate+"开始同步"+tableName)
currentJson := requestTableInfo(tableName,currentDate) currentJson := requestTableInfo(tableName,currentDate)
if currentJson == "" { if currentJson == "" {
...@@ -85,14 +81,17 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -85,14 +81,17 @@ func handleCurrentTable(tableName,currentDate string) {
//var sqlInsert string = "insert into "+ tableName //var sqlInsert string = "insert into "+ tableName
sqlDb := scm_data_dao.GetScmDataDb() sqlDb := scm_data_dao.GetScmDataDb()
var currentKey,valueString,currentNum string var currentKey,valueString,currentNum,dataUpdateTime string
var currentString []interface{} var currentString []interface{}
var timeUnixKey int64
for _, res := range gjson.Parse(currentJson).Array() { for _, res := range gjson.Parse(currentJson).Array() {
currentString = currentString[0:0] currentString = currentString[0:0]
currentKey = "" currentKey = ""
valueString = "" valueString = ""
currentNum = "" currentNum = ""
dataUpdateTime = ""
timeUnixKey = 0
currentString = append(currentString,"") currentString = append(currentString,"")
//sqlkey //sqlkey
...@@ -103,24 +102,47 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -103,24 +102,47 @@ func handleCurrentTable(tableName,currentDate string) {
currentKey += key.String()+"," currentKey += key.String()+","
valueString += "?," valueString += "?,"
currentString = append(currentString,value.String()) currentString = append(currentString,value.String())
if key.String() == "number" && value.String() != "" { if key.String() == "number" && value.String() != "" {
currentNum = value.String() currentNum = value.String()
} }
if key.String() == "data_update_time" && value.String() != "" {
dataUpdateTime = value.String()
}
} }
return true return true
}) })
timeLocal,_ := time.LoadLocation("Local")
timeNow,err := time.ParseInLocation("2006-01-02 15:04:05",dataUpdateTime,timeLocal)
if err != nil {
timeUnixKey = timeNow.Unix()
}
//如果当前表是日志表,小于当前时间的都跳过
if tableName == "SYS_OPERATE_LOG" && timeUnixKey < currentDatetime {
continue
}
//如果当前num 不为空,就删除掉之前的,然后新增 //如果当前num 不为空,就删除掉之前的,然后新增
if currentNum != "" && tableName != "SYS_OPERATE_LOG" { if currentNum != "" && tableName != "SYS_OPERATE_LOG" {
if getRowExsist(sqlDb,tableName,currentNum,dataUpdateTime) {
continue
}
//如果查询到了,也跳过
sqlDb.Exec("delete from "+tableName+" where number = ?",currentNum) sqlDb.Exec("delete from "+tableName+" where number = ?",currentNum)
} }
currentKey+="data_sync_time," currentKey+="data_sync_time,"
valueString += "?," valueString += "?,"
currentString = append(currentString,time.Now().Format("2006-01-02")) currentString = append(currentString,time.Now().Format("2006-01-02"))
currentString[0] = `INSERT INTO `+tableName+` (`+currentKey[:len(currentKey)-1]+`)VALUES (`+valueString[:len(valueString)-1]+`)` currentString[0] = `INSERT INTO `+tableName+` (`+currentKey[:len(currentKey)-1]+`)VALUES (`+valueString[:len(valueString)-1]+`)`
_, err := sqlDb.Exec(currentString...) _, err = sqlDb.Exec(currentString...)
if err != nil { if err != nil {
fmt.Println(currentString) fmt.Println(currentString)
...@@ -130,6 +152,30 @@ func handleCurrentTable(tableName,currentDate string) { ...@@ -130,6 +152,30 @@ func handleCurrentTable(tableName,currentDate string) {
} }
//获取当前日志表的最后添加时间
func getSysLogLastTIme(db *xorm.Engine)int64 {
type LastTime struct {
DataUpdateTime time.Time `xorm:"DATETIME"`
}
var lasttime LastTime
db.Table("SYS_OPERATE_LOG").OrderBy("data_update_time desc").Cols("data_update_time").Get(&lasttime)
return lasttime.DataUpdateTime.Unix()
}
//查询这个表的当前行是否存在
func getRowExsist(db *xorm.Engine,table,num,datetime string)bool {
var(
number string
)
_,err:=db.Table(table).Where("number=?",num).Where("data_update_time=?",datetime).Cols("number").Get(&number)
if err != nil || number=="" {
return false
}
return true
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment