Commit 9191b5ed by Joneq
parents 5c153e3f b945be84
package wechat_yunxin
import (
"github.com/ichunt2019/cfg/lib"
)
type BaseDatabase struct {
DataSourceName string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
type GroupDatabase struct {
DataSourceName []string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
//单、主数据 master
func BuildDatabaseList() (DatabaseList map[string]BaseDatabase) {
return map[string]BaseDatabase{
"liexin": {
DataSourceName: lib.Instance("db").GetString("liexin.data_source_name"),
Prefix: lib.Instance("db").GetString("liexin.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("liexin.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("liexin.max_open_conn"),
},
}
}
//主从数据 master slave
func BuildGroupDatabaseList() (DatabaseList map[string]GroupDatabase) {
return map[string]GroupDatabase{
//"liexin": {
// DataSourceName:lib.Instance("db").GetStringSlice("liexin.data_source_name"),
// Prefix: lib.Instance("db").GetString("liexin.table_prefix"),
// MaxIdleCons:lib.Instance("db").GetInt("liexin.max_idle_conn"),
// MaxOpenCons:lib.Instance("db").GetInt("liexin.max_open_conn"),
//},
}
}
package wechat_yunxin
import (
"time"
)
type RedisGroupDatabase struct {
MasterHost string
Password string //master密码
SlaveHost []string
ReadonlyPassword string //从服务器密码
DialTimeout time.Duration //连接超时
MinIdleConns int //空间链接
ReadTimeout time.Duration //读超时
WriteTimeout time.Duration //写超时
}
//多数据库配置
func BuildRedisGroupConfigs() ( map[string]RedisGroupDatabase) {
//fmt.Println("6666666666")
//fmt.Println(lib.Instance("redis").GetStringSlice("api.slave.host"))
return map[string]RedisGroupDatabase{
"api": {
//MasterHost: lib.Instance("redis").GetString("api.master.host"),
//Password:lib.Instance("redis").GetString("api.master.password"),
//SlaveHost: lib.Instance("redis").GetStringSlice("api.slave.host"),
//ReadonlyPassword:lib.Instance("redis").GetString("api.slave.password"),
//DialTimeout: time.Duration(lib.Instance("redis").GetInt("api.dial_timeout"))*time.Second,
//ReadTimeout: time.Duration(lib.Instance("redis").GetInt("api.read_timeout"))*time.Second,
//WriteTimeout: time.Duration(lib.Instance("redis").GetInt("api.write_timeout"))*time.Second,
//MinIdleConns:lib.Instance("redis").GetInt("api.min_idle_conns"),
},
"sku": {
//MasterHost: lib.Instance("redis").GetString("sku.master.host"),
//Password:lib.Instance("redis").GetString("sku.master.password"),
//SlaveHost: lib.Instance("redis").GetStringSlice("sku.slave.host"),
//ReadonlyPassword:lib.Instance("redis").GetString("sku.slave.password"),
//DialTimeout: time.Duration(lib.Instance("redis").GetInt("sku.dial_timeout"))*time.Second,
//ReadTimeout: time.Duration(lib.Instance("redis").GetInt("sku.read_timeout"))*time.Second,
//WriteTimeout: time.Duration(lib.Instance("redis").GetInt("sku.write_timeout"))*time.Second,
//MinIdleConns:lib.Instance("redis").GetInt("sku.min_idle_conns"),
},
}
}
package wechat_yunxin
import (
"sync"
"xorm.io/xorm"
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
"golang-asynctask/app/common/config/wechat_yunxin"
)
var (
once sync.Once
Dao *dao
)
type dao struct {
db map[string]*xorm.Engine //非主从mysql数据库 连接池
}
//获取db实例
func (self *dao) GetDb(databases string) *xorm.Engine {
return self.db[databases]
}
func mysqlSetup(d *dao) *dao {
var (
err error
)
DatabaseList := wechat_yunxin.BuildDatabaseList()
if len(DatabaseList) > 0 {
for conName, db := range DatabaseList {
d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.db[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.db[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.db[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
return d
}
func Init() {
Dao = &dao{}
once.Do(func() {
//单、主数据 master
Dao.db = make(map[string]*xorm.Engine, 0)
Dao = mysqlSetup(Dao)
})
}
package wechat_yunxin
type WechatOauth struct {
OauthType int8 `json:"oauth_type"`
OpenId string `json:"open_id"`
BindTime int `json:"bind_time"`
OauthHead string `json:"oauth_head"`
OauthNickname string `json:"oauth_nickname"`
OauthStatus int8 `json:"oauth_status"`
UnionId string `json:"union_id"`
}
\ No newline at end of file
......@@ -36,7 +36,7 @@ func DispenseMsg(data string) (err error){
action_type_slice := []string{"update","insert"}
//需要关心的字段
lie_invoice_company_field := []string{"com_name","com_addr","com_tel","com_tax_registration","com_bank","com_bank_num"}
lie_invoice_company_field := []string{"com_name","com_addr","com_tel","com_tax_registration","com_bank","com_bank_num","status"}
lie_invoice_com_user_field := []string{"com_id","user_id","sale_id"}
pushMsg := comUser{}
......
package wechat_yunxin
import (
"encoding/json"
"errors"
"github.com/tidwall/gjson"
"golang-asynctask/app/dao/wechat_yunxin"
wechat_yunxin_model "golang-asynctask/app/model/wechat_yunxin"
"net/url"
"reflect"
"golang-asynctask/util/lib"
cfg "github.com/ichunt2019/cfg/lib"
)
func PushWechatToYunXin(data string) (err error) {
wechatModel := &wechat_yunxin_model.WechatOauth{}
err = json.Unmarshal([]byte(data),wechatModel)
if err != nil{
return
}
//插入数据
sql2 := "select count(id) as num from lie_wechat_oauth where open_id = ? "
existx,err := wechat_yunxin.Dao.GetDb("liexin").SQL(sql2,wechatModel.OpenId).QueryInterface()
if err != nil{
return
}
_type := reflect.TypeOf(existx[0]["num"])
if _type.Kind() == reflect.Int64 && existx[0]["num"].(int64) > 0 {
return nil
}
sql := "insert into lie_wechat_oauth (oauth_type,open_id,bind_time,oauth_head,oauth_nickname,oauth_status,union_id) values(?,?,?,?,?,?,?) "
_,err = wechat_yunxin.Dao.GetDb("liexin").Exec(sql,wechatModel.OauthType,wechatModel.OpenId,wechatModel.BindTime,wechatModel.OauthHead,wechatModel.OauthNickname,
wechatModel.OauthStatus,wechatModel.UnionId)
if(err != nil){
return err
}
//请求接口
push_url := cfg.Instance("config").GetString("pushWechatYunXinApi")
urlParams := url.Values{}
urlParams.Add("union_id",wechatModel.UnionId)
urlParams.Add("open_id",wechatModel.OpenId)
response,returnData,err := lib.HttpPOST(push_url,urlParams,0,nil,"")
//fmt.Println(response.Status)
//fmt.Println(string(returnData))
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("err_code").Int()
msg := gjson.Parse(string(returnData)).Get("err_msg").String()
if err_code != 0{
err = errors.New(msg)
}
}
return
}
package wechat_yunxin
import (
"github.com/ichunt2019/cfg/lib"
logger "github.com/ichunt2019/lxLog/log"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte))
logger.Instance("wechat_yunxin").Info(string(dataByte))
error := PushWechatToYunXin(string(dataByte))
return error
}
//消息已经消费3次 失败了 请进行处理
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("crmuser_to_erp").Error("任务处理失败了,我要进入db日志库了")
logger.Instance("crmuser_to_erp").Error("任务处理失败了,发送钉钉消息通知主人")
logger.Instance("crmuser_to_erp").Error(string(dataByte))
logger.Instance("crmuser_to_erp").Error("错误原因:%s",err)
return nil
}
func Listen(){
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_ichunt_wechat_focus.queue_name"),
lib.Instance("config").GetString("rabbitmq_ichunt_wechat_focus.routing_key"),
lib.Instance("config").GetString("rabbitmq_ichunt_wechat_focus.exchange"),
lib.Instance("config").GetString("rabbitmq_ichunt_wechat_focus.type"),
lib.Instance("config").GetString("rabbitmq_ichunt_wechat_focus.dns"),
},t,1)
}
package boot
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
"golang-asynctask/app/dao/wechat_yunxin"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"wechat_yunxin")
//初始化数据库
wechat_yunxin.Init()
return
}
package main
import (
"flag"
"os"
"os/signal"
"syscall"
"golang-asynctask/boot/wechat_yunxin"
"golang-asynctask/app/service/wechat_yunxin"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
boot.Init(configPath,logPath)
wechat_yunxin.Listen()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
......@@ -7,6 +7,9 @@ crmSendMsgByconflictSale = "http://crm.liexin.net/api/sendMsgByconflictSale"
sendMsgByChangeInvoiceCom = "http://crm.liexin.net/api/sendMsgByChangeInvoiceCom"
apiKey = "crm a1b2c3d4e5f6g7h8i9jk"
#推送云芯 微信用户信息
pushWechatYunXinApi = "http://cloud.liexindev.net/inner/oauth/resetopenid"
[owner]
name = "Tom Preston-Werner"
organization = "GitHub"
......@@ -38,6 +41,14 @@ type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
[rabbitmq_ichunt_wechat_focus]
queue_name="szichunt_wechat_userinfo"
routing_key="szichunt_wechat_userinfo"
exchange=""
type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
[servers]
# 你可以依照你的意愿缩进。使用空格或Tab。TOML不会在意。
[servers.alpha]
......
......@@ -2,13 +2,24 @@
ShowSQL = true
[crm]
data_source_name = "liexin_crm:liexin_crm#zsyM@tcp(192.168.1.235:3306)/liexin_crm?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
data_source_name = "liexin_crm:liexin_crm#zsyM@tcp(192.168.1.235:3306)/liexin_crm?charset=utf8&parseTime=true&loc=Asia%2FShanghai"
max_open_conn = 0
max_idle_conn = 0
table_prefix = "lie_"
max_conn_life_time = 100
[liexin]
data_source_name = "liexin:liexin#zsyM@tcp(192.168.2.232:3306)/liexin?charset=utf8&parseTime=true&loc=Asia%2FShanghai"
max_open_conn = 0
max_idle_conn = 0
table_prefix = "lie_"
max_conn_life_time = 100
[liexin_demo]
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
......
......@@ -7,6 +7,9 @@ crmSendMsgByconflictSale = "http://crm.ichunt.net/api/sendMsgByconflictSale"
sendMsgByChangeInvoiceCom = "http://crm.ichunt.net/api/sendMsgByChangeInvoiceCom"
apiKey = "crm a1b2c3d4e5f6g7h8i9jk"
#推送云芯 微信用户信息
pushWechatYunXinApi = "http://cloud.ichunt.com/inner/oauth/resetopenid"
[rabbitmq_crm_comuser_fenfa]
queue_name="bus_crm_com_user_to_erp"
routing_key="maxwell_liexin_crm.lie_invoice_company "
......@@ -29,4 +32,12 @@ queue_name="crm_invoice_com_apply"
routing_key="crm_invoice_com_apply"
exchange=""
type="direct"
dns="amqp://WmsQue:EizM9I1TAxoO0tmzoOm@172.18.137.33:5672/"
[rabbitmq_ichunt_wechat_focus]
queue_name="ichunt_wechat_userinfo"
routing_key="ichunt_wechat_userinfo"
exchange=""
type="direct"
dns="amqp://WmsQue:EizM9I1TAxoO0tmzoOm@172.18.137.33:5672/"
\ No newline at end of file
......@@ -16,6 +16,7 @@ require (
github.com/spf13/viper v1.7.1
github.com/syyongx/php2go v0.9.4
github.com/tidwall/gjson v1.6.8
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
google.golang.org/protobuf v1.25.0 // indirect
xorm.io/xorm v1.0.7
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment