Commit 6966cf56 by 孙龙

up

parent 33abea4a
......@@ -28,6 +28,7 @@ go run ./cmd/main.go
go run ./cmd/crm/recv_comuser/main.go -config=./config/prod/ -logdir = ./logs/
go run ./cmd/crm/sync_comuser_to_erp/main.go -config=./config/prod/ -logdir = ./logs/
go run ./cmd/crm/sendmsg/main.go -config=./config/prod/ -logdir = ./logs/
##项目的目录结构
......
package crm_comuser
import (
"github.com/ichunt2019/cfg/lib"
)
type BaseDatabase struct {
DataSourceName string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
type GroupDatabase struct {
DataSourceName []string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
//单、主数据 master
func BuildDatabaseList() (DatabaseList map[string]BaseDatabase) {
return map[string]BaseDatabase{
"crm": {
DataSourceName: lib.Instance("db").GetString("crm.data_source_name"),
Prefix: lib.Instance("db").GetString("crm.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("crm.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("crm.max_open_conn"),
},
}
}
//主从数据 master slave
func BuildGroupDatabaseList() (DatabaseList map[string]GroupDatabase) {
return map[string]GroupDatabase{
//"liexin": {
// DataSourceName:lib.Instance("db").GetStringSlice("liexin.data_source_name"),
// Prefix: lib.Instance("db").GetString("liexin.table_prefix"),
// MaxIdleCons:lib.Instance("db").GetInt("liexin.max_idle_conn"),
// MaxOpenCons:lib.Instance("db").GetInt("liexin.max_open_conn"),
//},
}
}
package crm_comuser
import (
"github.com/ichunt2019/cfg/lib"
"time"
)
type RedisGroupDatabase struct {
MasterHost string
Password string //master密码
SlaveHost []string
ReadonlyPassword string //从服务器密码
DialTimeout time.Duration //连接超时
MinIdleConns int //空间链接
ReadTimeout time.Duration //读超时
WriteTimeout time.Duration //写超时
}
//多数据库配置
func BuildRedisGroupConfigs() ( map[string]RedisGroupDatabase) {
//fmt.Println("6666666666")
//fmt.Println(lib.Instance("redis").GetStringSlice("api.slave.host"))
return map[string]RedisGroupDatabase{
"api": {
MasterHost: lib.Instance("redis").GetString("api.master.host"),
Password:lib.Instance("redis").GetString("api.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("api.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("api.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("api.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("api.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("api.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("api.min_idle_conns"),
},
"sku": {
MasterHost: lib.Instance("redis").GetString("sku.master.host"),
Password:lib.Instance("redis").GetString("sku.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("sku.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("sku.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("sku.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("sku.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("sku.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("sku.min_idle_conns"),
},
}
}
package cron_sendmsg
import (
"golang-asynctask/app/common/config/cron_sendmsg"
"sync"
"time"
"xorm.io/xorm"
"github.com/go-redis/redis/v7"
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
redisPool "github.com/ichunt2019/go-redis-pool"
)
var (
once sync.Once
Dao *dao
)
type dao struct {
db map[string]*xorm.Engine //非主从mysql数据库 连接池
dbGroup map[string]*xorm.EngineGroup //mysql主从 连接池
redisGroup map[string]*redisPool.Pool //redis 主从 连接池
}
//获取db实例
func (self *dao) GetDb(databases string) *xorm.Engine {
return self.db[databases]
}
//获取主从db实例
//获取主从db实例
//func (self *dao) GetDbGroup(databases string) *xorm.EngineGroup {
// return self.dbGroup[databases]
//}
//获取主从db实例
//func (self *dao) GetRedisDbGroup(databases string) *redisPool.Pool {
// return self.redisGroup[databases]
//}
func mysqlSetup(d *dao) *dao {
var (
err error
)
DatabaseList := crm_comuser.BuildDatabaseList()
GroupDatabaseList := crm_comuser.BuildGroupDatabaseList()
if len(DatabaseList) > 0 {
for conName, db := range DatabaseList {
d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.db[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.db[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.db[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
if len(GroupDatabaseList) > 0 {
for conName, db := range GroupDatabaseList {
d.dbGroup[conName], err = xorm.NewEngineGroup("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.dbGroup[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.dbGroup[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.dbGroup[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
return d
}
func redisSetup(d *dao) *dao {
var err error
redisGroupList := crm_comuser.BuildRedisGroupConfigs()
//fmt.Println(redisGroupList)
for redisServerName, redisInfo := range redisGroupList {
d.redisGroup[redisServerName], err = redisPool.NewHA(&redisPool.HAConfig{
Master: redisInfo.MasterHost,
Slaves: redisInfo.SlaveHost,
Password: redisInfo.Password,
ReadonlyPassword: redisInfo.ReadonlyPassword,
Options: &redis.Options{
DialTimeout: redisInfo.DialTimeout, //连接超时
MinIdleConns: redisInfo.MinIdleConns, //空闲链接数
ReadTimeout: redisInfo.ReadTimeout,
WriteTimeout: redisInfo.WriteTimeout,
},
AutoEjectHost: true,//是否弹出故障主机
ServerFailureLimit: 3,//达到失败次数时弹出
ServerRetryTimeout: 5 * time.Second,//在“ServerRetryTimeout”之后重试弹出的主机`
MinServerNum: 0,//保留min服务器 针对从服务器
})
if err != nil {
panic(err)
}
}
return d
}
func Init() {
Dao = &dao{}
once.Do(func() {
//单、主数据 master
Dao.db = make(map[string]*xorm.Engine, 0)
////主从数据 master slave
//Dao.dbGroup = make(map[string]*xorm.EngineGroup, 0)
////redis连接池 支持主从 master slave
//Dao.redisGroup = make(map[string]*redisPool.Pool, 0)
Dao = mysqlSetup(Dao)
//Dao = redisSetup(Dao)
})
}
package cron_sendmsg
import (
"errors"
"fmt"
"github.com/tidwall/gjson"
"golang-asynctask/app/dao/crm/cron_sendmsg"
"net/http"
"net/url"
"time"
cfg "github.com/ichunt2019/cfg/lib"
comfunc "golang-asynctask/util/lib"
)
func SendMsg() (err error){
//fmt.Println("66666")
currentTime := time.Now().Format("2006-01-02")
//println(currentTime)
//currentTime = "2021-05-06"
sql := "select count(id) as applyNums,apply_admin_id as sale_id,sale_type from lie_invoice_com_apply " +
"where apply_type = 3 and apply_admin_id != 0 and FROM_UNIXTIME(create_time,'%Y-%m-%d') = ? group by apply_admin_id "
res,err :=cron_sendmsg.Dao.GetDb("crm").QueryString(sql,currentTime)
if err != nil{
return err
}
fmt.Println(res)
if(len(res) <= 0){
return
}
for _,v := range res{
if(v["applyNums"] == "0"){
continue
}
//fmt.Println(v)
postToCrm(v)
}
return
}
func postToCrm(data map[string]string) (err error){
push_url := cfg.Instance("config").GetString("sendMsgByChangeInvoiceCom")
urlParams := url.Values{}
header := http.Header{}
header.Set("api-key",cfg.Instance("config").GetString("apiKey"))
header.Set("Content-Type","application/json")
urlParams.Add("applyNums",data["applyNums"])
urlParams.Add("sale_id",data["sale_id"])
urlParams.Add("sale_type",data["sale_type"])
response,returnData,err := comfunc.HttpPOST(push_url,urlParams,0,header,"")
//fmt.Println(response.StatusCode,string(returnData),err)
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("errcode").Int()
msg := gjson.Parse(string(returnData)).Get("errmsg").String()
if err_code != 0{
err = errors.New(msg)
}
}
return
}
......@@ -4,10 +4,10 @@ import (
"encoding/json"
"github.com/ichunt2019/cfg/lib"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
//"fmt"
"github.com/syyongx/php2go"
"github.com/tidwall/gjson"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
"github.com/tidwall/gjson"
)
type comUser struct {
......@@ -17,6 +17,13 @@ type comUser struct {
}
type invoiceComApply struct {
Id int64 `json:"id"`
ComId int64 `json:"com_id"`
UserId int64 `json:"user_id"`
SaleId int64 `json:"apply_admin_id"`
}
func DispenseMsg(data string) (err error){
err = nil
//database := gjson.Parse(data).Get("database").String()
......@@ -69,9 +76,28 @@ func DispenseMsg(data string) (err error){
}
}
}
if table == "lie_invoice_com_apply" && action_type == "insert"{
apply_type := gjson.Parse(data).Get("data.apply_type").Int()
if apply_type == 2{
invoiceComApplyInfo := &invoiceComApply{}
invoiceComApplyInfo.Id = gjson.Parse(data).Get("data.id").Int()
invoiceComApplyInfo.ComId = gjson.Parse(data).Get("data.com_id").Int()
invoiceComApplyInfo.UserId = gjson.Parse(data).Get("data.user_id").Int()
invoiceComApplyInfo.SaleId = gjson.Parse(data).Get("data.apply_admin_id").Int()
data,err := json.Marshal(invoiceComApplyInfo)
if err == nil{
sendInvoiceComApplyMsg(string(data))
}
}
}
return err
}
//分发 lie_invoice_company lie_invoice_com_user表消息到 队列
func DispenseMsgToMq(msg string) (err error){
queueExchange := rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_crm_comuser.queue_name"),
......@@ -82,4 +108,17 @@ func DispenseMsgToMq(msg string) (err error){
}
rabbitmq.Send(queueExchange,msg)
return
}
//分发消息 申请类型是公司新增联系人
func sendInvoiceComApplyMsg(msg string) (err error){
queueExchange := rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.queue_name"),
lib.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.routing_key"),
lib.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.exchange"),
lib.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.type"),
lib.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.dns"),
}
rabbitmq.Send(queueExchange,msg)
return
}
\ No newline at end of file
package boot
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
"golang-asynctask/app/dao/crm/cron_sendmsg"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"cron_sendmsg")
//初始化数据库
cron_sendmsg.Init()
return
}
package boot
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"sendmsg")
//初始化数据库
//dao.Init()
return
}
类型2:
1.消息渠道:邮件
2.消息触发条件:做定时任务,判断公司审核列表是否新增”公司发票修改”申请,有则18:00定时发送邮件
3.消息发送邮箱账号:申请销售/当前跟进销售邮箱账号
4.消息描述:今日有“X“位用户,申请修改公司发票,请进入CRM系统公司审核列表进行审核。
\ No newline at end of file
package main
import (
"flag"
"golang-asynctask/app/service/crm/cron_sendmsg"
boot "golang-asynctask/boot/cron_sendmsg"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
boot.Init(configPath,logPath)
cron_sendmsg.SendMsg()
}
#分发 maxwell crm数据库表的 insert delete update操作
\ No newline at end of file
类型1:
1.消息渠道:邮件
2.消息触发条件:公司审核列表新增”申请类型是公司新增联系人”申请
3.消息发送邮箱账号:管理员邮箱账号;查看下级角色邮箱账号;当前公司跟进销售邮箱账号;申请销售邮箱账号
4.消息描述:销售员”XXX“申请关联公司:”XXXXXX",当前公司“XXXXX”跟进销售为销售员"XXX,XXX“,请销售员与管理员确认公司关联关系,由管理员”XXX“进入CRM系统公司审核列表进行审核。
(注意:非跨部门公司新增联系人申请,管理员指查看下级角色;款部门公司新增联系人申请,管理员指公司管理员角色”)
\ No newline at end of file
package main
import (
"flag"
boot "golang-asynctask/boot/sendmsg"
"golang-asynctask/cmd/crm/sendmsg/service"
"os"
"os/signal"
"syscall"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
boot.Init(configPath,logPath)
service.Listen()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
\ No newline at end of file
package service
import (
"errors"
"net/http"
"net/url"
"strconv"
cfg "github.com/ichunt2019/cfg/lib"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
logger "github.com/ichunt2019/lxLog/log"
"github.com/tidwall/gjson"
comfunc "golang-asynctask/util/lib"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error {
logger.Instance("sendmsg").Info(string(dataByte))
error := sendmsg(string(dataByte))
return error
}
//消息已经消费3次 失败了 请进行处理
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("sendmsg").Error("任务处理失败了,我要进入db日志库了")
logger.Instance("sendmsg").Error("任务处理失败了,发送钉钉消息通知主人")
logger.Instance("sendmsg").Error(string(dataByte))
logger.Instance("sendmsg").Error("错误原因:%s",err)
return nil
}
func sendmsg(data string) (err error){
push_url := cfg.Instance("config").GetString("crmSendMsgByconflictSale")
urlParams := url.Values{}
applyId := gjson.Parse(data).Get("id").Int()
header := http.Header{}
header.Set("api-key",cfg.Instance("config").GetString("apiKey"))
header.Set("Content-Type","application/json")
urlParams.Add("id",strconv.FormatInt(applyId,10))
response,returnData,err := comfunc.HttpPOST(push_url,urlParams,0,header,"")
//fmt.Println(response.StatusCode,string(returnData),err)
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("errcode").Int()
msg := gjson.Parse(string(returnData)).Get("errmsg").String()
if err_code != 0{
err = errors.New(msg)
}
}
return
}
func Listen(){
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
cfg.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.queue_name"),
cfg.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.routing_key"),
cfg.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.exchange"),
cfg.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.type"),
cfg.Instance("config").GetString("rabbitmq_crm_invoice_com_apply.dns"),
},t,1)
}
#异步任务 消费者 同步公司+销售 到erp
\ No newline at end of file
......@@ -3,6 +3,8 @@ title = "TOML 例子"
viewpath = "/home/www/templates/"
pushErpDomain = "http://crm.liexin.net/api/pushComUserRelationToErp"
crmSendMsgByconflictSale = "http://crm.liexin.net/api/sendMsgByconflictSale"
sendMsgByChangeInvoiceCom = "http://crm.liexin.net/api/sendMsgByChangeInvoiceCom"
apiKey = "crm a1b2c3d4e5f6g7h8i9jk"
[owner]
......@@ -28,6 +30,14 @@ type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
[rabbitmq_crm_invoice_com_apply]
queue_name="crm_invoice_com_apply"
routing_key="crm_invoice_com_apply"
exchange=""
type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
[servers]
# 你可以依照你的意愿缩进。使用空格或Tab。TOML不会在意。
[servers.alpha]
......
......@@ -3,8 +3,8 @@ ShowSQL = true
[crm]
data_source_name = "liexin_crm:liexin_crm#zsyM@tcp(192.168.1.235:3306)/liexin_crm?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 3
max_idle_conn = 1
max_open_conn = 0
max_idle_conn = 0
table_prefix = "lie_"
max_conn_life_time = 100
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment