Commit 6f81e9ac by 孙龙

up

parent 9735af3d
package config package crm_comuser
import ( import (
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
......
package config package crm_comuser
import ( import (
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
......
package dao package crm
import ( import (
"golang-asynctask/app/common/config/crm_comuser"
"sync" "sync"
"time" "time"
"xorm.io/xorm" "xorm.io/xorm"
...@@ -9,7 +10,6 @@ import ( ...@@ -9,7 +10,6 @@ import (
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
redisPool "github.com/ichunt2019/go-redis-pool" redisPool "github.com/ichunt2019/go-redis-pool"
"golang-asynctask/app/common/config"
) )
var ( var (
...@@ -42,8 +42,8 @@ func mysqlSetup(d *dao) *dao { ...@@ -42,8 +42,8 @@ func mysqlSetup(d *dao) *dao {
var ( var (
err error err error
) )
DatabaseList := config.BuildDatabaseList() DatabaseList := crm_comuser.BuildDatabaseList()
GroupDatabaseList := config.BuildGroupDatabaseList() GroupDatabaseList := crm_comuser.BuildGroupDatabaseList()
if len(DatabaseList) > 0 { if len(DatabaseList) > 0 {
for conName, db := range DatabaseList { for conName, db := range DatabaseList {
d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName) d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
...@@ -82,7 +82,7 @@ func mysqlSetup(d *dao) *dao { ...@@ -82,7 +82,7 @@ func mysqlSetup(d *dao) *dao {
func redisSetup(d *dao) *dao { func redisSetup(d *dao) *dao {
var err error var err error
redisGroupList := config.BuildRedisGroupConfigs() redisGroupList := crm_comuser.BuildRedisGroupConfigs()
//fmt.Println(redisGroupList) //fmt.Println(redisGroupList)
for redisServerName, redisInfo := range redisGroupList { for redisServerName, redisInfo := range redisGroupList {
d.redisGroup[redisServerName], err = redisPool.NewHA(&redisPool.HAConfig{ d.redisGroup[redisServerName], err = redisPool.NewHA(&redisPool.HAConfig{
......
package dao package dao
import "fmt" import (
"fmt"
"golang-asynctask/app/dao/crm"
)
func GetUser(){ func GetUser(){
fmt.Println("555555555555555555555555555555555") fmt.Println("555555555555555555555555555555555")
res,err :=Dao.GetDb("micro").QueryString("select service_name from lie_service_info where is_delete = 1 ") res,err := crm.Dao.GetDb("micro").QueryString("select service_name from lie_service_info where is_delete = 1 ")
if err != nil{ if err != nil{
} }
fmt.Println(res) fmt.Println(res)
res,err =Dao.GetDb("micro").QueryString("select service_desc from lie_service_info where is_delete = 1 ") res,err = crm.Dao.GetDb("micro").QueryString("select service_desc from lie_service_info where is_delete = 1 ")
if err != nil{ if err != nil{
} }
...@@ -21,7 +24,7 @@ func GetUser(){ ...@@ -21,7 +24,7 @@ func GetUser(){
//fmt.Println(a.Result()) //fmt.Println(a.Result())
// //
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef")) //fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef")) fmt.Println(crm.Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef")) //fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef")) //fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef")) //fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
...@@ -30,7 +33,7 @@ func GetUser(){ ...@@ -30,7 +33,7 @@ func GetUser(){
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef")) //fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
fmt.Println(Dao.GetRedisDbGroup("sku").Get("abcdef").String()) fmt.Println(crm.Dao.GetRedisDbGroup("sku").Get("abcdef").String())
//fmt.Println("555555555555555555555555555555555") //fmt.Println("555555555555555555555555555555555")
} }
package crm
import (
"encoding/json"
"github.com/ichunt2019/cfg/lib"
//"fmt"
"github.com/syyongx/php2go"
"github.com/tidwall/gjson"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type comUser struct {
ComId int64 `json:"com_id"`
UserId int64 `json:"user_id"`
SaleId int64 `json:"sale_id"`
}
func DispenseMsg(data string) (err error){
//database := gjson.Parse(data).Get("database").String()
table := gjson.Parse(data).Get("table").String()
action_type := gjson.Parse(data).Get("type").String()
olddata := gjson.Parse(data).Get("old").Map()
//fmt.Println(database,table,action_type,olddata)
//需要关心的操作
action_type_slice := []string{"update","insert"}
//需要关心的字段
lie_invoice_company_field := []string{"com_name","com_addr","com_tel","com_tax_registration","com_bank","com_bank_num"}
lie_invoice_com_user_field := []string{"com_id","user_id","sale_id"}
pushMsg := comUser{}
if table == "lie_invoice_company" && php2go.InArray(action_type,action_type_slice){
for field,_ := range olddata{
if php2go.InArray(field,lie_invoice_company_field){
//分发
pushMsg.ComId = gjson.Parse(data).Get("data.id").Int()
data,err := json.Marshal(&pushMsg)
if err == nil{
DispenseMsgToMq(string(data))
}
break
}
}
}
if table == "lie_invoice_com_user" && php2go.InArray(action_type,action_type_slice){
for field,_ := range olddata{
if php2go.InArray(field,lie_invoice_com_user_field){
//分发
pushMsg.ComId = gjson.Parse(data).Get("data.com_id").Int()
pushMsg.UserId = gjson.Parse(data).Get("data.user_id").Int()
pushMsg.SaleId = gjson.Parse(data).Get("data.sale_id").Int()
data,err := json.Marshal(&pushMsg)
if err == nil{
DispenseMsgToMq(string(data))
}
break
}
}
}
return err
}
func DispenseMsgToMq(msg string) (err error){
queueExchange := rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_crm_comuser.queue_name"),
lib.Instance("config").GetString("rabbitmq_crm_comuser.routing_key"),
lib.Instance("config").GetString("rabbitmq_crm_comuser.exchange"),
lib.Instance("config").GetString("rabbitmq_crm_comuser.type"),
lib.Instance("config").GetString("rabbitmq_crm_comuser.dns"),
}
rabbitmq.Send(queueExchange,msg)
return
}
\ No newline at end of file
...@@ -9,7 +9,7 @@ func Init(configPath string,logPath string)(err error){ ...@@ -9,7 +9,7 @@ func Init(configPath string,logPath string)(err error){
if err != nil{ if err != nil{
panic(err) panic(err)
} }
xlog.Init(logPath,"crmuser_to_erp") xlog.Init(logPath,"crmuser_to_erp","recv_comuser")
//初始化数据库 //初始化数据库
//dao.Init() //dao.Init()
......
package main
import (
"flag"
"golang-asynctask/boot/crm_comuser_boot"
"os"
"os/signal"
"syscall"
"golang-asynctask/cmd/crm/recv_comuser/service"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
boot.Init(configPath,logPath)
service.Listen()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
package service package service
import ( import (
"github.com/ichunt2019/cfg/lib"
logger "github.com/ichunt2019/lxLog/log" logger "github.com/ichunt2019/lxLog/log"
"golang-asynctask/app/dao/crm" //"golang-asynctask/app/dao/crm"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
service_crm "golang-asynctask/app/service/crm"
) )
type RecvPro struct { type RecvPro struct {
...@@ -13,7 +16,8 @@ type RecvPro struct { ...@@ -13,7 +16,8 @@ type RecvPro struct {
func (t *RecvPro) Consumer(dataByte []byte) error { func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte)) //fmt.Println(string(dataByte))
logger.Instance("crmuser_to_erp").Info(string(dataByte)) logger.Instance("crmuser_to_erp").Info(string(dataByte))
error := crm.PushComUserInfoToErp(string(dataByte)) //error := crm.PushComUserInfoToErp(string(dataByte))
error := service_crm.DispenseMsg(string(dataByte))
return error return error
} }
...@@ -25,3 +29,16 @@ func (t *RecvPro) FailAction(err error,dataByte []byte) error { ...@@ -25,3 +29,16 @@ func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("crmuser_to_erp").Error("错误原因:%s",err) logger.Instance("crmuser_to_erp").Error("错误原因:%s",err)
return nil return nil
} }
func Listen(){
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.queue_name"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.routing_key"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.exchange"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.type"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.dns"),
},t,1)
}
...@@ -2,14 +2,13 @@ package main ...@@ -2,14 +2,13 @@ package main
import ( import (
"flag" "flag"
"golang-asynctask/cmd/crm/sync_comuser_to_erp/service"
"golang-asynctask/boot"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"golang-asynctask/boot"
"golang-asynctask/cmd/crm/sync_comuser_to_erp/service"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
) )
var ( var (
configPath string configPath string
...@@ -23,15 +22,15 @@ func main(){ ...@@ -23,15 +22,15 @@ func main(){
flag.Parse() flag.Parse()
boot.Init(configPath,logPath) boot.Init(configPath,logPath)
t := &service.RecvPro{} service.Listen()
rabbitmq.Recv(rabbitmq.QueueExchange{ //rabbitmq.Recv(rabbitmq.QueueExchange{
"a-maxwell", // "a-maxwell",
"maxwell.lie_invoice_company", // "maxwell.lie_invoice_company",
"", // "",
"direct", // "direct",
"amqp://guest:guest@192.168.2.232:5672/", // "amqp://guest:guest@192.168.2.232:5672/",
},t,1) //},t,1)
quit := make(chan os.Signal) quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
......
package service
import (
"github.com/ichunt2019/cfg/lib"
logger "github.com/ichunt2019/lxLog/log"
//"golang-asynctask/app/dao/crm"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
service_crm "golang-asynctask/app/service/crm"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte))
logger.Instance("crmuser_to_erp").Info(string(dataByte))
//error := crm.PushComUserInfoToErp(string(dataByte))
error := service_crm.DispenseMsg(string(dataByte))
return error
}
//消息已经消费3次 失败了 请进行处理
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("crmuser_to_erp").Error("任务处理失败了,我要进入db日志库了")
logger.Instance("crmuser_to_erp").Error("任务处理失败了,发送钉钉消息通知主人")
logger.Instance("crmuser_to_erp").Error(string(dataByte))
logger.Instance("crmuser_to_erp").Error("错误原因:%s",err)
return nil
}
func Listen(){
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.queue_name"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.routing_key"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.exchange"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.type"),
lib.Instance("config").GetString("rabbitmq_crm_comuser_fenfa.dns"),
},t,1)
}
...@@ -12,6 +12,21 @@ bio = "GitHub Cofounder & CEO\nLikes tater tots and beer." ...@@ -12,6 +12,21 @@ bio = "GitHub Cofounder & CEO\nLikes tater tots and beer."
dob = 1979-05-27T07:32:00Z # 日期时间是一等公民。为什么不呢? dob = 1979-05-27T07:32:00Z # 日期时间是一等公民。为什么不呢?
[rabbitmq_crm_comuser_fenfa]
queue_name="a-maxwell"
routing_key="maxwell.lie_invoice_company"
exchange=""
type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
[rabbitmq_crm_comuser]
queue_name="crm_comusertoerp"
routing_key="crm_comusertoerp"
exchange=""
type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
[servers] [servers]
# 你可以依照你的意愿缩进。使用空格或Tab。TOML不会在意。 # 你可以依照你的意愿缩进。使用空格或Tab。TOML不会在意。
......
...@@ -5,7 +5,12 @@ viewpath = "/home/www/templates/" ...@@ -5,7 +5,12 @@ viewpath = "/home/www/templates/"
pushErpDomain = "http://crm.ichunt.net/api/pushComUserRelationToErp" pushErpDomain = "http://crm.ichunt.net/api/pushComUserRelationToErp"
apiKey = "crm a1b2c3d4e5f6g7h8i9jk" apiKey = "crm a1b2c3d4e5f6g7h8i9jk"
[rabbitmq_crm_comuser]
queue_name="maxwell"
routing_key="maxwell_liexin_crm.lie_invoice_company"
exchange=""
type="direct"
dns="amqp://WmsQue:EizM9I1TAxoO0tmzoOm@172.18.137.33:5672/"
...@@ -20,13 +20,14 @@ require ( ...@@ -20,13 +20,14 @@ require (
github.com/gorilla/sessions v1.2.1 // indirect github.com/gorilla/sessions v1.2.1 // indirect
github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717 github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717
github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5 github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5
github.com/ichunt2019/golang-rbmq-sl v0.0.0-20200515075131-59a37ab77d7d // indirect github.com/ichunt2019/golang-rbmq-sl v0.0.0-20200515075131-59a37ab77d7d
github.com/ichunt2019/ichunt-micro-registry v1.0.1 github.com/ichunt2019/ichunt-micro-registry v1.0.1
github.com/ichunt2019/lxLog v0.0.0-20210226024426-781becb3c042 github.com/ichunt2019/lxLog v0.0.0-20210226024426-781becb3c042
github.com/lib/pq v1.9.0 // indirect github.com/lib/pq v1.9.0 // indirect
github.com/mattn/go-sqlite3 v1.14.6 // indirect github.com/mattn/go-sqlite3 v1.14.6 // indirect
github.com/spf13/viper v1.7.1 github.com/spf13/viper v1.7.1
github.com/syndtr/goleveldb v1.0.0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/syyongx/php2go v0.9.4
github.com/tealeg/xlsx v1.0.5 // indirect github.com/tealeg/xlsx v1.0.5 // indirect
github.com/tidwall/gjson v1.6.8 github.com/tidwall/gjson v1.6.8
github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be // indirect github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be // indirect
......
...@@ -366,6 +366,7 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s ...@@ -366,6 +366,7 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/syyongx/php2go v0.9.4 h1:qUtETTHzqHzxZK8plkbkb0YawD8bpLpxNsbzHQmb22Y=
github.com/syyongx/php2go v0.9.4/go.mod h1:meN2eIhhUoxOd2nMxbpe8g6cFPXI5O9/UAAuz7oDdzw= github.com/syyongx/php2go v0.9.4/go.mod h1:meN2eIhhUoxOd2nMxbpe8g6cFPXI5O9/UAAuz7oDdzw=
github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE= github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE=
github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM= github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM=
......
...@@ -2,16 +2,16 @@ package lib ...@@ -2,16 +2,16 @@ package lib
import ( import (
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"xorm.io/xorm"
"golang-asynctask/app/common/config"
"github.com/ichunt2019/cfg/lib" "github.com/ichunt2019/cfg/lib"
"golang-asynctask/app/common/config/crm_comuser"
"xorm.io/xorm"
) )
var DatabaseConMap map[string]*xorm.Engine var DatabaseConMap map[string]*xorm.Engine
func Setup() error { func Setup() error {
DatabaseConMap = make(map[string]*xorm.Engine, 0) DatabaseConMap = make(map[string]*xorm.Engine, 0)
DatabaseList := config.BuildDatabaseList() DatabaseList := crm_comuser.BuildDatabaseList()
var err error var err error
//循环生成数据库链接 //循环生成数据库链接
for conName, db := range DatabaseList { for conName, db := range DatabaseList {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment